You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/12/14 18:36:06 UTC

[ignite-3] branch main updated: IGNITE-18071 Add client-side heartbeat timeout (#1448)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c7ea5c8bd5 IGNITE-18071 Add client-side heartbeat timeout (#1448)
c7ea5c8bd5 is described below

commit c7ea5c8bd5811316e5facb54cd8d72f501681137
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Wed Dec 14 21:36:00 2022 +0300

    IGNITE-18071 Add client-side heartbeat timeout (#1448)
    
    * Add `IgniteClientConfiguration.heartbeatTimeout`.
    * Close connection when there is no response from server to heartbeat message within the specified timeout.
    * Minor fixes - add nullable annotations, improve logger handling.
    * Add test for `connectTimeout`.
    * Fix Netty buffer leak in case of connect timeout.
---
 .idea/inspectionProfiles/Project_Default.xml       |  2 +-
 .../org/apache/ignite/client/IgniteClient.java     | 27 ++++++++++++--
 .../ignite/client/IgniteClientConfiguration.java   | 18 +++++++--
 .../apache/ignite/internal/client/ClientUtils.java | 20 ++++++++++
 .../client/IgniteClientConfigurationImpl.java      | 16 +++++++-
 .../ignite/internal/client/ReliableChannel.java    |  4 +-
 .../ignite/internal/client/TcpClientChannel.java   | 43 +++++++++++++++++++---
 .../ignite/internal/client/TcpIgniteClient.java    | 10 +----
 .../apache/ignite/client/AbstractClientTest.java   |  2 +-
 .../apache/ignite/client/ClientComputeTest.java    |  6 +--
 .../org/apache/ignite/client/ConnectionTest.java   | 23 +++++++++++-
 .../org/apache/ignite/client/HeartbeatTest.java    | 28 ++++++++++++++
 .../org/apache/ignite/client/MultiClusterTest.java |  6 +--
 .../org/apache/ignite/client/RetryPolicyTest.java  |  4 +-
 .../ignite/client/TestClientHandlerModule.java     | 35 ++++++++++++++++++
 .../java/org/apache/ignite/client/TestServer.java  | 15 ++++++--
 16 files changed, 220 insertions(+), 39 deletions(-)

diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index a77fb48328..2b5ac76e01 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -1185,4 +1185,4 @@
       <option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
     </inspection_tool>
   </profile>
-</component>
\ No newline at end of file
+</component>
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 9f5b5c123d..5580f2f331 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client;
 
 import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT;
 import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL;
+import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_TIMEOUT;
 import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_PERIOD;
 import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_RETRIES;
 import static org.apache.ignite.internal.client.ClientUtils.sync;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
 import org.apache.ignite.internal.client.TcpIgniteClient;
 import org.apache.ignite.lang.LoggerFactory;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Ignite client entry point.
@@ -86,10 +88,14 @@ public interface IgniteClient extends Ignite {
         /** Heartbeat interval. */
         private long heartbeatInterval = DFLT_HEARTBEAT_INTERVAL;
 
+        /** Heartbeat timeout. */
+        private long heartbeatTimeout = DFLT_HEARTBEAT_TIMEOUT;
+
         /** Retry policy. */
-        private RetryPolicy retryPolicy = new RetryReadPolicy();
+        private @Nullable RetryPolicy retryPolicy = new RetryReadPolicy();
 
-        private LoggerFactory loggerFactory;
+        /** Logger factory. */
+        private @Nullable LoggerFactory loggerFactory;
 
         /**
          * Sets the addresses of Ignite server nodes within a cluster. An address can be an IP address or a hostname, with or without port.
@@ -116,7 +122,7 @@ public interface IgniteClient extends Ignite {
          * @param retryPolicy Retry policy.
          * @return This instance.
          */
-        public Builder retryPolicy(RetryPolicy retryPolicy) {
+        public Builder retryPolicy(@Nullable RetryPolicy retryPolicy) {
             this.retryPolicy = retryPolicy;
 
             return this;
@@ -240,6 +246,20 @@ public interface IgniteClient extends Ignite {
             return this;
         }
 
+        /**
+         * Sets the heartbeat message timeout, in milliseconds. Default is <code>5000</code>.
+         *
+         * <p>When a server does not respond to a heartbeat within the specified timeout, client will close the connection.
+         *
+         * @param heartbeatTimeout Heartbeat timeout.
+         * @return This instance.
+         */
+        public Builder heartbeatTimeout(long heartbeatTimeout) {
+            this.heartbeatTimeout = heartbeatTimeout;
+
+            return this;
+        }
+
         /**
          * Builds the client.
          *
@@ -263,6 +283,7 @@ public interface IgniteClient extends Ignite {
                     reconnectThrottlingRetries,
                     asyncContinuationExecutor,
                     heartbeatInterval,
+                    heartbeatTimeout,
                     retryPolicy,
                     loggerFactory
             );
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index 5fefcbfcd7..d957492bb1 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -36,12 +36,12 @@ public interface IgniteClientConfiguration {
     /** Default socket connect timeout, in milliseconds. */
     int DFLT_CONNECT_TIMEOUT = 5000;
 
+    /** Default heartbeat timeout, in milliseconds. */
+    int DFLT_HEARTBEAT_TIMEOUT = 5000;
+
     /** Default heartbeat interval, in milliseconds. */
     int DFLT_HEARTBEAT_INTERVAL = 30_000;
 
-    /** Default operation retry limit. */
-    int DFLT_RETRY_LIMIT = 5;
-
     /** Default reconnect throttling period. */
     long DFLT_RECONNECT_THROTTLING_PERIOD = 30_000L;
 
@@ -121,6 +121,18 @@ public interface IgniteClientConfiguration {
      */
     public long heartbeatInterval();
 
+    /**
+     * Gets the heartbeat message timeout, in milliseconds. Default is <code>5000</code>.
+     *
+     * <p>When a server does not respond to a heartbeat within the specified timeout, client will close the connection.
+     *
+     * <p>When thin client connection is idle (no operations are performed), heartbeat messages are sent periodically
+     * to keep the connection alive and detect potential half-open state.
+     *
+     * @return Heartbeat timeout, in milliseconds.
+     */
+    public long heartbeatTimeout();
+
     /**
      * Returns the logger factory. This factory will be used to create a logger instance when needed.
      *
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index fea4e78e20..3e0ba81d86 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -20,8 +20,12 @@ package org.apache.ignite.internal.client;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import org.apache.ignite.client.ClientOperationType;
+import org.apache.ignite.client.IgniteClientConfiguration;
 import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.LoggerFactory;
 
 /**
  * Client utilities.
@@ -178,4 +182,20 @@ public class ClientUtils {
                 throw new UnsupportedOperationException("Invalid op code: " + opCode);
         }
     }
+
+    /**
+     * Gets a logger for the given class.
+     *
+     * @param cls Class.
+     * @return Logger.
+     */
+    public static <T> IgniteLogger logger(IgniteClientConfiguration cfg, Class<T> cls) {
+        var loggerFactory = cfg.loggerFactory() == null
+                ? (LoggerFactory) System::getLogger
+                : cfg.loggerFactory();
+
+        return loggerFactory == null
+                ? Loggers.voidLogger()
+                : Loggers.forClass(cls, loggerFactory);
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index 842c53c8e2..2fa413b2c2 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -49,6 +49,9 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
     /** Heartbeat interval. */
     private final long heartbeatInterval;
 
+    /** Heartbeat timeout. */
+    private final long heartbeatTimeout;
+
     /** Retry policy. */
     private final RetryPolicy retryPolicy;
 
@@ -64,6 +67,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
      * @param reconnectThrottlingRetries Reconnect throttling retries.
      * @param asyncContinuationExecutor Async continuation executor.
      * @param heartbeatInterval Heartbeat message interval.
+     * @param heartbeatTimeout Heartbeat message timeout.
      * @param retryPolicy Retry policy.
      * @param loggerFactory Logger factory which will be used to create a logger instance for this this particular client when needed.
      */
@@ -75,8 +79,9 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
             int reconnectThrottlingRetries,
             Executor asyncContinuationExecutor,
             long heartbeatInterval,
-            RetryPolicy retryPolicy,
-            LoggerFactory loggerFactory
+            long heartbeatTimeout,
+            @Nullable RetryPolicy retryPolicy,
+            @Nullable LoggerFactory loggerFactory
     ) {
         this.addressFinder = addressFinder;
 
@@ -88,6 +93,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
         this.reconnectThrottlingRetries = reconnectThrottlingRetries;
         this.asyncContinuationExecutor = asyncContinuationExecutor;
         this.heartbeatInterval = heartbeatInterval;
+        this.heartbeatTimeout = heartbeatTimeout;
         this.retryPolicy = retryPolicy;
         this.loggerFactory = loggerFactory;
     }
@@ -134,6 +140,12 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
         return heartbeatInterval;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public long heartbeatTimeout() {
+        return heartbeatTimeout;
+    }
+
     /** {@inheritDoc} */
     @Override
     public @Nullable LoggerFactory loggerFactory() {
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index d739ba0598..04d80f52a0 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -114,10 +114,10 @@ public final class ReliableChannel implements AutoCloseable {
      * @param clientCfg Client config.
      */
     ReliableChannel(BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
-            IgniteClientConfiguration clientCfg, IgniteLogger log) {
+            IgniteClientConfiguration clientCfg) {
         this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
         this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
-        this.log = Objects.requireNonNull(log, "log");
+        this.log = ClientUtils.logger(clientCfg, ReliableChannel.class);
 
         connMgr = new NettyClientConnectionMultiplexer();
         connMgr.start(clientCfg);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 4ca527d49b..6f7bde9e2c 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ResponseFlags;
 import org.apache.ignite.internal.client.proto.ServerMessageType;
+import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
@@ -97,9 +99,15 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /** Connect timeout in milliseconds. */
     private final long connectTimeout;
 
+    /** Heartbeat timeout in milliseconds. */
+    private final long heartbeatTimeout;
+
     /** Heartbeat timer. */
     private final Timer heartbeatTimer;
 
+    /** Logger. */
+    private final IgniteLogger log;
+
     /** Last send operation timestamp. */
     private volatile long lastSendMillis;
 
@@ -112,11 +120,14 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     TcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr) {
         validateConfiguration(cfg);
 
+        log = ClientUtils.logger(cfg.clientConfiguration(), TcpClientChannel.class);
+
         asyncContinuationExecutor = cfg.clientConfiguration().asyncContinuationExecutor() == null
                 ? ForkJoinPool.commonPool()
                 : cfg.clientConfiguration().asyncContinuationExecutor();
 
         connectTimeout = cfg.clientConfiguration().connectTimeout();
+        heartbeatTimeout = cfg.clientConfiguration().heartbeatTimeout();
 
         sock = connMgr.open(cfg.getAddress(), this, this);
 
@@ -136,7 +147,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /**
      * Close the channel with cause.
      */
-    private void close(Exception cause) {
+    private void close(@Nullable Exception cause) {
         if (closed.compareAndSet(false, true)) {
             // Disconnect can happen before we initialize the timer.
             var timer = heartbeatTimer;
@@ -312,7 +323,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
      * @param unpacker Unpacker.
      * @return Exception.
      */
-    private IgniteException readError(ClientMessageUnpacker unpacker) {
+    private static IgniteException readError(ClientMessageUnpacker unpacker) {
         var traceId = unpacker.unpackUuid();
         var code = unpacker.unpackInt();
         var errClassName = unpacker.unpackString();
@@ -379,8 +390,14 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         try {
             handshakeReq(ver);
 
-            var res = connectTimeout > 0 ? fut.get(connectTimeout, TimeUnit.MILLISECONDS) : fut.get();
-            handshakeRes(res, ver);
+            // handshakeRes must be called even in case of timeout to release the buffer.
+            var resFut = fut.thenAccept(res -> handshakeRes(res, ver));
+
+            if (connectTimeout > 0) {
+                resFut.get(connectTimeout, TimeUnit.MILLISECONDS);
+            } else {
+                resFut.get();
+            }
         } catch (Throwable e) {
             throw IgniteException.wrap(e);
         }
@@ -498,7 +515,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         private final long interval;
 
         /** Constructor. */
-        public HeartbeatTask(long interval) {
+        HeartbeatTask(long interval) {
             this.interval = interval;
         }
 
@@ -506,7 +523,21 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         @Override public void run() {
             try {
                 if (System.currentTimeMillis() - lastSendMillis > interval) {
-                    serviceAsync(ClientOp.HEARTBEAT, null, null);
+                    var fut = serviceAsync(ClientOp.HEARTBEAT, null, null);
+
+                    if (heartbeatTimeout > 0) {
+                        fut
+                                .orTimeout(heartbeatTimeout, TimeUnit.MILLISECONDS)
+                                .exceptionally(e -> {
+                                    if (e instanceof TimeoutException) {
+                                        log.warn("Heartbeat timeout, closing the channel");
+
+                                        close((TimeoutException) e);
+                                    }
+
+                                    return null;
+                                });
+                    }
                 }
             } catch (Throwable ignored) {
                 // Ignore failed heartbeats.
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 2ccf7506af..efc3227dde 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -34,8 +34,6 @@ import org.apache.ignite.internal.client.sql.ClientSql;
 import org.apache.ignite.internal.client.table.ClientTables;
 import org.apache.ignite.internal.client.tx.ClientTransactions;
 import org.apache.ignite.internal.jdbc.proto.ClientMessage;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.lang.LoggerFactory;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.sql.IgniteSql;
@@ -88,13 +86,7 @@ public class TcpIgniteClient implements IgniteClient {
 
         this.cfg = cfg;
 
-        var loggerFactory = cfg.loggerFactory() == null
-                ? (LoggerFactory) System::getLogger
-                : cfg.loggerFactory();
-
-        var log = Loggers.forClass(TcpIgniteClient.class, loggerFactory);
-
-        ch = new ReliableChannel(chFactory, cfg, log);
+        ch = new ReliableChannel(chFactory, cfg);
         tables = new ClientTables(ch);
         transactions = new ClientTransactions(ch);
         compute = new ClientCompute(ch, tables);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 422e56993b..efb03a47f8 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -136,7 +136,7 @@ public abstract class AbstractClientTest {
             Ignite ignite,
             String nodeName
     ) {
-        return new TestServer(port, portRange, idleTimeout, ignite, null, nodeName, clusterId);
+        return new TestServer(port, portRange, idleTimeout, ignite, null, null, nodeName, clusterId);
     }
 
     /**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 22c23493fa..db8db0311d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -176,9 +176,9 @@ public class ClientComputeTest {
 
         var clusterId = UUID.randomUUID();
 
-        server1 = new TestServer(10900, 10, 0, ignite, shouldDropConnection, "s1", clusterId);
-        server2 = new TestServer(10910, 10, 0, ignite, shouldDropConnection, "s2", clusterId);
-        server3 = new TestServer(10920, 10, 0, ignite, shouldDropConnection, "s3", clusterId);
+        server1 = new TestServer(10900, 10, 0, ignite, shouldDropConnection, null, "s1", clusterId);
+        server2 = new TestServer(10910, 10, 0, ignite, shouldDropConnection, null, "s2", clusterId);
+        server3 = new TestServer(10920, 10, 0, ignite, shouldDropConnection, null, "s3", clusterId);
     }
 
     private Set<ClusterNode> getClusterNodes(String... names) {
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index a6ee3677cf..7c6f03aba8 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -17,11 +17,17 @@
 
 package org.apache.ignite.client;
 
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.endsWith;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import org.apache.ignite.client.IgniteClient.Builder;
+import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.lang.IgniteException;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -70,7 +76,22 @@ public class ConnectionTest extends AbstractClientTest {
         testConnection("[::1]:" + serverPort);
     }
 
-    private void testConnection(String... addrs) throws Exception {
+    @SuppressWarnings("ThrowableNotThrown")
+    @Test
+    public void testNoResponseFromServerWithinConnectTimeoutThrowsException() throws Exception {
+        Function<Integer, Integer> responseDelay = x -> 500;
+
+        try (var srv = new TestServer(10800, 10, 300, new FakeIgnite(), x -> false, responseDelay, null, UUID.randomUUID())) {
+            Builder builder = IgniteClient.builder()
+                    .addresses("127.0.0.1:" + srv.port())
+                    .retryPolicy(new RetryLimitPolicy().retryLimit(1))
+                    .connectTimeout(50);
+
+            assertThrowsWithCause(builder::build, TimeoutException.class);
+        }
+    }
+
+    private static void testConnection(String... addrs) throws Exception {
         AbstractClientTest.startClient(addrs).close();
     }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index c0cfe67c16..8a797702a9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -20,8 +20,11 @@ package org.apache.ignite.client;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import java.util.UUID;
+import java.util.function.Function;
 import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -75,4 +78,29 @@ public class HeartbeatTest {
             assertEquals("Negative delay.", ex.getMessage());
         }
     }
+
+    @Test
+    public void testHeartbeatTimeoutClosesConnection() throws Exception {
+        Function<Integer, Integer> responseDelayFunc = requestCount -> requestCount > 1 ? 500 : 0;
+        var loggerFactory = new TestLoggerFactory("client");
+
+        try (var srv = new TestServer(10800, 10, 300, new FakeIgnite(), x -> false, responseDelayFunc, null, UUID.randomUUID())) {
+            int srvPort = srv.port();
+
+            Builder builder = IgniteClient.builder()
+                    .addresses("127.0.0.1:" + srvPort)
+                    .retryPolicy(new RetryLimitPolicy().retryLimit(1))
+                    .heartbeatTimeout(30)
+                    .reconnectThrottlingPeriod(5000)
+                    .reconnectThrottlingRetries(0)
+                    .heartbeatInterval(50)
+                    .loggerFactory(loggerFactory);
+
+            try (var ignored = builder.build()) {
+                assertEquals(true, IgniteTestUtils.waitForCondition(
+                        () -> loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Heartbeat timeout, closing the channel")),
+                        3000), "Heartbeat timeout was not logged within 3000 ms");
+            }
+        }
+    }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
index 9af2164593..7955e58739 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
@@ -49,8 +49,8 @@ public class MultiClusterTest {
 
     @BeforeEach
     void setUp() {
-        server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s1", clusterId1);
-        server2 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s2", clusterId2);
+        server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s1", clusterId1);
+        server2 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s2", clusterId2);
     }
 
     @AfterEach
@@ -91,7 +91,7 @@ public class MultiClusterTest {
             client.tables().tables();
 
             server1.close();
-            server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s1", clusterId2);
+            server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s1", clusterId2);
 
             IgniteClientConnectionException ex = (IgniteClientConnectionException) assertThrowsWithCause(
                     () -> client.tables().tables(), IgniteClientConnectionException.class, "Cluster ID mismatch");
diff --git a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 35b7ee1297..cbcde03218 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -216,7 +216,7 @@ public class RetryPolicyTest {
     @Test
     public void testRetryReadPolicyAllOperationsSupported() throws IllegalAccessException {
         var plc = new RetryReadPolicy();
-        var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, null, 0, null, null);
+        var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, null, 0, 0, null, null);
 
         for (var op : ClientOperationType.values()) {
             var ctx = new RetryPolicyContextImpl(cfg, op, 0, null);
@@ -265,6 +265,6 @@ public class RetryPolicyTest {
         FakeIgnite ign = new FakeIgnite();
         ((FakeIgniteTables) ign.tables()).createTable("t");
 
-        server = new TestServer(10900, 10, 0, ign, shouldDropConnection, null, UUID.randomUUID());
+        server = new TestServer(10900, 10, 0, ign, shouldDropConnection, null, null, UUID.randomUUID());
     }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index bc23b933bc..6995ecd021 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -60,6 +60,9 @@ public class TestClientHandlerModule implements IgniteComponent {
     /** Connection drop condition. */
     private final Function<Integer, Boolean> shouldDropConnection;
 
+    /** Server response delay function. */
+    private final Function<Integer, Integer> responseDelay;
+
     /** Cluster service. */
     private final ClusterService clusterService;
 
@@ -82,6 +85,7 @@ public class TestClientHandlerModule implements IgniteComponent {
      * @param registry Configuration registry.
      * @param bootstrapFactory Bootstrap factory.
      * @param shouldDropConnection Connection drop condition.
+     * @param responseDelay Response delay, in milliseconds.
      * @param clusterService Cluster service.
      * @param compute Compute.
      * @param clusterId Cluster id.
@@ -91,6 +95,7 @@ public class TestClientHandlerModule implements IgniteComponent {
             ConfigurationRegistry registry,
             NettyBootstrapFactory bootstrapFactory,
             Function<Integer, Boolean> shouldDropConnection,
+            @Nullable Function<Integer, Integer> responseDelay,
             ClusterService clusterService,
             IgniteCompute compute,
             UUID clusterId) {
@@ -102,6 +107,7 @@ public class TestClientHandlerModule implements IgniteComponent {
         this.registry = registry;
         this.bootstrapFactory = bootstrapFactory;
         this.shouldDropConnection = shouldDropConnection;
+        this.responseDelay = responseDelay;
         this.clusterService = clusterService;
         this.compute = compute;
         this.clusterId = clusterId;
@@ -166,6 +172,7 @@ public class TestClientHandlerModule implements IgniteComponent {
                         ch.pipeline().addLast(
                                 new ClientMessageDecoder(),
                                 new ConnectionDropHandler(requestCounter, shouldDropConnection),
+                                new ResponseDelayHandler(responseDelay),
                                 new ClientInboundMessageHandler(
                                         (IgniteTablesInternal) ignite.tables(),
                                         ignite.transactions(),
@@ -230,4 +237,32 @@ public class TestClientHandlerModule implements IgniteComponent {
             }
         }
     }
+
+    private static class ResponseDelayHandler extends ChannelInboundHandlerAdapter {
+        /** Delay. */
+        private final Function<Integer, Integer> delay;
+
+        /** Counter. */
+        private final AtomicInteger cnt = new AtomicInteger();
+
+        /**
+         * Constructor.
+         *
+         * @param delay Delay.
+         */
+        private ResponseDelayHandler(@Nullable Function<Integer, Integer> delay) {
+            this.delay = delay;
+        }
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+            var delayMs = delay == null ? 0 : delay.apply(cnt.incrementAndGet());
+
+            if (delayMs > 0) {
+                Thread.sleep(delayMs);
+            }
+
+            super.channelRead(ctx, msg);
+        }
+    }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index df1e9d8045..432c777224 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -74,7 +74,7 @@ public class TestServer implements AutoCloseable {
             long idleTimeout,
             Ignite ignite
     ) {
-        this(port, portRange, idleTimeout, ignite, null, null, UUID.randomUUID());
+        this(port, portRange, idleTimeout, ignite, null, null, null, UUID.randomUUID());
     }
 
     /**
@@ -91,7 +91,8 @@ public class TestServer implements AutoCloseable {
             long idleTimeout,
             Ignite ignite,
             @Nullable Function<Integer, Boolean> shouldDropConnection,
-            String nodeName,
+            @Nullable Function<Integer, Integer> responseDelay,
+            @Nullable String nodeName,
             UUID clusterId
     ) {
         cfg = new ConfigurationRegistry(
@@ -131,7 +132,15 @@ public class TestServer implements AutoCloseable {
                 compute.executeColocated(anyString(), any(), anyString(), any())).thenReturn(CompletableFuture.completedFuture(nodeName));
 
         module = shouldDropConnection != null
-                ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory, shouldDropConnection, clusterService, compute, clusterId)
+                ? new TestClientHandlerModule(
+                        ignite,
+                        cfg,
+                        bootstrapFactory,
+                        shouldDropConnection,
+                        responseDelay,
+                        clusterService,
+                        compute,
+                        clusterId)
                 : new ClientHandlerModule(
                         ((FakeIgnite) ignite).queryEngine(),
                         (IgniteTablesInternal) ignite.tables(),