You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/03/21 14:06:12 UTC

[ignite-3] branch main updated: IGNITE-16532 Java thin: add heartbeats (#737)

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a9b9f0d  IGNITE-16532 Java thin: add heartbeats (#737)
a9b9f0d is described below

commit a9b9f0d22d0cc61e8f180134b0d45a6b51bd9378
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Mar 21 17:06:08 2022 +0300

    IGNITE-16532 Java thin: add heartbeats (#737)
    
    Implement heartbeats in Java thin client: https://cwiki.apache.org/confluence/display/IGNITE/IEP-83+Thin+Client+Keepalive
    
    * Add `IgniteClientConfiguration.heartbeatInterval`, default 30 seconds.
    * Add `ClientConnectorConfiguration.idleTimeout`, default 0 (disabled).
    * Add `idleTimeout` to handshake.
    * Set effective heartbeat interval to `Math.min(heartbeatInterval, idleTimeout / 3)`.
---
 .../ClientConnectorConfigurationSchema.java        |  5 ++
 .../ignite/internal/client/proto/ClientOp.java     |  4 +-
 .../ignite/client/handler/ItClientHandlerTest.java |  3 +-
 .../ignite/client/handler/ClientHandlerModule.java | 27 ++++++-
 .../handler/ClientInboundMessageHandler.java       | 16 +++-
 .../org/apache/ignite/client/IgniteClient.java     | 26 ++++++-
 .../ignite/client/IgniteClientConfiguration.java   | 18 ++++-
 .../client/IgniteClientConfigurationImpl.java      | 14 +++-
 .../ignite/internal/client/ProtocolContext.java    | 18 ++++-
 .../ignite/internal/client/TcpClientChannel.java   | 91 +++++++++++++++++++++-
 .../apache/ignite/client/AbstractClientTest.java   |  6 +-
 .../org/apache/ignite/client/HeartbeatTest.java    | 66 ++++++++++++++++
 .../org/apache/ignite/client/ReconnectTest.java    |  3 +
 .../java/org/apache/ignite/client/TestServer.java  |  4 +-
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       |  4 +-
 .../RawSocketConnectionTests.cs                    |  3 +-
 .../dotnet/Apache.Ignite/Internal/ClientSocket.cs  |  1 +
 17 files changed, 289 insertions(+), 20 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
index 4d26120..23fdd1d 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
@@ -44,4 +44,9 @@ public class ClientConnectorConfigurationSchema {
     @Min(0)
     @Value(hasDefault = true)
     public final int connectTimeout = 5000;
+
+    /** Idle timeout. */
+    @Min(0)
+    @Value(hasDefault = true)
+    public final long idleTimeout = 0;
 }
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 0606e14..8916f00 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -21,8 +21,8 @@ package org.apache.ignite.internal.client.proto;
  * Client operation codes.
  */
 public class ClientOp {
-    /** Create table. */
-    public static final int TABLE_CREATE = 1;
+    /** Heartbeat. */
+    public static final int HEARTBEAT = 1;
 
     /** Drop table. */
     public static final int TABLE_DROP = 2;
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 93fe8db..176a2d9 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -106,6 +106,7 @@ public class ItClientHandlerTest {
 
             packer.packBinaryHeader(0); // Features.
             packer.packMapHeader(0); // Extensions.
+            packer.packInt(0); // Idle timeout.
 
             out.write(packer.toByteArray());
             out.flush();
@@ -127,7 +128,7 @@ public class ItClientHandlerTest {
             unpacker.skipValue(extensionsLen);
 
             assertArrayEquals(MAGIC, magic);
-            assertEquals(7, len);
+            assertEquals(8, len);
             assertEquals(3, major);
             assertEquals(0, minor);
             assertEquals(0, patch);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 41c2c9e..25cc93b 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -19,11 +19,17 @@ package org.apache.ignite.client.handler;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
 import java.net.BindException;
 import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
 import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
 import org.apache.ignite.internal.configuration.ConfigurationRegistry;
@@ -143,9 +149,17 @@ public class ClientHandlerModule implements IgniteComponent {
         bootstrap.childHandler(new ChannelInitializer<>() {
                     @Override
                     protected void initChannel(Channel ch) {
+                        if (configuration.idleTimeout() > 0) {
+                            IdleStateHandler idleStateHandler = new IdleStateHandler(
+                                    configuration.idleTimeout(), 0, 0, TimeUnit.MILLISECONDS);
+
+                            ch.pipeline().addLast(idleStateHandler);
+                            ch.pipeline().addLast(new IdleChannelHandler());
+                        }
+
                         ch.pipeline().addLast(
                                 new ClientMessageDecoder(),
-                                new ClientInboundMessageHandler(igniteTables, igniteTransactions, queryProcessor));
+                                new ClientInboundMessageHandler(igniteTables, igniteTransactions, queryProcessor, configuration));
                     }
                 })
                 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout());
@@ -176,4 +190,15 @@ public class ClientHandlerModule implements IgniteComponent {
 
         return ch.closeFuture();
     }
+
+    /** Idle channel state handler. */
+    private static class IdleChannelHandler extends ChannelDuplexHandler {
+        /** {@inheritDoc} */
+        @Override
+        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+            if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
+                ctx.close();
+            }
+        }
+    }
 }
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 7ea6093..c625ee4 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -59,6 +59,7 @@ import org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginReques
 import org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
 import org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
 import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorView;
 import org.apache.ignite.internal.client.proto.ClientErrorCode;
 import org.apache.ignite.internal.client.proto.ClientMessageCommon;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
@@ -92,26 +93,33 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
     /** Connection resources. */
     private final ClientResourceRegistry resources = new ClientResourceRegistry();
 
+    /** Configuration. */
+    private final ClientConnectorView configuration;
+
     /** Context. */
     private ClientContext clientContext;
 
     /**
      * Constructor.
      *
-     * @param igniteTables       Ignite tables API entry point.
+     * @param igniteTables      Ignite tables API entry point.
      * @param igniteTransactions Transactions API.
      * @param processor          Sql query processor.
+     * @param configuration      Configuration.
      */
     public ClientInboundMessageHandler(
             IgniteTables igniteTables,
             IgniteTransactions igniteTransactions,
-            QueryProcessor processor) {
+            QueryProcessor processor,
+            ClientConnectorView configuration) {
         assert igniteTables != null;
         assert igniteTransactions != null;
         assert processor != null;
+        assert configuration != null;
 
         this.igniteTables = igniteTables;
         this.igniteTransactions = igniteTransactions;
+        this.configuration = configuration;
 
         this.jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(processor, new JdbcMetadataCatalog(igniteTables));
     }
@@ -167,6 +175,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
             packer.packInt(ClientErrorCode.SUCCESS);
             packer.packBinaryHeader(0); // Features.
             packer.packMapHeader(0); // Extensions.
+            packer.packLong(configuration.idleTimeout());
 
             write(packer, ctx);
         } catch (Throwable t) {
@@ -280,6 +289,9 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
             int opCode
     ) {
         switch (opCode) {
+            case ClientOp.HEARTBEAT:
+                return null;
+
             case ClientOp.TABLE_DROP:
                 return ClientTableDropRequest.process(in, igniteTables);
 
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 8096696..604b8bb 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.client;
 
 import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT;
+import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL;
 import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_PERIOD;
 import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_RETRIES;
 import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RETRY_LIMIT;
@@ -53,6 +54,7 @@ public interface IgniteClient extends Ignite {
     }
 
     /** Client builder. */
+    @SuppressWarnings("PublicInnerClass")
     class Builder {
         /** Addresses. */
         private String[] addresses;
@@ -75,6 +77,9 @@ public interface IgniteClient extends Ignite {
         /** Async continuation executor. */
         private Executor asyncContinuationExecutor;
 
+        /** Heartbeat interval. */
+        private long heartbeatInterval = DFLT_HEARTBEAT_INTERVAL;
+
         /**
          * Sets the addresses of Ignite server nodes within a cluster. An address can be an IP address or a hostname, with or without port.
          * If port is not set then Ignite will generate multiple addresses for default port range. See {@link
@@ -193,6 +198,24 @@ public interface IgniteClient extends Ignite {
         }
 
         /**
+         * Sets the heartbeat message interval, in milliseconds. Default is <code>30_000</code>.
+         *
+         * <p>When server-side idle timeout is not zero, effective heartbeat
+         * interval is set to <code>min(heartbeatInterval, idleTimeout / 3)</code>.
+         *
+         * <p>When thin client connection is idle (no operations are performed), heartbeat messages are sent periodically
+         * to keep the connection alive and detect potential half-open state.
+         *
+         * @param heartbeatInterval Heartbeat interval.
+         * @return This instance.
+         */
+        public Builder heartbeatInterval(long heartbeatInterval) {
+            this.heartbeatInterval = heartbeatInterval;
+
+            return this;
+        }
+
+        /**
          * Builds the client.
          *
          * @return Ignite client.
@@ -214,7 +237,8 @@ public interface IgniteClient extends Ignite {
                     connectTimeout,
                     reconnectThrottlingPeriod,
                     reconnectThrottlingRetries,
-                    asyncContinuationExecutor);
+                    asyncContinuationExecutor,
+                    heartbeatInterval);
 
             return TcpIgniteClient.startAsync(cfg);
         }
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index dfde409..4697c66 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -32,9 +32,12 @@ public interface IgniteClientConfiguration {
     /** Default port range. */
     int DFLT_PORT_RANGE = 100;
 
-    /** Default socket connect timeout. */
+    /** Default socket connect timeout, in milliseconds. */
     int DFLT_CONNECT_TIMEOUT = 5000;
 
+    /** Default heartbeat interval, in milliseconds. */
+    int DFLT_HEARTBEAT_INTERVAL = 30_000;
+
     /** Default operation retry limit. */
     int DFLT_RETRY_LIMIT = 5;
 
@@ -102,4 +105,17 @@ public interface IgniteClientConfiguration {
      * @return Executor for async continuations.
      */
     @Nullable Executor asyncContinuationExecutor();
+
+    /**
+     * Gets the heartbeat message interval, in milliseconds. Default is <code>30_000</code>.
+     *
+     * <p>When server-side idle timeout is not zero, effective heartbeat
+     * interval is set to <code>min(heartbeatInterval, idleTimeout / 3)</code>.
+     *
+     * <p>When thin client connection is idle (no operations are performed), heartbeat messages are sent periodically
+     * to keep the connection alive and detect potential half-open state.
+     *
+     * @return Heartbeat interval.
+     */
+    public long heartbeatInterval();
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index f084dfc..40f0919 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -47,6 +47,9 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
     /** Async continuation executor. */
     private final Executor asyncContinuationExecutor;
 
+    /** Heartbeat interval. */
+    private final long heartbeatInterval;
+
     /**
      * Constructor.
      *
@@ -55,6 +58,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
      * @param retryLimit                Retry limit.
      * @param connectTimeout            Socket connect timeout.
      * @param asyncContinuationExecutor Async continuation executor.
+     * @param heartbeatInterval         Heartbeat message interval.
      */
     public IgniteClientConfigurationImpl(
             IgniteClientAddressFinder addressFinder,
@@ -63,7 +67,8 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
             long connectTimeout,
             long reconnectThrottlingPeriod,
             int reconnectThrottlingRetries,
-            Executor asyncContinuationExecutor) {
+            Executor asyncContinuationExecutor,
+            long heartbeatInterval) {
         this.addressFinder = addressFinder;
 
         //noinspection AssignmentOrReturnOfFieldWithMutableType (cloned in Builder).
@@ -74,6 +79,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
         this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
         this.reconnectThrottlingRetries = reconnectThrottlingRetries;
         this.asyncContinuationExecutor = asyncContinuationExecutor;
+        this.heartbeatInterval = heartbeatInterval;
     }
 
     /** {@inheritDoc} */
@@ -117,4 +123,10 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
     public @Nullable Executor asyncContinuationExecutor() {
         return asyncContinuationExecutor;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public long heartbeatInterval() {
+        return heartbeatInterval;
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
index 59cde78..70a1418 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
@@ -31,15 +31,20 @@ public class ProtocolContext {
     /** Features. */
     private final EnumSet<ProtocolBitmaskFeature> features;
 
+    /** Server idle timeout. */
+    private final long serverIdleTimeout;
+
     /**
      * Constructor.
      *
-     * @param ver      Protocol version.
+     * @param ver Protocol version.
      * @param features Supported features.
+     * @param serverIdleTimeout Server idle timeout.
      */
-    public ProtocolContext(ProtocolVersion ver, EnumSet<ProtocolBitmaskFeature> features) {
+    public ProtocolContext(ProtocolVersion ver, EnumSet<ProtocolBitmaskFeature> features, long serverIdleTimeout) {
         this.ver = ver;
         this.features = features != null ? features : EnumSet.noneOf(ProtocolBitmaskFeature.class);
+        this.serverIdleTimeout = serverIdleTimeout;
     }
 
     /**
@@ -81,4 +86,13 @@ public class ProtocolContext {
     public ProtocolVersion version() {
         return ver;
     }
+
+    /**
+     * Returns server idle timeout.
+     *
+     * @return Server idle timeout.
+     */
+    public long getServerIdleTimeout() {
+        return serverIdleTimeout;
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 81041f3..6ab3943 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -24,6 +24,8 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -43,6 +45,7 @@ import org.apache.ignite.internal.client.proto.ClientErrorCode;
 import org.apache.ignite.internal.client.proto.ClientMessageCommon;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.ProtocolVersion;
 import org.apache.ignite.internal.client.proto.ServerMessageType;
 import org.apache.ignite.lang.IgniteException;
@@ -60,6 +63,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             ProtocolVersion.V3_0_0
     );
 
+    /** Minimum supported heartbeat interval. */
+    private static final long MIN_RECOMMENDED_HEARTBEAT_INTERVAL = 500;
+
     /** Protocol context. */
     private volatile ProtocolContext protocolCtx;
 
@@ -81,6 +87,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
     /** Connect timeout in milliseconds. */
     private final long connectTimeout;
 
+    /** Heartbeat timer. */
+    private final Timer heartbeatTimer;
+
+    /** Last send operation timestamp. */
+    private volatile long lastSendMillis;
+
     /**
      * Constructor.
      *
@@ -99,6 +111,10 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
         sock = connMgr.open(cfg.getAddress(), this, this);
 
         handshake(DEFAULT_VERSION);
+
+        // Netty has a built-in IdleStateHandler to detect idle connections (used on the server side).
+        // However, to adjust the heartbeat interval dynamically, we have to use a timer here.
+        heartbeatTimer = initHeartbeat(cfg.clientConfiguration().heartbeatInterval());
     }
 
     /** {@inheritDoc} */
@@ -112,6 +128,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
      */
     private void close(Exception cause) {
         if (closed.compareAndSet(false, true)) {
+            heartbeatTimer.cancel();
+
             sock.close();
 
             for (ClientRequestFuture pendingReq : pendingReqs.values()) {
@@ -339,10 +357,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
      * Returns protocol context for a version.
      *
      * @param ver Protocol version.
+     * @param serverIdleTimeout Server idle timeout.
      * @return Protocol context for a version.
      */
-    private ProtocolContext protocolContextFromVersion(ProtocolVersion ver) {
-        return new ProtocolContext(ver, ProtocolBitmaskFeature.allFeaturesAsEnumSet());
+    private ProtocolContext protocolContextFromVersion(ProtocolVersion ver, long serverIdleTimeout) {
+        return new ProtocolContext(ver, ProtocolBitmaskFeature.allFeaturesAsEnumSet(), serverIdleTimeout);
     }
 
     /** Receive and handle handshake response. */
@@ -381,20 +400,86 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
             var extensionsLen = unpacker.unpackMapHeader();
             unpacker.skipValues(extensionsLen);
 
-            protocolCtx = protocolContextFromVersion(srvVer);
+            var serverIdleTimeout = unpacker.unpackLong();
+
+            protocolCtx = protocolContextFromVersion(srvVer, serverIdleTimeout);
         }
     }
 
     /** Write bytes to the output stream. */
     private ChannelFuture write(ClientMessagePacker packer) throws IgniteClientConnectionException {
+        lastSendMillis = System.currentTimeMillis();
+
         var buf = packer.getBuffer();
 
         return sock.send(buf);
     }
 
     /**
+     * Initializes heartbeats.
+     *
+     * @param configuredInterval Configured heartbeat interval, in milliseconds.
+     * @return Heartbeat timer.
+     */
+    private Timer initHeartbeat(long configuredInterval) {
+        long heartbeatInterval = getHeartbeatInterval(configuredInterval);
+
+        Timer timer = new Timer("tcp-client-channel-heartbeats-" + hashCode());
+
+        timer.schedule(new HeartbeatTask(heartbeatInterval), heartbeatInterval, heartbeatInterval);
+
+        return timer;
+    }
+
+    /**
+     * Gets the heartbeat interval based on the configured value and served-side idle timeout.
+     *
+     * @param configuredInterval Configured interval.
+     * @return Resolved interval.
+     */
+    private long getHeartbeatInterval(long configuredInterval) {
+        long serverIdleTimeoutMs = protocolCtx.getServerIdleTimeout();
+
+        if (serverIdleTimeoutMs <= 0) {
+            return configuredInterval;
+        }
+
+        long recommendedHeartbeatInterval = serverIdleTimeoutMs / 3;
+
+        if (recommendedHeartbeatInterval < MIN_RECOMMENDED_HEARTBEAT_INTERVAL) {
+            recommendedHeartbeatInterval = MIN_RECOMMENDED_HEARTBEAT_INTERVAL;
+        }
+
+        return Math.min(configuredInterval, recommendedHeartbeatInterval);
+    }
+
+    /**
      * Client request future.
      */
     private static class ClientRequestFuture extends CompletableFuture<ClientMessageUnpacker> {
     }
+
+    /**
+     * Sends heartbeat messages.
+     */
+    private class HeartbeatTask extends TimerTask {
+        /** Heartbeat interval. */
+        private final long interval;
+
+        /** Constructor. */
+        public HeartbeatTask(long interval) {
+            this.interval = interval;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void run() {
+            try {
+                if (System.currentTimeMillis() - lastSendMillis > interval) {
+                    serviceAsync(ClientOp.HEARTBEAT, null, null);
+                }
+            } catch (Throwable ignored) {
+                // Ignore failed heartbeats.
+            }
+        }
+    }
 }
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 673516c..d796c79 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -54,7 +54,7 @@ public abstract class AbstractClientTest {
 
         server = new FakeIgnite();
 
-        testServer = startServer(10800, 10, server);
+        testServer = startServer(10800, 10, 0, server);
 
         serverPort = getPort(testServer.module());
 
@@ -101,15 +101,17 @@ public abstract class AbstractClientTest {
      *
      * @param port Port.
      * @param portRange Port range.
+     * @param idleTimeout Idle timeout.
      * @param ignite Ignite.
      * @return Server.
      */
     public static TestServer startServer(
             int port,
             int portRange,
+            long idleTimeout,
             Ignite ignite
     ) {
-        return new TestServer(port, portRange, ignite);
+        return new TestServer(port, portRange, idleTimeout, ignite);
     }
 
     /**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
new file mode 100644
index 0000000..3079bb0
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.apache.ignite.client.AbstractClientTest.getPort;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.client.IgniteClient.Builder;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests heartbeat and idle timeout behavior.
+ */
+public class HeartbeatTest {
+    @Test
+    public void testHeartbeatLongerThanIdleTimeoutCausesDisconnect() throws Exception {
+        try (var srv = new TestServer(10800, 10, 50, new FakeIgnite())) {
+            int srvPort = getPort(srv.module());
+
+            Builder builder = IgniteClient.builder()
+                    .addresses("127.0.0.1:" + srvPort)
+                    .retryLimit(0);
+
+            try (var client = builder.build()) {
+                Thread.sleep(300);
+
+                assertThrows(IgniteClientConnectionException.class, () -> client.tables().tables());
+            }
+        }
+    }
+
+    @Test
+    public void testHeartbeatShorterThanIdleTimeoutKeepsConnectionAlive() throws Exception {
+        try (var srv = new TestServer(10800, 10, 300, new FakeIgnite())) {
+            int srvPort = getPort(srv.module());
+
+            Builder builder = IgniteClient.builder()
+                    .addresses("127.0.0.1:" + srvPort)
+                    .heartbeatInterval(50)
+                    .retryLimit(0);
+
+            try (var client = builder.build()) {
+                Thread.sleep(900);
+
+                assertEquals(0, client.tables().tables().size());
+            }
+        }
+    }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index 55ffcee..581cd8f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -48,6 +48,7 @@ public class ReconnectTest {
         server = AbstractClientTest.startServer(
                 10900,
                 10,
+                0,
                 ignite1);
 
         var client = IgniteClient.builder()
@@ -65,6 +66,7 @@ public class ReconnectTest {
         server2 = AbstractClientTest.startServer(
                 10950,
                 10,
+                0,
                 ignite2);
 
         assertEquals("t2", client.tables().tables().get(0).name());
@@ -78,6 +80,7 @@ public class ReconnectTest {
         server = AbstractClientTest.startServer(
                 10900,
                 10,
+                0,
                 ignite1);
 
         var client = IgniteClient.builder()
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 76f8d63..5f5560a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -45,11 +45,13 @@ public class TestServer implements AutoCloseable {
      *
      * @param port Port.
      * @param portRange Port range.
+     * @param idleTimeout Idle timeout.
      * @param ignite Ignite.
      */
     public TestServer(
             int port,
             int portRange,
+            long idleTimeout,
             Ignite ignite
     ) {
         cfg = new ConfigurationRegistry(
@@ -63,7 +65,7 @@ public class TestServer implements AutoCloseable {
         cfg.start();
 
         cfg.getConfiguration(ClientConnectorConfiguration.KEY).change(
-                local -> local.changePort(port).changePortRange(portRange)
+                local -> local.changePort(port).changePortRange(portRange).changeIdleTimeout(idleTimeout)
         ).join();
 
         bootstrapFactory = new NettyBootstrapFactory(cfg.getConfiguration(NetworkConfiguration.KEY), "TestServer-");
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 58ef00d..8df6aa6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -110,8 +110,8 @@ namespace Apache.Ignite.Tests
 
                 // Write handshake response.
                 handler.Send(ProtoCommon.MagicBytes);
-                handler.Send(new byte[] { 0, 0, 0, 7 }); // Size.
-                handler.Send(new byte[] { 3, 0, 0, 0, 196, 0, 128 });
+                handler.Send(new byte[] { 0, 0, 0, 8 }); // Size.
+                handler.Send(new byte[] { 3, 0, 0, 0, 196, 0, 128, 0 });
 
                 while (!_cts.IsCancellationRequested)
                 {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
index bd8fa17..d311b2c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
@@ -83,7 +83,7 @@ namespace Apache.Ignite.Tests
 
             var str = Encoding.UTF8.GetString(msg);
 
-            Assert.AreEqual(7, msgSize, str);
+            Assert.AreEqual(8, msgSize, str);
 
             // Protocol version.
             Assert.AreEqual(3, msg[0]);
@@ -108,6 +108,7 @@ namespace Apache.Ignite.Tests
 
             writer.WriteBinHeader(0); // Features.
             writer.WriteMapHeader(0); // Extensions.
+            writer.Write(0); // Idle timeout.
 
             writer.Flush();
 
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index d362fe0..4903080 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -252,6 +252,7 @@ namespace Apache.Ignite.Internal
 
             reader.Skip(); // Features.
             reader.Skip(); // Extensions.
+            reader.Skip(); // Idle timeout.
         }
 
         private static IgniteClientException? ReadError(ref MessagePackReader reader)