You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by GitBox <gi...@apache.org> on 2019/01/16 06:52:14 UTC
[incubator-dubbo] Diff for: [GitHub] kexianjun closed pull request #3213:
[Dubbo 3151] Optimize heartbeat mechanism
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
index f799358afe..5bc290c64f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
@@ -296,9 +296,9 @@
public static final long LEAST_HEARTBEAT_DURATION = 1000;
/**
- * ticks per wheel. Currently only contains two tasks, so 16 locations are enough
+ * ticks per wheel. All heartbeat tasks and reconnect tasks are maintained in a single HashedWheelTimer
*/
- public static final int TICKS_PER_WHEEL = 16;
+ public static final int TICKS_PER_WHEEL = 128;
public static final String HEARTBEAT_TIMEOUT_KEY = "heartbeat.timeout";
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java
index 003af243d8..13986c6fe8 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java
@@ -22,7 +22,6 @@
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.remoting.Channel;
-import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
@@ -30,16 +29,17 @@
*/
public abstract class AbstractTimerTask implements TimerTask {
- private final ChannelProvider channelProvider;
+
+ private final Channel channel;
private final Long tick;
- AbstractTimerTask(ChannelProvider channelProvider, Long tick) {
- if (channelProvider == null || tick == null) {
+ AbstractTimerTask(Channel channel, Long tick) {
+ if (channel == null || tick == null) {
throw new IllegalArgumentException();
}
this.tick = tick;
- this.channelProvider = channelProvider;
+ this.channel = channel;
}
static Long lastRead(Channel channel) {
@@ -54,7 +54,7 @@ static Long now() {
return System.currentTimeMillis();
}
- private void reput(Timeout timeout, Long tick) {
+ protected void reput(Timeout timeout, Long tick) {
if (timeout == null || tick == null) {
throw new IllegalArgumentException();
}
@@ -69,19 +69,13 @@ private void reput(Timeout timeout, Long tick) {
@Override
public void run(Timeout timeout) throws Exception {
- Collection<Channel> c = channelProvider.getChannels();
- for (Channel channel : c) {
- if (channel.isClosed()) {
- continue;
- }
- doTask(channel);
- }
- reput(timeout, tick);
+ doTask(channel, timeout);
}
- protected abstract void doTask(Channel channel);
+ protected abstract void doTask(Channel channel, Timeout timeout);
- interface ChannelProvider {
- Collection<Channel> getChannels();
+ protected Long getTick() {
+ return tick;
}
+
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 3abbe5b542..9b0bc30c75 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -16,10 +16,7 @@
*/
package org.apache.dubbo.remoting.exchange.support.header;
-import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.timer.HashedWheelTimer;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.RemotingException;
@@ -29,8 +26,6 @@
import org.apache.dubbo.remoting.exchange.ResponseFuture;
import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
/**
* DefaultMessageClient
@@ -39,33 +34,13 @@
private final Client client;
private final ExchangeChannel channel;
- // heartbeat(ms), default value is 0 , won't execute a heartbeat.
- private int heartbeat;
- private int heartbeatTimeout;
- private HashedWheelTimer heartbeatTimer;
-
- public HeaderExchangeClient(Client client, boolean needHeartbeat) {
+ public HeaderExchangeClient(Client client) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
this.channel = new HeaderExchangeChannel(client);
- String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
-
- this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
- dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
- this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
- if (heartbeatTimeout < heartbeat * 2) {
- throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
- }
-
- if (needHeartbeat) {
- long tickDuration = calculateLeastDuration(heartbeat);
- heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
- TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
- startHeartbeatTimer();
- }
}
@Override
@@ -178,39 +153,8 @@ public boolean hasAttribute(String key) {
return channel.hasAttribute(key);
}
- private void startHeartbeatTimer() {
- AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
-
- long heartbeatTick = calculateLeastDuration(heartbeat);
- long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
- HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
- ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
-
- // init task and start timer.
- heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
- heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
- }
-
- private void stopHeartbeatTimer() {
- if (heartbeatTimer != null) {
- heartbeatTimer.stop();
- heartbeatTimer = null;
- }
- }
-
private void doClose() {
- stopHeartbeatTimer();
- }
-
- /**
- * Each interval cannot be less than 1000ms.
- */
- private long calculateLeastDuration(int time) {
- if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) {
- return Constants.LEAST_HEARTBEAT_DURATION;
- } else {
- return time / Constants.HEARTBEAT_CHECK_TICK;
- }
+ // Do nothing.
}
@Override
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
index 7f3067d60c..84261f74e4 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
@@ -21,8 +21,6 @@
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.timer.HashedWheelTimer;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
@@ -34,11 +32,8 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import static java.util.Collections.unmodifiableCollection;
-
/**
* ExchangeServerImpl
*/
@@ -47,25 +42,15 @@
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Server server;
- // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
- private int heartbeat;
- private int heartbeatTimeout;
- private AtomicBoolean closed = new AtomicBoolean(false);
- private HashedWheelTimer heartbeatTimer;
+ private AtomicBoolean closed = new AtomicBoolean(false);
public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
- this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
- this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
- if (heartbeatTimeout < heartbeat * 2) {
- throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
- }
- startHeartbeatTimer();
}
public Server getServer() {
@@ -148,7 +133,6 @@ private void doClose() {
if (!closed.compareAndSet(false, true)) {
return;
}
- stopHeartbeatTimer();
}
@Override
@@ -203,25 +187,6 @@ public ChannelHandler getChannelHandler() {
@Override
public void reset(URL url) {
server.reset(url);
- try {
- if (url.hasParameter(Constants.HEARTBEAT_KEY)
- || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
- int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
- int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
- if (t < h * 2) {
- throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
- }
- if (h != heartbeat || t != heartbeatTimeout) {
- heartbeat = h;
- heartbeatTimeout = t;
-
- stopHeartbeatTimer();
- startHeartbeatTimer();
- }
- }
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
}
@Override
@@ -247,40 +212,4 @@ public void send(Object message, boolean sent) throws RemotingException {
}
server.send(message, sent);
}
-
- /**
- * Each interval cannot be less than 1000ms.
- */
- private long calculateLeastDuration(int time) {
- if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) {
- return Constants.LEAST_HEARTBEAT_DURATION;
- } else {
- return time / Constants.HEARTBEAT_CHECK_TICK;
- }
- }
-
- private void startHeartbeatTimer() {
- long tickDuration = calculateLeastDuration(heartbeat);
- heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-heartbeat", true), tickDuration,
- TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
-
- AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
-
- long heartbeatTick = calculateLeastDuration(heartbeat);
- long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
- HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
- ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
-
- // init task and start timer.
- heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
- heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
- }
-
- private void stopHeartbeatTimer() {
- if (heartbeatTimer != null) {
- heartbeatTimer.stop();
- heartbeatTimer = null;
- }
- }
-
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
index e2e8f1b1c9..e8f7882d67 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
@@ -36,7 +36,7 @@
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
- return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
+ return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
@Override
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
index c222885ad4..3257f7233b 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
@@ -20,6 +20,8 @@
import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
@@ -27,6 +29,8 @@
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate;
+import java.util.concurrent.TimeUnit;
+
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
@@ -35,6 +39,12 @@
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
+ // The whole Dubbo service uses a single HashedWheelTimer for all heartbeat tasks and reconnect tasks
+ private static HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-heartbeat", true),
+ 1000,
+ TimeUnit.MILLISECONDS,
+ Constants.TICKS_PER_WHEEL);
+
public HeartbeatHandler(ChannelHandler handler) {
super(handler);
}
@@ -44,6 +54,20 @@ public void connected(Channel channel) throws RemotingException {
setReadTimestamp(channel);
setWriteTimestamp(channel);
handler.connected(channel);
+ // When a channel has connected, add its heartbeat task and reconnect task to hashedWheelTimer
+ long heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, Constants.LEAST_HEARTBEAT_DURATION);
+
+ long heartbeatTimeout = channel.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
+
+ if (heartbeatTimeout < heartbeat * 2) {
+ throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
+ }
+
+ HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(channel, heartbeat);
+ ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(channel, heartbeatTimeout);
+ hashedWheelTimer.newTimeout(heartBeatTimerTask, heartbeat, TimeUnit.MILLISECONDS);
+ hashedWheelTimer.newTimeout(reconnectTimerTask, heartbeatTimeout, TimeUnit.MILLISECONDS);
+
}
@Override
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java
index cbe01f8506..c467abff73 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java
@@ -20,6 +20,7 @@
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.exchange.Request;
@@ -30,20 +31,25 @@
private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimerTask.class);
- private final int heartbeat;
-
- HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) {
- super(channelProvider, heartbeatTick);
- this.heartbeat = heartbeat;
+ HeartbeatTimerTask(Channel channel, Long heartbeatTick) {
+ super(channel, heartbeatTick);
}
@Override
- protected void doTask(Channel channel) {
+ protected void doTask(Channel channel, Timeout timeout) {
+ long heartbeatDuration = 0;
try {
Long lastRead = lastRead(channel);
Long lastWrite = lastWrite(channel);
- if ((lastRead != null && now() - lastRead > heartbeat)
- || (lastWrite != null && now() - lastWrite > heartbeat)) {
+ long now = now();
+ if (null != lastRead) {
+ heartbeatDuration = now - lastRead;
+ }
+ if (null != lastWrite) {
+ heartbeatDuration = Math.max(heartbeatDuration, now - lastWrite);
+ }
+ if (heartbeatDuration > getTick()) {
+ // Idle time exceeded the heartbeat interval; send a heartbeat packet.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
@@ -52,11 +58,15 @@ protected void doTask(Channel channel) {
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: "
- + heartbeat + "ms");
+ + getTick() + "ms");
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
+ if (!channel.isClosed()) {
+ // Schedule the next heartbeat task with a recalculated tick duration.
+ reput(timeout, Math.min(getTick() - heartbeatDuration > 0 ? getTick() - heartbeatDuration : getTick(), getTick()));
+ }
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
index 2b7dca552c..891837c191 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
@@ -17,8 +17,10 @@
package org.apache.dubbo.remoting.exchange.support.header;
+import org.apache.dubbo.common.Constants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.Timeout;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.Client;
@@ -29,22 +31,19 @@
private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);
- private final int heartbeatTimeout;
-
- ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) {
- super(channelProvider, heartbeatTimeoutTick);
- this.heartbeatTimeout = heartbeatTimeout1;
+ ReconnectTimerTask(Channel channel, long reconnectedTick) {
+ super(channel, reconnectedTick);
}
@Override
- protected void doTask(Channel channel) {
+ protected void doTask(Channel channel, Timeout timeout) {
Long lastRead = lastRead(channel);
- Long now = now();
- if (lastRead != null && now - lastRead > heartbeatTimeout) {
- if (channel instanceof Client) {
+ long heartBeatDuration = lastRead == null ? 0 : now() - lastRead;
+ if (heartBeatDuration > getTick()) {
+ if (Constants.CONSUMER_SIDE.equals(channel.getUrl().getParameter(Constants.SIDE_KEY))) {
try {
logger.warn("Reconnect to remote channel " + channel.getRemoteAddress() + ", because heartbeat read idle time out: "
- + heartbeatTimeout + "ms");
+ + getTick() + "ms");
((Client) channel).reconnect();
} catch (Throwable t) {
// do nothing
@@ -52,12 +51,19 @@ protected void doTask(Channel channel) {
} else {
try {
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
- + heartbeatTimeout + "ms");
+ + getTick() + "ms");
channel.close();
+ // On the provider side, once the channel is closed, just return.
+ return;
} catch (Throwable t) {
logger.warn("Exception when close channel " + channel, t);
}
}
}
+ // Schedule the next run of this task with a recalculated tick duration.
+ if (!channel.isClosed()) {
+ reput(timeout, Math.min(getTick() - heartBeatDuration > 0 ? getTick() - heartBeatDuration : getTick(), getTick()));
+ }
+
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java
index 782b8480cd..f1bcf85cb9 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java
@@ -52,8 +52,7 @@ public URL getUrl() {
}
};
- AbstractTimerTask.ChannelProvider cp = () -> Collections.<Channel>singletonList(channel);
- heartbeatTimerTask = new HeartbeatTimerTask(cp, tickDuration / Constants.HEARTBEAT_CHECK_TICK, (int) tickDuration);
+ heartbeatTimerTask = new HeartbeatTimerTask(channel, tickDuration);
}
@Test
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
index 4c78c1cceb..281beecbcd 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
@@ -47,7 +47,7 @@
super(serviceType, url, new String[]{Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
this.channel = channel;
this.serviceKey = serviceKey;
- this.currentClient = new HeaderExchangeClient(new ChannelWrapper(this.channel), false);
+ this.currentClient = new HeaderExchangeClient(new ChannelWrapper(this.channel));
}
@Override
With regards,
Apache Git Services