You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/01/23 08:48:46 UTC

[dubbo] branch 3.0 updated: Refactor the lazy connect client reconnection logic (#9507)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new d1cd33b  Refactor the lazy connect client reconnection logic (#9507)
d1cd33b is described below

commit d1cd33bee80df038221b787a8f4590fe2d16e9fe
Author: huazhongming <cr...@gmail.com>
AuthorDate: Sun Jan 23 16:48:37 2022 +0800

    Refactor the lazy connect client reconnection logic (#9507)
    
    * Make send.reconnect configurable
    Support automatic reconnection for the lazy connect client
    
    * fix import
    
    * fix import
    
    * fix ut
---
 .../apache/dubbo/remoting/exchange/Exchangers.java |  1 -
 .../support/header/HeaderExchangeClient.java       |  2 +-
 .../dubbo/remoting/transport/AbstractClient.java   | 38 ++++++++++++-------
 .../apache/dubbo/rpc/protocol/dubbo/Constants.java | 12 ------
 .../protocol/dubbo/LazyConnectExchangeClient.java  | 43 ++++++++++++----------
 .../dubbo/ReferenceCountExchangeClient.java        | 16 ++------
 .../protocol/dubbo/DubboInvokerAvailableTest.java  |  9 ++---
 .../rpc/protocol/dubbo/DubboLazyConnectTest.java   |  2 +-
 8 files changed, 58 insertions(+), 65 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
index 63a0ece..2d181dd 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
@@ -104,7 +104,6 @@ public class Exchangers {
         if (handler == null) {
             throw new IllegalArgumentException("handler == null");
         }
-//        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
         return getExchanger(url).connect(url, handler);
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 81df865..b25951a 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -53,7 +53,7 @@ public class HeaderExchangeClient implements ExchangeClient {
     public static GlobalResourceInitializer<HashedWheelTimer> IDLE_CHECK_TIMER = new GlobalResourceInitializer<>(() ->
         new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1,
             TimeUnit.SECONDS, TICKS_PER_WHEEL),
-        timer -> timer.stop());
+        HashedWheelTimer::stop);
 
     private Timeout reconnectTimer;
     private Timeout heartBeatTimer;
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index ff70f09..8157775 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
 import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
 
 /**
  * AbstractClient
@@ -48,7 +49,8 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
     private final Lock connectLock = new ReentrantLock();
     private final boolean needReconnect;
     protected volatile ExecutorService executor;
-    private ExecutorRepository executorRepository;
+    private final ExecutorRepository executorRepository;
+
 
     public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
         super(url, handler);
@@ -63,8 +65,8 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
         } catch (Throwable t) {
             close();
             throw new RemotingException(url.toInetSocketAddress(), null,
-                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
-                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
+                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+                    + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
         }
 
         try {
@@ -74,18 +76,28 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
                 logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
             }
         } catch (RemotingException t) {
+            // If lazy connect client fails to establish a connection, the client instance will still be created,
+            // and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exception
+            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
+                logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +
+                    " connect to the server " + getRemoteAddress() +
+                    " (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +
+                    t.getMessage(), t);
+                return;
+            }
+
             if (url.getParameter(Constants.CHECK_KEY, true)) {
                 close();
                 throw t;
             } else {
                 logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
-                        + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
+                    + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
             }
         } catch (Throwable t) {
             close();
             throw new RemotingException(url.toInetSocketAddress(), null,
-                    "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
-                            + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
+                "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+                    + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
         }
     }
 
@@ -195,7 +207,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
 
             if (isClosed() || isClosing()) {
                 logger.warn("No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
-                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");
+                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");
                 return;
             }
 
@@ -203,14 +215,14 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
 
             if (!isConnected()) {
                 throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
-                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
-                        + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
+                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+                    + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
 
             } else {
                 if (logger.isInfoEnabled()) {
                     logger.info("Successfully connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
-                            + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
-                            + ", channel is " + this.getChannel());
+                        + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+                        + ", channel is " + this.getChannel());
                 }
             }
 
@@ -219,8 +231,8 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
 
         } catch (Throwable e) {
             throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
-                    + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
-                    + ", cause: " + e.getMessage(), e);
+                + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+                + ", cause: " + e.getMessage(), e);
 
         } finally {
             connectLock.unlock();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/Constants.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/Constants.java
index 9ce502f..f16dd5c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/Constants.java
@@ -49,18 +49,6 @@ public interface Constants {
     String CHANNEL_CALLBACK_KEY = "channel.callback.invokers.key";
 
     /**
-     * The initial state for lazy connection
-     */
-    String LAZY_CONNECT_INITIAL_STATE_KEY = "connect.lazy.initial.state";
-
-    /**
-     * The default value of lazy connection's initial state: true
-     *
-     * @see #LAZY_CONNECT_INITIAL_STATE_KEY
-     */
-    boolean DEFAULT_LAZY_CONNECT_INITIAL_STATE = true;
-
-    /**
      * when this warning rises from invocation, program probably have a bug.
      */
     String LAZY_REQUEST_WITH_WARNING_KEY = "lazyclient_request_with_warning";
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index 87fba37..fae3bf7 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.remoting.RemotingException;
 import org.apache.dubbo.remoting.exchange.ExchangeClient;
 import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 import org.apache.dubbo.remoting.exchange.Exchangers;
+import org.apache.dubbo.rpc.RpcException;
 
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
@@ -34,10 +35,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
 import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY;
-import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE;
 import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_REQUEST_WITH_WARNING;
-import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
 import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_REQUEST_WITH_WARNING_KEY;
 
 /**
@@ -47,23 +47,20 @@ import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_REQUEST_WITH_WA
 final class LazyConnectExchangeClient implements ExchangeClient {
 
     private final static Logger logger = LoggerFactory.getLogger(LazyConnectExchangeClient.class);
-    protected final boolean requestWithWarning;
+    private final boolean requestWithWarning;
     private final URL url;
     private final ExchangeHandler requestHandler;
     private final Lock connectLock = new ReentrantLock();
     private final int warningPeriod = 5000;
-    /**
-     * lazy connect, initial state for connection
-     */
-    private final boolean initialState;
+    private final boolean needReconnect;
     private volatile ExchangeClient client;
     private final AtomicLong warningCount = new AtomicLong(0);
 
     public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) {
         // lazy connect, need set send.reconnect = true, to avoid channel bad status.
-        this.url = url.addParameter(SEND_RECONNECT_KEY, Boolean.TRUE.toString());
+        this.url = url.addParameter(LAZY_CONNECT_KEY, true);
+        this.needReconnect = url.getParameter(SEND_RECONNECT_KEY, false);
         this.requestHandler = requestHandler;
-        this.initialState = url.getParameter(LAZY_CONNECT_INITIAL_STATE_KEY, DEFAULT_LAZY_CONNECT_INITIAL_STATE);
         this.requestWithWarning = url.getParameter(LAZY_REQUEST_WITH_WARNING_KEY, DEFAULT_LAZY_REQUEST_WITH_WARNING);
     }
 
@@ -88,7 +85,7 @@ final class LazyConnectExchangeClient implements ExchangeClient {
     @Override
     public CompletableFuture<Object> request(Object request) throws RemotingException {
         warning();
-        initClient();
+        checkClient();
         return client.request(request);
     }
 
@@ -109,21 +106,21 @@ final class LazyConnectExchangeClient implements ExchangeClient {
     @Override
     public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
         warning();
-        initClient();
+        checkClient();
         return client.request(request, timeout);
     }
 
     @Override
     public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
         warning();
-        initClient();
+        checkClient();
         return client.request(request, executor);
     }
 
     @Override
     public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
         warning();
-        initClient();
+        checkClient();
         return client.request(request, timeout, executor);
     }
 
@@ -148,7 +145,9 @@ final class LazyConnectExchangeClient implements ExchangeClient {
     @Override
     public boolean isConnected() {
         if (client == null) {
-            return initialState;
+            // Until the underlying client has been created, LazyConnectExchangeClient always reports itself
+            // as connected, to prevent ReconnectTask from initiating a reconnection.
+            return true;
         } else {
             return client.isConnected();
         }
@@ -170,13 +169,13 @@ final class LazyConnectExchangeClient implements ExchangeClient {
 
     @Override
     public void send(Object message) throws RemotingException {
-        initClient();
+        checkClient();
         client.send(message);
     }
 
     @Override
     public void send(Object message, boolean sent) throws RemotingException {
-        initClient();
+        checkClient();
         client.send(message, sent);
     }
 
@@ -261,9 +260,15 @@ final class LazyConnectExchangeClient implements ExchangeClient {
     }
 
     private void checkClient() {
-        if (client == null) {
-            throw new IllegalStateException(
-                    "LazyConnectExchangeClient state error. the client has not be init .url:" + url);
+        try {
+            initClient();
+        } catch (Exception e) {
+            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
+        }
+
+        if (!isConnected() && !needReconnect) {
+            throw new IllegalStateException("LazyConnectExchangeClient is not connected normally, " +
+                "and send.reconnect is configured as false, the request fails quickly" + url);
         }
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index caca77d..2a904b0 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -32,8 +32,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT;
-import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY;
-import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
 
 /**
  * dubbo protocol support class.
@@ -170,9 +168,9 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
 
     /**
      * when destroy unused invoker, closeAll should be true
-     * 
+     *
      * @param timeout
-     * @param closeAll  
+     * @param closeAll
      */
     private void closeInternal(int timeout, boolean closeAll) {
         if (closeAll || referenceCount.decrementAndGet() <= 0) {
@@ -208,13 +206,7 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
          * the order of judgment in the if statement cannot be changed.
          */
         if (!(client instanceof LazyConnectExchangeClient)) {
-            // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false
-            URL lazyUrl = url.addParameter(LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.TRUE)
-                //.addParameter(RECONNECT_KEY, Boolean.FALSE)
-                .addParameter(SEND_RECONNECT_KEY, Boolean.TRUE.toString());
-            //.addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true);
-
-            client = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());
+            client = new LazyConnectExchangeClient(url, client.getExchangeHandler());
         }
     }
 
@@ -230,7 +222,7 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
         referenceCount.incrementAndGet();
     }
 
-    public int getCount(){
+    public int getCount() {
         return referenceCount.get();
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvailableTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvailableTest.java
index 1ac3063..1897f4b 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvailableTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvailableTest.java
@@ -139,16 +139,13 @@ public class DubboInvokerAvailableTest {
 
         ExchangeClient exchangeClient = getClients((DubboInvoker<?>) invoker)[0];
         Assertions.assertFalse(exchangeClient.isClosed());
-        try {
-            exchangeClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
-            fail();
-        } catch (IllegalStateException e) {
-
-        }
+        exchangeClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
         //invoke method --> init client
         IDemoService service = (IDemoService) proxy.getProxy(invoker);
         Assertions.assertEquals("ok", service.get());
+        Assertions.assertFalse(invoker.isAvailable());
 
+        exchangeClient.removeAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY);
         Assertions.assertTrue(invoker.isAvailable());
         exchangeClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
         Assertions.assertFalse(invoker.isAvailable());
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboLazyConnectTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboLazyConnectTest.java
index 35264ac..eac5132 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboLazyConnectTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboLazyConnectTest.java
@@ -66,7 +66,7 @@ public class DubboLazyConnectTest {
 
     @Test
     public void testSticky3() {
-        Assertions.assertThrows(RpcException.class, () -> {
+        Assertions.assertThrows(IllegalStateException.class, () -> {
             int port = NetUtils.getAvailablePort();
             URL url = URL.valueOf("dubbo://127.0.0.1:" + port + "/org.apache.dubbo.rpc.protocol.dubbo.IDemoService?" + LAZY_CONNECT_KEY + "=true");
             IDemoService service = ProtocolUtils.refer(IDemoService.class, url);