You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/01/23 08:48:46 UTC
[dubbo] branch 3.0 updated: Refactor the lazy connect client reconnection logic (#9507)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new d1cd33b Refactor the lazy connect client reconnection logic (#9507)
d1cd33b is described below
commit d1cd33bee80df038221b787a8f4590fe2d16e9fe
Author: huazhongming <cr...@gmail.com>
AuthorDate: Sun Jan 23 16:48:37 2022 +0800
Refactor the lazy connect client reconnection logic (#9507)
* Support send.reconnect configurable
Support lazy connect client automatic reconnection
* fix import
* fix import
* fix ut
---
.../apache/dubbo/remoting/exchange/Exchangers.java | 1 -
.../support/header/HeaderExchangeClient.java | 2 +-
.../dubbo/remoting/transport/AbstractClient.java | 38 ++++++++++++-------
.../apache/dubbo/rpc/protocol/dubbo/Constants.java | 12 ------
.../protocol/dubbo/LazyConnectExchangeClient.java | 43 ++++++++++++----------
.../dubbo/ReferenceCountExchangeClient.java | 16 ++------
.../protocol/dubbo/DubboInvokerAvailableTest.java | 9 ++---
.../rpc/protocol/dubbo/DubboLazyConnectTest.java | 2 +-
8 files changed, 58 insertions(+), 65 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
index 63a0ece..2d181dd 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
@@ -104,7 +104,6 @@ public class Exchangers {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
-// url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 81df865..b25951a 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -53,7 +53,7 @@ public class HeaderExchangeClient implements ExchangeClient {
public static GlobalResourceInitializer<HashedWheelTimer> IDLE_CHECK_TIMER = new GlobalResourceInitializer<>(() ->
new HashedWheelTimer(new NamedThreadFactory("dubbo-client-idleCheck", true), 1,
TimeUnit.SECONDS, TICKS_PER_WHEEL),
- timer -> timer.stop());
+ HashedWheelTimer::stop);
private Timeout reconnectTimer;
private Timeout heartBeatTimer;
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index ff70f09..8157775 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -37,6 +37,7 @@ import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.THREAD_NAME_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
/**
* AbstractClient
@@ -48,7 +49,8 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
protected volatile ExecutorService executor;
- private ExecutorRepository executorRepository;
+ private final ExecutorRepository executorRepository;
+
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
@@ -63,8 +65,8 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
- "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
+ "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
@@ -74,18 +76,28 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
+ // If lazy connect client fails to establish a connection, the client instance will still be created,
+ // and the reconnection will be initiated by ReconnectTask, so there is no need to throw an exception
+ if (url.getParameter(LAZY_CONNECT_KEY, false)) {
+ logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() +
+ " connect to the server " + getRemoteAddress() +
+ " (the connection request is initiated by lazy connect client, ignore and retry later!), cause: " +
+ t.getMessage(), t);
+ return;
+ }
+
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
+ + " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
- "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
- + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
+ "Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ + " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
}
@@ -195,7 +207,7 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
if (isClosed() || isClosing()) {
logger.warn("No need to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
- + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");
+ + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion() + ", cause: client status is closed or closing.");
return;
}
@@ -203,14 +215,14 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
if (!isConnected()) {
throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
- + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
- + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
+ + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ + ", cause: Connect wait timeout: " + getConnectTimeout() + "ms.");
} else {
if (logger.isInfoEnabled()) {
logger.info("Successfully connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
- + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
- + ", channel is " + this.getChannel());
+ + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ + ", channel is " + this.getChannel());
}
}
@@ -219,8 +231,8 @@ public abstract class AbstractClient extends AbstractEndpoint implements Client
} catch (Throwable e) {
throw new RemotingException(this, "Failed to connect to server " + getRemoteAddress() + " from " + getClass().getSimpleName() + " "
- + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
- + ", cause: " + e.getMessage(), e);
+ + NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion()
+ + ", cause: " + e.getMessage(), e);
} finally {
connectLock.unlock();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/Constants.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/Constants.java
index 9ce502f..f16dd5c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/Constants.java
@@ -49,18 +49,6 @@ public interface Constants {
String CHANNEL_CALLBACK_KEY = "channel.callback.invokers.key";
/**
- * The initial state for lazy connection
- */
- String LAZY_CONNECT_INITIAL_STATE_KEY = "connect.lazy.initial.state";
-
- /**
- * The default value of lazy connection's initial state: true
- *
- * @see #LAZY_CONNECT_INITIAL_STATE_KEY
- */
- boolean DEFAULT_LAZY_CONNECT_INITIAL_STATE = true;
-
- /**
* when this warning rises from invocation, program probably have a bug.
*/
String LAZY_REQUEST_WITH_WARNING_KEY = "lazyclient_request_with_warning";
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
index 87fba37..fae3bf7 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.exchange.ExchangeClient;
import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.Exchangers;
+import org.apache.dubbo.rpc.RpcException;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
@@ -34,10 +35,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.dubbo.common.constants.CommonConstants.LAZY_CONNECT_KEY;
import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY;
-import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_REQUEST_WITH_WARNING;
-import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_REQUEST_WITH_WARNING_KEY;
/**
@@ -47,23 +47,20 @@ import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_REQUEST_WITH_WA
final class LazyConnectExchangeClient implements ExchangeClient {
private final static Logger logger = LoggerFactory.getLogger(LazyConnectExchangeClient.class);
- protected final boolean requestWithWarning;
+ private final boolean requestWithWarning;
private final URL url;
private final ExchangeHandler requestHandler;
private final Lock connectLock = new ReentrantLock();
private final int warningPeriod = 5000;
- /**
- * lazy connect, initial state for connection
- */
- private final boolean initialState;
+ private final boolean needReconnect;
private volatile ExchangeClient client;
private final AtomicLong warningCount = new AtomicLong(0);
public LazyConnectExchangeClient(URL url, ExchangeHandler requestHandler) {
// lazy connect, need set send.reconnect = true, to avoid channel bad status.
- this.url = url.addParameter(SEND_RECONNECT_KEY, Boolean.TRUE.toString());
+ this.url = url.addParameter(LAZY_CONNECT_KEY, true);
+ this.needReconnect = url.getParameter(SEND_RECONNECT_KEY, false);
this.requestHandler = requestHandler;
- this.initialState = url.getParameter(LAZY_CONNECT_INITIAL_STATE_KEY, DEFAULT_LAZY_CONNECT_INITIAL_STATE);
this.requestWithWarning = url.getParameter(LAZY_REQUEST_WITH_WARNING_KEY, DEFAULT_LAZY_REQUEST_WITH_WARNING);
}
@@ -88,7 +85,7 @@ final class LazyConnectExchangeClient implements ExchangeClient {
@Override
public CompletableFuture<Object> request(Object request) throws RemotingException {
warning();
- initClient();
+ checkClient();
return client.request(request);
}
@@ -109,21 +106,21 @@ final class LazyConnectExchangeClient implements ExchangeClient {
@Override
public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException {
warning();
- initClient();
+ checkClient();
return client.request(request, timeout);
}
@Override
public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException {
warning();
- initClient();
+ checkClient();
return client.request(request, executor);
}
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
warning();
- initClient();
+ checkClient();
return client.request(request, timeout, executor);
}
@@ -148,7 +145,9 @@ final class LazyConnectExchangeClient implements ExchangeClient {
@Override
public boolean isConnected() {
if (client == null) {
- return initialState;
+ // Before the request arrives, LazyConnectExchangeClient always exists in a normal connection state
+ // to prevent ReconnectTask from initiating a reconnection action.
+ return true;
} else {
return client.isConnected();
}
@@ -170,13 +169,13 @@ final class LazyConnectExchangeClient implements ExchangeClient {
@Override
public void send(Object message) throws RemotingException {
- initClient();
+ checkClient();
client.send(message);
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
- initClient();
+ checkClient();
client.send(message, sent);
}
@@ -261,9 +260,15 @@ final class LazyConnectExchangeClient implements ExchangeClient {
}
private void checkClient() {
- if (client == null) {
- throw new IllegalStateException(
- "LazyConnectExchangeClient state error. the client has not be init .url:" + url);
+ try {
+ initClient();
+ } catch (Exception e) {
+ throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
+ }
+
+ if (!isConnected() && !needReconnect) {
+ throw new IllegalStateException("LazyConnectExchangeClient is not connected normally, " +
+ "and send.reconnect is configured as false, the request fails quickly" + url);
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
index caca77d..2a904b0 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java
@@ -32,8 +32,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_SERVER_SHUTDOWN_TIMEOUT;
-import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY;
-import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY;
/**
* dubbo protocol support class.
@@ -170,9 +168,9 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
/**
* when destroy unused invoker, closeAll should be true
- *
+ *
* @param timeout
- * @param closeAll
+ * @param closeAll
*/
private void closeInternal(int timeout, boolean closeAll) {
if (closeAll || referenceCount.decrementAndGet() <= 0) {
@@ -208,13 +206,7 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
* the order of judgment in the if statement cannot be changed.
*/
if (!(client instanceof LazyConnectExchangeClient)) {
- // this is a defensive operation to avoid client is closed by accident, the initial state of the client is false
- URL lazyUrl = url.addParameter(LAZY_CONNECT_INITIAL_STATE_KEY, Boolean.TRUE)
- //.addParameter(RECONNECT_KEY, Boolean.FALSE)
- .addParameter(SEND_RECONNECT_KEY, Boolean.TRUE.toString());
- //.addParameter(LazyConnectExchangeClient.REQUEST_WITH_WARNING_KEY, true);
-
- client = new LazyConnectExchangeClient(lazyUrl, client.getExchangeHandler());
+ client = new LazyConnectExchangeClient(url, client.getExchangeHandler());
}
}
@@ -230,7 +222,7 @@ final class ReferenceCountExchangeClient implements ExchangeClient {
referenceCount.incrementAndGet();
}
- public int getCount(){
+ public int getCount() {
return referenceCount.get();
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvailableTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvailableTest.java
index 1ac3063..1897f4b 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvailableTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvokerAvailableTest.java
@@ -139,16 +139,13 @@ public class DubboInvokerAvailableTest {
ExchangeClient exchangeClient = getClients((DubboInvoker<?>) invoker)[0];
Assertions.assertFalse(exchangeClient.isClosed());
- try {
- exchangeClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
- fail();
- } catch (IllegalStateException e) {
-
- }
+ exchangeClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
//invoke method --> init client
IDemoService service = (IDemoService) proxy.getProxy(invoker);
Assertions.assertEquals("ok", service.get());
+ Assertions.assertFalse(invoker.isAvailable());
+ exchangeClient.removeAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY);
Assertions.assertTrue(invoker.isAvailable());
exchangeClient.setAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY, Boolean.TRUE);
Assertions.assertFalse(invoker.isAvailable());
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboLazyConnectTest.java b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboLazyConnectTest.java
index 35264ac..eac5132 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboLazyConnectTest.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/test/java/org/apache/dubbo/rpc/protocol/dubbo/DubboLazyConnectTest.java
@@ -66,7 +66,7 @@ public class DubboLazyConnectTest {
@Test
public void testSticky3() {
- Assertions.assertThrows(RpcException.class, () -> {
+ Assertions.assertThrows(IllegalStateException.class, () -> {
int port = NetUtils.getAvailablePort();
URL url = URL.valueOf("dubbo://127.0.0.1:" + port + "/org.apache.dubbo.rpc.protocol.dubbo.IDemoService?" + LAZY_CONNECT_KEY + "=true");
IDemoService service = ProtocolUtils.refer(IDemoService.class, url);