You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/03/21 14:06:12 UTC
[ignite-3] branch main updated: IGNITE-16532 Java thin: add heartbeats (#737)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a9b9f0d IGNITE-16532 Java thin: add heartbeats (#737)
a9b9f0d is described below
commit a9b9f0d22d0cc61e8f180134b0d45a6b51bd9378
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Mon Mar 21 17:06:08 2022 +0300
IGNITE-16532 Java thin: add heartbeats (#737)
Implement heartbeats in Java thin client: https://cwiki.apache.org/confluence/display/IGNITE/IEP-83+Thin+Client+Keepalive
* Add `IgniteClientConfiguration.heartbeatInterval`, default 30 seconds.
* Add `ClientConnectorConfiguration.idleTimeout`, default 0 (disabled).
* Add `idleTimeout` to handshake.
* Set effective heartbeat interval to `Math.min(heartbeatInterval, idleTimeout / 3)`.
---
.../ClientConnectorConfigurationSchema.java | 5 ++
.../ignite/internal/client/proto/ClientOp.java | 4 +-
.../ignite/client/handler/ItClientHandlerTest.java | 3 +-
.../ignite/client/handler/ClientHandlerModule.java | 27 ++++++-
.../handler/ClientInboundMessageHandler.java | 16 +++-
.../org/apache/ignite/client/IgniteClient.java | 26 ++++++-
.../ignite/client/IgniteClientConfiguration.java | 18 ++++-
.../client/IgniteClientConfigurationImpl.java | 14 +++-
.../ignite/internal/client/ProtocolContext.java | 18 ++++-
.../ignite/internal/client/TcpClientChannel.java | 91 +++++++++++++++++++++-
.../apache/ignite/client/AbstractClientTest.java | 6 +-
.../org/apache/ignite/client/HeartbeatTest.java | 66 ++++++++++++++++
.../org/apache/ignite/client/ReconnectTest.java | 3 +
.../java/org/apache/ignite/client/TestServer.java | 4 +-
.../dotnet/Apache.Ignite.Tests/FakeServer.cs | 4 +-
.../RawSocketConnectionTests.cs | 3 +-
.../dotnet/Apache.Ignite/Internal/ClientSocket.cs | 1 +
17 files changed, 289 insertions(+), 20 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
index 4d26120..23fdd1d 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
@@ -44,4 +44,9 @@ public class ClientConnectorConfigurationSchema {
@Min(0)
@Value(hasDefault = true)
public final int connectTimeout = 5000;
+
+ /** Idle timeout. */
+ @Min(0)
+ @Value(hasDefault = true)
+ public final long idleTimeout = 0;
}
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 0606e14..8916f00 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -21,8 +21,8 @@ package org.apache.ignite.internal.client.proto;
* Client operation codes.
*/
public class ClientOp {
- /** Create table. */
- public static final int TABLE_CREATE = 1;
+ /** Heartbeat. */
+ public static final int HEARTBEAT = 1;
/** Drop table. */
public static final int TABLE_DROP = 2;
diff --git a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 93fe8db..176a2d9 100644
--- a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++ b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -106,6 +106,7 @@ public class ItClientHandlerTest {
packer.packBinaryHeader(0); // Features.
packer.packMapHeader(0); // Extensions.
+ packer.packInt(0); // Idle timeout.
out.write(packer.toByteArray());
out.flush();
@@ -127,7 +128,7 @@ public class ItClientHandlerTest {
unpacker.skipValue(extensionsLen);
assertArrayEquals(MAGIC, magic);
- assertEquals(7, len);
+ assertEquals(8, len);
assertEquals(3, major);
assertEquals(0, minor);
assertEquals(0, patch);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 41c2c9e..25cc93b 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -19,11 +19,17 @@ package org.apache.ignite.client.handler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
import java.net.BindException;
import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.internal.client.proto.ClientMessageDecoder;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
@@ -143,9 +149,17 @@ public class ClientHandlerModule implements IgniteComponent {
bootstrap.childHandler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
+ if (configuration.idleTimeout() > 0) {
+ IdleStateHandler idleStateHandler = new IdleStateHandler(
+ configuration.idleTimeout(), 0, 0, TimeUnit.MILLISECONDS);
+
+ ch.pipeline().addLast(idleStateHandler);
+ ch.pipeline().addLast(new IdleChannelHandler());
+ }
+
ch.pipeline().addLast(
new ClientMessageDecoder(),
- new ClientInboundMessageHandler(igniteTables, igniteTransactions, queryProcessor));
+ new ClientInboundMessageHandler(igniteTables, igniteTransactions, queryProcessor, configuration));
}
})
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.connectTimeout());
@@ -176,4 +190,15 @@ public class ClientHandlerModule implements IgniteComponent {
return ch.closeFuture();
}
+
+ /** Idle channel state handler. */
+ private static class IdleChannelHandler extends ChannelDuplexHandler {
+ /** {@inheritDoc} */
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
+ ctx.close();
+ }
+ }
+ }
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 7ea6093..c625ee4 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -59,6 +59,7 @@ import org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginReques
import org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
import org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
import org.apache.ignite.client.proto.query.JdbcQueryEventHandler;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorView;
import org.apache.ignite.internal.client.proto.ClientErrorCode;
import org.apache.ignite.internal.client.proto.ClientMessageCommon;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
@@ -92,26 +93,33 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
/** Connection resources. */
private final ClientResourceRegistry resources = new ClientResourceRegistry();
+ /** Configuration. */
+ private final ClientConnectorView configuration;
+
/** Context. */
private ClientContext clientContext;
/**
* Constructor.
*
- * @param igniteTables Ignite tables API entry point.
+ * @param igniteTables Ignite tables API entry point.
* @param igniteTransactions Transactions API.
* @param processor Sql query processor.
+ * @param configuration Configuration.
*/
public ClientInboundMessageHandler(
IgniteTables igniteTables,
IgniteTransactions igniteTransactions,
- QueryProcessor processor) {
+ QueryProcessor processor,
+ ClientConnectorView configuration) {
assert igniteTables != null;
assert igniteTransactions != null;
assert processor != null;
+ assert configuration != null;
this.igniteTables = igniteTables;
this.igniteTransactions = igniteTransactions;
+ this.configuration = configuration;
this.jdbcQueryEventHandler = new JdbcQueryEventHandlerImpl(processor, new JdbcMetadataCatalog(igniteTables));
}
@@ -167,6 +175,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
packer.packInt(ClientErrorCode.SUCCESS);
packer.packBinaryHeader(0); // Features.
packer.packMapHeader(0); // Extensions.
+ packer.packLong(configuration.idleTimeout());
write(packer, ctx);
} catch (Throwable t) {
@@ -280,6 +289,9 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
int opCode
) {
switch (opCode) {
+ case ClientOp.HEARTBEAT:
+ return null;
+
case ClientOp.TABLE_DROP:
return ClientTableDropRequest.process(in, igniteTables);
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 8096696..604b8bb 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client;
import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT;
+import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL;
import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_PERIOD;
import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_RETRIES;
import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RETRY_LIMIT;
@@ -53,6 +54,7 @@ public interface IgniteClient extends Ignite {
}
/** Client builder. */
+ @SuppressWarnings("PublicInnerClass")
class Builder {
/** Addresses. */
private String[] addresses;
@@ -75,6 +77,9 @@ public interface IgniteClient extends Ignite {
/** Async continuation executor. */
private Executor asyncContinuationExecutor;
+ /** Heartbeat interval. */
+ private long heartbeatInterval = DFLT_HEARTBEAT_INTERVAL;
+
/**
* Sets the addresses of Ignite server nodes within a cluster. An address can be an IP address or a hostname, with or without port.
* If port is not set then Ignite will generate multiple addresses for default port range. See {@link
@@ -193,6 +198,24 @@ public interface IgniteClient extends Ignite {
}
/**
+ * Sets the heartbeat message interval, in milliseconds. Default is <code>30_000</code>.
+ *
+ * <p>When server-side idle timeout is not zero, effective heartbeat
+ * interval is set to <code>min(heartbeatInterval, idleTimeout / 3)</code>.
+ *
+ * <p>When thin client connection is idle (no operations are performed), heartbeat messages are sent periodically
+ * to keep the connection alive and detect potential half-open state.
+ *
+ * @param heartbeatInterval Heartbeat interval.
+ * @return This instance.
+ */
+ public Builder heartbeatInterval(long heartbeatInterval) {
+ this.heartbeatInterval = heartbeatInterval;
+
+ return this;
+ }
+
+ /**
* Builds the client.
*
* @return Ignite client.
@@ -214,7 +237,8 @@ public interface IgniteClient extends Ignite {
connectTimeout,
reconnectThrottlingPeriod,
reconnectThrottlingRetries,
- asyncContinuationExecutor);
+ asyncContinuationExecutor,
+ heartbeatInterval);
return TcpIgniteClient.startAsync(cfg);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index dfde409..4697c66 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -32,9 +32,12 @@ public interface IgniteClientConfiguration {
/** Default port range. */
int DFLT_PORT_RANGE = 100;
- /** Default socket connect timeout. */
+ /** Default socket connect timeout, in milliseconds. */
int DFLT_CONNECT_TIMEOUT = 5000;
+ /** Default heartbeat interval, in milliseconds. */
+ int DFLT_HEARTBEAT_INTERVAL = 30_000;
+
/** Default operation retry limit. */
int DFLT_RETRY_LIMIT = 5;
@@ -102,4 +105,17 @@ public interface IgniteClientConfiguration {
* @return Executor for async continuations.
*/
@Nullable Executor asyncContinuationExecutor();
+
+ /**
+ * Gets the heartbeat message interval, in milliseconds. Default is <code>30_000</code>.
+ *
+ * <p>When server-side idle timeout is not zero, effective heartbeat
+ * interval is set to <code>min(heartbeatInterval, idleTimeout / 3)</code>.
+ *
+ * <p>When thin client connection is idle (no operations are performed), heartbeat messages are sent periodically
+ * to keep the connection alive and detect potential half-open state.
+ *
+ * @return Heartbeat interval.
+ */
+ public long heartbeatInterval();
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index f084dfc..40f0919 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -47,6 +47,9 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
/** Async continuation executor. */
private final Executor asyncContinuationExecutor;
+ /** Heartbeat interval. */
+ private final long heartbeatInterval;
+
/**
* Constructor.
*
@@ -55,6 +58,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
* @param retryLimit Retry limit.
* @param connectTimeout Socket connect timeout.
* @param asyncContinuationExecutor Async continuation executor.
+ * @param heartbeatInterval Heartbeat message interval.
*/
public IgniteClientConfigurationImpl(
IgniteClientAddressFinder addressFinder,
@@ -63,7 +67,8 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
long connectTimeout,
long reconnectThrottlingPeriod,
int reconnectThrottlingRetries,
- Executor asyncContinuationExecutor) {
+ Executor asyncContinuationExecutor,
+ long heartbeatInterval) {
this.addressFinder = addressFinder;
//noinspection AssignmentOrReturnOfFieldWithMutableType (cloned in Builder).
@@ -74,6 +79,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
this.reconnectThrottlingPeriod = reconnectThrottlingPeriod;
this.reconnectThrottlingRetries = reconnectThrottlingRetries;
this.asyncContinuationExecutor = asyncContinuationExecutor;
+ this.heartbeatInterval = heartbeatInterval;
}
/** {@inheritDoc} */
@@ -117,4 +123,10 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
public @Nullable Executor asyncContinuationExecutor() {
return asyncContinuationExecutor;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public long heartbeatInterval() {
+ return heartbeatInterval;
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
index 59cde78..70a1418 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ProtocolContext.java
@@ -31,15 +31,20 @@ public class ProtocolContext {
/** Features. */
private final EnumSet<ProtocolBitmaskFeature> features;
+ /** Server idle timeout. */
+ private final long serverIdleTimeout;
+
/**
* Constructor.
*
- * @param ver Protocol version.
+ * @param ver Protocol version.
* @param features Supported features.
+ * @param serverIdleTimeout Server idle timeout.
*/
- public ProtocolContext(ProtocolVersion ver, EnumSet<ProtocolBitmaskFeature> features) {
+ public ProtocolContext(ProtocolVersion ver, EnumSet<ProtocolBitmaskFeature> features, long serverIdleTimeout) {
this.ver = ver;
this.features = features != null ? features : EnumSet.noneOf(ProtocolBitmaskFeature.class);
+ this.serverIdleTimeout = serverIdleTimeout;
}
/**
@@ -81,4 +86,13 @@ public class ProtocolContext {
public ProtocolVersion version() {
return ver;
}
+
+ /**
+ * Returns server idle timeout.
+ *
+ * @return Server idle timeout.
+ */
+ public long getServerIdleTimeout() {
+ return serverIdleTimeout;
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 81041f3..6ab3943 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -24,6 +24,8 @@ import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -43,6 +45,7 @@ import org.apache.ignite.internal.client.proto.ClientErrorCode;
import org.apache.ignite.internal.client.proto.ClientMessageCommon;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ServerMessageType;
import org.apache.ignite.lang.IgniteException;
@@ -60,6 +63,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
ProtocolVersion.V3_0_0
);
+ /** Minimum supported heartbeat interval. */
+ private static final long MIN_RECOMMENDED_HEARTBEAT_INTERVAL = 500;
+
/** Protocol context. */
private volatile ProtocolContext protocolCtx;
@@ -81,6 +87,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Connect timeout in milliseconds. */
private final long connectTimeout;
+ /** Heartbeat timer. */
+ private final Timer heartbeatTimer;
+
+ /** Last send operation timestamp. */
+ private volatile long lastSendMillis;
+
/**
* Constructor.
*
@@ -99,6 +111,10 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
sock = connMgr.open(cfg.getAddress(), this, this);
handshake(DEFAULT_VERSION);
+
+ // Netty has a built-in IdleStateHandler to detect idle connections (used on the server side).
+ // However, to adjust the heartbeat interval dynamically, we have to use a timer here.
+ heartbeatTimer = initHeartbeat(cfg.clientConfiguration().heartbeatInterval());
}
/** {@inheritDoc} */
@@ -112,6 +128,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
*/
private void close(Exception cause) {
if (closed.compareAndSet(false, true)) {
+ heartbeatTimer.cancel();
+
sock.close();
for (ClientRequestFuture pendingReq : pendingReqs.values()) {
@@ -339,10 +357,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
* Returns protocol context for a version.
*
* @param ver Protocol version.
+ * @param serverIdleTimeout Server idle timeout.
* @return Protocol context for a version.
*/
- private ProtocolContext protocolContextFromVersion(ProtocolVersion ver) {
- return new ProtocolContext(ver, ProtocolBitmaskFeature.allFeaturesAsEnumSet());
+ private ProtocolContext protocolContextFromVersion(ProtocolVersion ver, long serverIdleTimeout) {
+ return new ProtocolContext(ver, ProtocolBitmaskFeature.allFeaturesAsEnumSet(), serverIdleTimeout);
}
/** Receive and handle handshake response. */
@@ -381,20 +400,86 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
var extensionsLen = unpacker.unpackMapHeader();
unpacker.skipValues(extensionsLen);
- protocolCtx = protocolContextFromVersion(srvVer);
+ var serverIdleTimeout = unpacker.unpackLong();
+
+ protocolCtx = protocolContextFromVersion(srvVer, serverIdleTimeout);
}
}
/** Write bytes to the output stream. */
private ChannelFuture write(ClientMessagePacker packer) throws IgniteClientConnectionException {
+ lastSendMillis = System.currentTimeMillis();
+
var buf = packer.getBuffer();
return sock.send(buf);
}
/**
+ * Initializes heartbeats.
+ *
+ * @param configuredInterval Configured heartbeat interval, in milliseconds.
+ * @return Heartbeat timer.
+ */
+ private Timer initHeartbeat(long configuredInterval) {
+ long heartbeatInterval = getHeartbeatInterval(configuredInterval);
+
+ Timer timer = new Timer("tcp-client-channel-heartbeats-" + hashCode());
+
+ timer.schedule(new HeartbeatTask(heartbeatInterval), heartbeatInterval, heartbeatInterval);
+
+ return timer;
+ }
+
+ /**
+ * Gets the heartbeat interval based on the configured value and served-side idle timeout.
+ *
+ * @param configuredInterval Configured interval.
+ * @return Resolved interval.
+ */
+ private long getHeartbeatInterval(long configuredInterval) {
+ long serverIdleTimeoutMs = protocolCtx.getServerIdleTimeout();
+
+ if (serverIdleTimeoutMs <= 0) {
+ return configuredInterval;
+ }
+
+ long recommendedHeartbeatInterval = serverIdleTimeoutMs / 3;
+
+ if (recommendedHeartbeatInterval < MIN_RECOMMENDED_HEARTBEAT_INTERVAL) {
+ recommendedHeartbeatInterval = MIN_RECOMMENDED_HEARTBEAT_INTERVAL;
+ }
+
+ return Math.min(configuredInterval, recommendedHeartbeatInterval);
+ }
+
+ /**
* Client request future.
*/
private static class ClientRequestFuture extends CompletableFuture<ClientMessageUnpacker> {
}
+
+ /**
+ * Sends heartbeat messages.
+ */
+ private class HeartbeatTask extends TimerTask {
+ /** Heartbeat interval. */
+ private final long interval;
+
+ /** Constructor. */
+ public HeartbeatTask(long interval) {
+ this.interval = interval;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ if (System.currentTimeMillis() - lastSendMillis > interval) {
+ serviceAsync(ClientOp.HEARTBEAT, null, null);
+ }
+ } catch (Throwable ignored) {
+ // Ignore failed heartbeats.
+ }
+ }
+ }
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 673516c..d796c79 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -54,7 +54,7 @@ public abstract class AbstractClientTest {
server = new FakeIgnite();
- testServer = startServer(10800, 10, server);
+ testServer = startServer(10800, 10, 0, server);
serverPort = getPort(testServer.module());
@@ -101,15 +101,17 @@ public abstract class AbstractClientTest {
*
* @param port Port.
* @param portRange Port range.
+ * @param idleTimeout Idle timeout.
* @param ignite Ignite.
* @return Server.
*/
public static TestServer startServer(
int port,
int portRange,
+ long idleTimeout,
Ignite ignite
) {
- return new TestServer(port, portRange, ignite);
+ return new TestServer(port, portRange, idleTimeout, ignite);
}
/**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
new file mode 100644
index 0000000..3079bb0
--- /dev/null
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.apache.ignite.client.AbstractClientTest.getPort;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.client.IgniteClient.Builder;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests heartbeat and idle timeout behavior.
+ */
+public class HeartbeatTest {
+ @Test
+ public void testHeartbeatLongerThanIdleTimeoutCausesDisconnect() throws Exception {
+ try (var srv = new TestServer(10800, 10, 50, new FakeIgnite())) {
+ int srvPort = getPort(srv.module());
+
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + srvPort)
+ .retryLimit(0);
+
+ try (var client = builder.build()) {
+ Thread.sleep(300);
+
+ assertThrows(IgniteClientConnectionException.class, () -> client.tables().tables());
+ }
+ }
+ }
+
+ @Test
+ public void testHeartbeatShorterThanIdleTimeoutKeepsConnectionAlive() throws Exception {
+ try (var srv = new TestServer(10800, 10, 300, new FakeIgnite())) {
+ int srvPort = getPort(srv.module());
+
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + srvPort)
+ .heartbeatInterval(50)
+ .retryLimit(0);
+
+ try (var client = builder.build()) {
+ Thread.sleep(900);
+
+ assertEquals(0, client.tables().tables().size());
+ }
+ }
+ }
+}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
index 55ffcee..581cd8f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ReconnectTest.java
@@ -48,6 +48,7 @@ public class ReconnectTest {
server = AbstractClientTest.startServer(
10900,
10,
+ 0,
ignite1);
var client = IgniteClient.builder()
@@ -65,6 +66,7 @@ public class ReconnectTest {
server2 = AbstractClientTest.startServer(
10950,
10,
+ 0,
ignite2);
assertEquals("t2", client.tables().tables().get(0).name());
@@ -78,6 +80,7 @@ public class ReconnectTest {
server = AbstractClientTest.startServer(
10900,
10,
+ 0,
ignite1);
var client = IgniteClient.builder()
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index 76f8d63..5f5560a 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -45,11 +45,13 @@ public class TestServer implements AutoCloseable {
*
* @param port Port.
* @param portRange Port range.
+ * @param idleTimeout Idle timeout.
* @param ignite Ignite.
*/
public TestServer(
int port,
int portRange,
+ long idleTimeout,
Ignite ignite
) {
cfg = new ConfigurationRegistry(
@@ -63,7 +65,7 @@ public class TestServer implements AutoCloseable {
cfg.start();
cfg.getConfiguration(ClientConnectorConfiguration.KEY).change(
- local -> local.changePort(port).changePortRange(portRange)
+ local -> local.changePort(port).changePortRange(portRange).changeIdleTimeout(idleTimeout)
).join();
bootstrapFactory = new NettyBootstrapFactory(cfg.getConfiguration(NetworkConfiguration.KEY), "TestServer-");
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 58ef00d..8df6aa6 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -110,8 +110,8 @@ namespace Apache.Ignite.Tests
// Write handshake response.
handler.Send(ProtoCommon.MagicBytes);
- handler.Send(new byte[] { 0, 0, 0, 7 }); // Size.
- handler.Send(new byte[] { 3, 0, 0, 0, 196, 0, 128 });
+ handler.Send(new byte[] { 0, 0, 0, 8 }); // Size.
+ handler.Send(new byte[] { 3, 0, 0, 0, 196, 0, 128, 0 });
while (!_cts.IsCancellationRequested)
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
index bd8fa17..d311b2c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/RawSocketConnectionTests.cs
@@ -83,7 +83,7 @@ namespace Apache.Ignite.Tests
var str = Encoding.UTF8.GetString(msg);
- Assert.AreEqual(7, msgSize, str);
+ Assert.AreEqual(8, msgSize, str);
// Protocol version.
Assert.AreEqual(3, msg[0]);
@@ -108,6 +108,7 @@ namespace Apache.Ignite.Tests
writer.WriteBinHeader(0); // Features.
writer.WriteMapHeader(0); // Extensions.
+ writer.Write(0); // Idle timeout.
writer.Flush();
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
index d362fe0..4903080 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs
@@ -252,6 +252,7 @@ namespace Apache.Ignite.Internal
reader.Skip(); // Features.
reader.Skip(); // Extensions.
+ reader.Skip(); // Idle timeout.
}
private static IgniteClientException? ReadError(ref MessagePackReader reader)