You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by GitBox <gi...@apache.org> on 2019/01/16 06:52:14 UTC

[incubator-dubbo] Diff for: [GitHub] kexianjun closed pull request #3213: [Dubbo 3151] Optimize heartbeat mechanism

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
index f799358afe..5bc290c64f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
@@ -296,9 +296,9 @@
     public static final long LEAST_HEARTBEAT_DURATION = 1000;
 
     /**
-     * ticks per wheel. Currently only contains two tasks, so 16 locations are enough
+     * Ticks per wheel. All heartbeat tasks and reconnect tasks are maintained in a single HashedWheelTimer.
      */
-    public static final int TICKS_PER_WHEEL = 16;
+    public static final int TICKS_PER_WHEEL = 128;
 
     public static final String HEARTBEAT_TIMEOUT_KEY = "heartbeat.timeout";
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java
index 003af243d8..13986c6fe8 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/AbstractTimerTask.java
@@ -22,7 +22,6 @@
 import org.apache.dubbo.common.timer.TimerTask;
 import org.apache.dubbo.remoting.Channel;
 
-import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -30,16 +29,17 @@
  */
 public abstract class AbstractTimerTask implements TimerTask {
 
-    private final ChannelProvider channelProvider;
+
+    private final Channel channel;
 
     private final Long tick;
 
-    AbstractTimerTask(ChannelProvider channelProvider, Long tick) {
-        if (channelProvider == null || tick == null) {
+    AbstractTimerTask(Channel channel, Long tick) {
+        if (channel == null || tick == null) {
             throw new IllegalArgumentException();
         }
         this.tick = tick;
-        this.channelProvider = channelProvider;
+        this.channel = channel;
     }
 
     static Long lastRead(Channel channel) {
@@ -54,7 +54,7 @@ static Long now() {
         return System.currentTimeMillis();
     }
 
-    private void reput(Timeout timeout, Long tick) {
+    protected void reput(Timeout timeout, Long tick) {
         if (timeout == null || tick == null) {
             throw new IllegalArgumentException();
         }
@@ -69,19 +69,13 @@ private void reput(Timeout timeout, Long tick) {
 
     @Override
     public void run(Timeout timeout) throws Exception {
-        Collection<Channel> c = channelProvider.getChannels();
-        for (Channel channel : c) {
-            if (channel.isClosed()) {
-                continue;
-            }
-            doTask(channel);
-        }
-        reput(timeout, tick);
+            doTask(channel, timeout);
     }
 
-    protected abstract void doTask(Channel channel);
+    protected abstract void doTask(Channel channel, Timeout timeout);
 
-    interface ChannelProvider {
-        Collection<Channel> getChannels();
+    protected Long getTick() {
+        return tick;
     }
+
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
index 3abbe5b542..9b0bc30c75 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java
@@ -16,10 +16,7 @@
  */
 package org.apache.dubbo.remoting.exchange.support.header;
 
-import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.timer.HashedWheelTimer;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.Client;
 import org.apache.dubbo.remoting.RemotingException;
@@ -29,8 +26,6 @@
 import org.apache.dubbo.remoting.exchange.ResponseFuture;
 
 import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
 
 /**
  * DefaultMessageClient
@@ -39,33 +34,13 @@
 
     private final Client client;
     private final ExchangeChannel channel;
-    // heartbeat(ms), default value is 0 , won't execute a heartbeat.
-    private int heartbeat;
-    private int heartbeatTimeout;
 
-    private HashedWheelTimer heartbeatTimer;
-
-    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
+    public HeaderExchangeClient(Client client) {
         if (client == null) {
             throw new IllegalArgumentException("client == null");
         }
         this.client = client;
         this.channel = new HeaderExchangeChannel(client);
-        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
-
-        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
-                dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
-        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
-        if (heartbeatTimeout < heartbeat * 2) {
-            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
-        }
-
-        if (needHeartbeat) {
-            long tickDuration = calculateLeastDuration(heartbeat);
-            heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
-                    TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
-            startHeartbeatTimer();
-        }
     }
 
     @Override
@@ -178,39 +153,8 @@ public boolean hasAttribute(String key) {
         return channel.hasAttribute(key);
     }
 
-    private void startHeartbeatTimer() {
-        AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);
-
-        long heartbeatTick = calculateLeastDuration(heartbeat);
-        long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
-        HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
-        ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
-
-        // init task and start timer.
-        heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
-        heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
-    }
-
-    private void stopHeartbeatTimer() {
-        if (heartbeatTimer != null) {
-            heartbeatTimer.stop();
-            heartbeatTimer = null;
-        }
-    }
-
     private void doClose() {
-        stopHeartbeatTimer();
-    }
-
-    /**
-     * Each interval cannot be less than 1000ms.
-     */
-    private long calculateLeastDuration(int time) {
-        if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) {
-            return Constants.LEAST_HEARTBEAT_DURATION;
-        } else {
-            return time / Constants.HEARTBEAT_CHECK_TICK;
-        }
+        // Do nothing.
     }
 
     @Override
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
index 7f3067d60c..84261f74e4 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeServer.java
@@ -21,8 +21,6 @@
 import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.timer.HashedWheelTimer;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.RemotingException;
@@ -34,11 +32,8 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static java.util.Collections.unmodifiableCollection;
-
 /**
  * ExchangeServerImpl
  */
@@ -47,25 +42,15 @@
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
     private final Server server;
-    // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
-    private int heartbeat;
-    private int heartbeatTimeout;
-    private AtomicBoolean closed = new AtomicBoolean(false);
 
-    private HashedWheelTimer heartbeatTimer;
+    private AtomicBoolean closed = new AtomicBoolean(false);
 
     public HeaderExchangeServer(Server server) {
         if (server == null) {
             throw new IllegalArgumentException("server == null");
         }
         this.server = server;
-        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
-        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
-        if (heartbeatTimeout < heartbeat * 2) {
-            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
-        }
 
-        startHeartbeatTimer();
     }
 
     public Server getServer() {
@@ -148,7 +133,6 @@ private void doClose() {
         if (!closed.compareAndSet(false, true)) {
             return;
         }
-        stopHeartbeatTimer();
     }
 
     @Override
@@ -203,25 +187,6 @@ public ChannelHandler getChannelHandler() {
     @Override
     public void reset(URL url) {
         server.reset(url);
-        try {
-            if (url.hasParameter(Constants.HEARTBEAT_KEY)
-                    || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
-                int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
-                int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
-                if (t < h * 2) {
-                    throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
-                }
-                if (h != heartbeat || t != heartbeatTimeout) {
-                    heartbeat = h;
-                    heartbeatTimeout = t;
-
-                    stopHeartbeatTimer();
-                    startHeartbeatTimer();
-                }
-            }
-        } catch (Throwable t) {
-            logger.error(t.getMessage(), t);
-        }
     }
 
     @Override
@@ -247,40 +212,4 @@ public void send(Object message, boolean sent) throws RemotingException {
         }
         server.send(message, sent);
     }
-
-    /**
-     * Each interval cannot be less than 1000ms.
-     */
-    private long calculateLeastDuration(int time) {
-        if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) {
-            return Constants.LEAST_HEARTBEAT_DURATION;
-        } else {
-            return time / Constants.HEARTBEAT_CHECK_TICK;
-        }
-    }
-
-    private void startHeartbeatTimer() {
-        long tickDuration = calculateLeastDuration(heartbeat);
-        heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-server-heartbeat", true), tickDuration,
-                TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
-
-        AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());
-
-        long heartbeatTick = calculateLeastDuration(heartbeat);
-        long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
-        HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
-        ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);
-
-        // init task and start timer.
-        heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
-        heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
-    }
-
-    private void stopHeartbeatTimer() {
-        if (heartbeatTimer != null) {
-            heartbeatTimer.stop();
-            heartbeatTimer = null;
-        }
-    }
-
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
index e2e8f1b1c9..e8f7882d67 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
@@ -36,7 +36,7 @@
 
     @Override
     public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
-        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
+        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
     }
 
     @Override
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
index c222885ad4..3257f7233b 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatHandler.java
@@ -20,6 +20,8 @@
 import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.RemotingException;
@@ -27,6 +29,8 @@
 import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate;
 
+import java.util.concurrent.TimeUnit;
+
 public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
 
     private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
@@ -35,6 +39,12 @@
 
     public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
 
+    // The whole Dubbo service uses a single HashedWheelTimer for all heartbeat tasks and reconnect tasks.
+    private static HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-heartbeat", true),
+            1000,
+            TimeUnit.MILLISECONDS,
+            Constants.TICKS_PER_WHEEL);
+
     public HeartbeatHandler(ChannelHandler handler) {
         super(handler);
     }
@@ -44,6 +54,20 @@ public void connected(Channel channel) throws RemotingException {
         setReadTimestamp(channel);
         setWriteTimestamp(channel);
         handler.connected(channel);
+        // When a channel has connected, add its heartbeat task and reconnect task to hashedWheelTimer.
+        long heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, Constants.LEAST_HEARTBEAT_DURATION);
+
+        long heartbeatTimeout = channel.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
+
+        if (heartbeatTimeout < heartbeat * 2) {
+            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
+        }
+
+        HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(channel, heartbeat);
+        ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(channel, heartbeatTimeout);
+        hashedWheelTimer.newTimeout(heartBeatTimerTask, heartbeat, TimeUnit.MILLISECONDS);
+        hashedWheelTimer.newTimeout(reconnectTimerTask, heartbeatTimeout, TimeUnit.MILLISECONDS);
+
     }
 
     @Override
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java
index cbe01f8506..c467abff73 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeartbeatTimerTask.java
@@ -20,6 +20,7 @@
 import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.Timeout;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.exchange.Request;
 
@@ -30,20 +31,25 @@
 
     private static final Logger logger = LoggerFactory.getLogger(HeartbeatTimerTask.class);
 
-    private final int heartbeat;
-
-    HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) {
-        super(channelProvider, heartbeatTick);
-        this.heartbeat = heartbeat;
+    HeartbeatTimerTask(Channel channel, Long heartbeatTick) {
+        super(channel, heartbeatTick);
     }
 
     @Override
-    protected void doTask(Channel channel) {
+    protected void doTask(Channel channel, Timeout timeout) {
+        long heartbeatDuration = 0;
         try {
             Long lastRead = lastRead(channel);
             Long lastWrite = lastWrite(channel);
-            if ((lastRead != null && now() - lastRead > heartbeat)
-                    || (lastWrite != null && now() - lastWrite > heartbeat)) {
+            long now = now();
+            if (null != lastRead) {
+                heartbeatDuration = now - lastRead;
+            }
+            if (null != lastWrite) {
+                heartbeatDuration = Math.max(heartbeatDuration, now - lastWrite);
+            }
+            if (heartbeatDuration > getTick()) {
+                // Read or write idle time exceeds the heartbeat period, so send a heartbeat packet.
                 Request req = new Request();
                 req.setVersion(Version.getProtocolVersion());
                 req.setTwoWay(true);
@@ -52,11 +58,15 @@ protected void doTask(Channel channel) {
                 if (logger.isDebugEnabled()) {
                     logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
                             + ", cause: The channel has no data-transmission exceeds a heartbeat period: "
-                            + heartbeat + "ms");
+                            + getTick() + "ms");
                 }
             }
         } catch (Throwable t) {
             logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
         }
+        if (!channel.isClosed()) {
+            // Schedule the next heartbeat task with a recalculated tick duration.
+            reput(timeout, Math.min(getTick() - heartbeatDuration > 0 ? getTick() - heartbeatDuration : getTick(), getTick()));
+        }
     }
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
index 2b7dca552c..891837c191 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/ReconnectTimerTask.java
@@ -17,8 +17,10 @@
 
 package org.apache.dubbo.remoting.exchange.support.header;
 
+import org.apache.dubbo.common.Constants;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.Timeout;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.Client;
 
@@ -29,22 +31,19 @@
 
     private static final Logger logger = LoggerFactory.getLogger(ReconnectTimerTask.class);
 
-    private final int heartbeatTimeout;
-
-    ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) {
-        super(channelProvider, heartbeatTimeoutTick);
-        this.heartbeatTimeout = heartbeatTimeout1;
+    ReconnectTimerTask(Channel channel, long reconnectedTick) {
+        super(channel, reconnectedTick);
     }
 
     @Override
-    protected void doTask(Channel channel) {
+    protected void doTask(Channel channel, Timeout timeout) {
         Long lastRead = lastRead(channel);
-        Long now = now();
-        if (lastRead != null && now - lastRead > heartbeatTimeout) {
-            if (channel instanceof Client) {
+        long heartBeatDuration = lastRead == null ? 0 : now() - lastRead;
+        if (heartBeatDuration > getTick()) {
+            if (Constants.CONSUMER_SIDE.equals(channel.getUrl().getParameter(Constants.SIDE_KEY))) {
                 try {
                     logger.warn("Reconnect to remote channel " + channel.getRemoteAddress() + ", because heartbeat read idle time out: "
-                            + heartbeatTimeout + "ms");
+                            + getTick() + "ms");
                     ((Client) channel).reconnect();
                 } catch (Throwable t) {
                     // do nothing
@@ -52,12 +51,19 @@ protected void doTask(Channel channel) {
             } else {
                 try {
                     logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
-                            + heartbeatTimeout + "ms");
+                            + getTick() + "ms");
                     channel.close();
+                    // For provider side, if the channel is closed, just return.
+                    return;
                 } catch (Throwable t) {
                     logger.warn("Exception when close channel " + channel, t);
                 }
             }
         }
+        // Schedule the next reconnect task with a recalculated tick duration.
+        if (!channel.isClosed()) {
+            reput(timeout, Math.min(getTick() - heartBeatDuration > 0 ? getTick() - heartBeatDuration : getTick(), getTick()));
+        }
+
     }
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java
index 782b8480cd..f1bcf85cb9 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeartBeatTaskTest.java
@@ -52,8 +52,7 @@ public URL getUrl() {
             }
         };
 
-        AbstractTimerTask.ChannelProvider cp = () -> Collections.<Channel>singletonList(channel);
-        heartbeatTimerTask = new HeartbeatTimerTask(cp, tickDuration / Constants.HEARTBEAT_CHECK_TICK, (int) tickDuration);
+        heartbeatTimerTask = new HeartbeatTimerTask(channel, tickDuration);
     }
 
     @Test
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
index 4c78c1cceb..281beecbcd 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java
@@ -47,7 +47,7 @@
         super(serviceType, url, new String[]{Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY});
         this.channel = channel;
         this.serviceKey = serviceKey;
-        this.currentClient = new HeaderExchangeClient(new ChannelWrapper(this.channel), false);
+        this.currentClient = new HeaderExchangeClient(new ChannelWrapper(this.channel));
     }
 
     @Override


With regards,
Apache Git Services