You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2022/12/14 18:36:06 UTC
[ignite-3] branch main updated: IGNITE-18071 Add client-side heartbeat timeout (#1448)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c7ea5c8bd5 IGNITE-18071 Add client-side heartbeat timeout (#1448)
c7ea5c8bd5 is described below
commit c7ea5c8bd5811316e5facb54cd8d72f501681137
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Wed Dec 14 21:36:00 2022 +0300
IGNITE-18071 Add client-side heartbeat timeout (#1448)
* Add `IgniteClientConfiguration.heartbeatTimeout`.
* Close connection when there is no response from server to heartbeat message within the specified timeout.
* Minor fixes - add nullable annotations, improve logger handling.
* Add test for `connectTimeout`.
* Fix Netty buffer leak in case of connect timeout.
---
.idea/inspectionProfiles/Project_Default.xml | 2 +-
.../org/apache/ignite/client/IgniteClient.java | 27 ++++++++++++--
.../ignite/client/IgniteClientConfiguration.java | 18 +++++++--
.../apache/ignite/internal/client/ClientUtils.java | 20 ++++++++++
.../client/IgniteClientConfigurationImpl.java | 16 +++++++-
.../ignite/internal/client/ReliableChannel.java | 4 +-
.../ignite/internal/client/TcpClientChannel.java | 43 +++++++++++++++++++---
.../ignite/internal/client/TcpIgniteClient.java | 10 +----
.../apache/ignite/client/AbstractClientTest.java | 2 +-
.../apache/ignite/client/ClientComputeTest.java | 6 +--
.../org/apache/ignite/client/ConnectionTest.java | 23 +++++++++++-
.../org/apache/ignite/client/HeartbeatTest.java | 28 ++++++++++++++
.../org/apache/ignite/client/MultiClusterTest.java | 6 +--
.../org/apache/ignite/client/RetryPolicyTest.java | 4 +-
.../ignite/client/TestClientHandlerModule.java | 35 ++++++++++++++++++
.../java/org/apache/ignite/client/TestServer.java | 15 ++++++--
16 files changed, 220 insertions(+), 39 deletions(-)
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index a77fb48328..2b5ac76e01 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -1185,4 +1185,4 @@
<option name="ADD_NONJAVA_TO_ENTRIES" value="true" />
</inspection_tool>
</profile>
-</component>
\ No newline at end of file
+</component>
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 9f5b5c123d..5580f2f331 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client;
import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_CONNECT_TIMEOUT;
import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_INTERVAL;
+import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_HEARTBEAT_TIMEOUT;
import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_PERIOD;
import static org.apache.ignite.client.IgniteClientConfiguration.DFLT_RECONNECT_THROTTLING_RETRIES;
import static org.apache.ignite.internal.client.ClientUtils.sync;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.client.IgniteClientConfigurationImpl;
import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.lang.LoggerFactory;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
/**
* Ignite client entry point.
@@ -86,10 +88,14 @@ public interface IgniteClient extends Ignite {
/** Heartbeat interval. */
private long heartbeatInterval = DFLT_HEARTBEAT_INTERVAL;
+ /** Heartbeat timeout. */
+ private long heartbeatTimeout = DFLT_HEARTBEAT_TIMEOUT;
+
/** Retry policy. */
- private RetryPolicy retryPolicy = new RetryReadPolicy();
+ private @Nullable RetryPolicy retryPolicy = new RetryReadPolicy();
- private LoggerFactory loggerFactory;
+ /** Logger factory. */
+ private @Nullable LoggerFactory loggerFactory;
/**
* Sets the addresses of Ignite server nodes within a cluster. An address can be an IP address or a hostname, with or without port.
@@ -116,7 +122,7 @@ public interface IgniteClient extends Ignite {
* @param retryPolicy Retry policy.
* @return This instance.
*/
- public Builder retryPolicy(RetryPolicy retryPolicy) {
+ public Builder retryPolicy(@Nullable RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
@@ -240,6 +246,20 @@ public interface IgniteClient extends Ignite {
return this;
}
+ /**
+ * Sets the heartbeat message timeout, in milliseconds. Default is <code>5000</code>.
+ *
+ * <p>When a server does not respond to a heartbeat within the specified timeout, client will close the connection.
+ *
+ * @param heartbeatTimeout Heartbeat timeout.
+ * @return This instance.
+ */
+ public Builder heartbeatTimeout(long heartbeatTimeout) {
+ this.heartbeatTimeout = heartbeatTimeout;
+
+ return this;
+ }
+
/**
* Builds the client.
*
@@ -263,6 +283,7 @@ public interface IgniteClient extends Ignite {
reconnectThrottlingRetries,
asyncContinuationExecutor,
heartbeatInterval,
+ heartbeatTimeout,
retryPolicy,
loggerFactory
);
diff --git a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index 5fefcbfcd7..d957492bb1 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -36,12 +36,12 @@ public interface IgniteClientConfiguration {
/** Default socket connect timeout, in milliseconds. */
int DFLT_CONNECT_TIMEOUT = 5000;
+ /** Default heartbeat timeout, in milliseconds. */
+ int DFLT_HEARTBEAT_TIMEOUT = 5000;
+
/** Default heartbeat interval, in milliseconds. */
int DFLT_HEARTBEAT_INTERVAL = 30_000;
- /** Default operation retry limit. */
- int DFLT_RETRY_LIMIT = 5;
-
/** Default reconnect throttling period. */
long DFLT_RECONNECT_THROTTLING_PERIOD = 30_000L;
@@ -121,6 +121,18 @@ public interface IgniteClientConfiguration {
*/
public long heartbeatInterval();
+ /**
+ * Gets the heartbeat message timeout, in milliseconds. Default is <code>5000</code>.
+ *
+ * <p>When a server does not respond to a heartbeat within the specified timeout, client will close the connection.
+ *
+ * <p>When thin client connection is idle (no operations are performed), heartbeat messages are sent periodically
+ * to keep the connection alive and detect potential half-open state.
+ *
+ * @return Heartbeat interval.
+ */
+ public long heartbeatTimeout();
+
/**
* Returns the logger factory. This factory will be used to create a logger instance when needed.
*
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index fea4e78e20..3e0ba81d86 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -20,8 +20,12 @@ package org.apache.ignite.internal.client;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.ignite.client.ClientOperationType;
+import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.LoggerFactory;
/**
* Client utilities.
@@ -178,4 +182,20 @@ public class ClientUtils {
throw new UnsupportedOperationException("Invalid op code: " + opCode);
}
}
+
+ /**
+ * Gets a logger for the given class.
+ *
+ * @param cls Class.
+ * @return Logger.
+ */
+ public static <T> IgniteLogger logger(IgniteClientConfiguration cfg, Class<T> cls) {
+ var loggerFactory = cfg.loggerFactory() == null
+ ? (LoggerFactory) System::getLogger
+ : cfg.loggerFactory();
+
+ return loggerFactory == null
+ ? Loggers.voidLogger()
+ : Loggers.forClass(cls, loggerFactory);
+ }
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index 842c53c8e2..2fa413b2c2 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -49,6 +49,9 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
/** Heartbeat interval. */
private final long heartbeatInterval;
+ /** Heartbeat timout. */
+ private final long heartbeatTimeout;
+
/** Retry policy. */
private final RetryPolicy retryPolicy;
@@ -64,6 +67,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
* @param reconnectThrottlingRetries Reconnect throttling retries.
* @param asyncContinuationExecutor Async continuation executor.
* @param heartbeatInterval Heartbeat message interval.
+ * @param heartbeatTimeout Heartbeat message timeout.
* @param retryPolicy Retry policy.
* @param loggerFactory Logger factory which will be used to create a logger instance for this this particular client when needed.
*/
@@ -75,8 +79,9 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
int reconnectThrottlingRetries,
Executor asyncContinuationExecutor,
long heartbeatInterval,
- RetryPolicy retryPolicy,
- LoggerFactory loggerFactory
+ long heartbeatTimeout,
+ @Nullable RetryPolicy retryPolicy,
+ @Nullable LoggerFactory loggerFactory
) {
this.addressFinder = addressFinder;
@@ -88,6 +93,7 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
this.reconnectThrottlingRetries = reconnectThrottlingRetries;
this.asyncContinuationExecutor = asyncContinuationExecutor;
this.heartbeatInterval = heartbeatInterval;
+ this.heartbeatTimeout = heartbeatTimeout;
this.retryPolicy = retryPolicy;
this.loggerFactory = loggerFactory;
}
@@ -134,6 +140,12 @@ public final class IgniteClientConfigurationImpl implements IgniteClientConfigur
return heartbeatInterval;
}
+ /** {@inheritDoc} */
+ @Override
+ public long heartbeatTimeout() {
+ return heartbeatTimeout;
+ }
+
/** {@inheritDoc} */
@Override
public @Nullable LoggerFactory loggerFactory() {
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index d739ba0598..04d80f52a0 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -114,10 +114,10 @@ public final class ReliableChannel implements AutoCloseable {
* @param clientCfg Client config.
*/
ReliableChannel(BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
- IgniteClientConfiguration clientCfg, IgniteLogger log) {
+ IgniteClientConfiguration clientCfg) {
this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
- this.log = Objects.requireNonNull(log, "log");
+ this.log = ClientUtils.logger(clientCfg, ReliableChannel.class);
connMgr = new NettyClientConnectionMultiplexer();
connMgr.start(clientCfg);
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 4ca527d49b..6f7bde9e2c 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
import org.apache.ignite.internal.client.proto.ServerMessageType;
+import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
@@ -97,9 +99,15 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Connect timeout in milliseconds. */
private final long connectTimeout;
+ /** Heartbeat timeout in milliseconds. */
+ private final long heartbeatTimeout;
+
/** Heartbeat timer. */
private final Timer heartbeatTimer;
+ /** Logger. */
+ private final IgniteLogger log;
+
/** Last send operation timestamp. */
private volatile long lastSendMillis;
@@ -112,11 +120,14 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
TcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr) {
validateConfiguration(cfg);
+ log = ClientUtils.logger(cfg.clientConfiguration(), TcpClientChannel.class);
+
asyncContinuationExecutor = cfg.clientConfiguration().asyncContinuationExecutor() == null
? ForkJoinPool.commonPool()
: cfg.clientConfiguration().asyncContinuationExecutor();
connectTimeout = cfg.clientConfiguration().connectTimeout();
+ heartbeatTimeout = cfg.clientConfiguration().heartbeatTimeout();
sock = connMgr.open(cfg.getAddress(), this, this);
@@ -136,7 +147,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/**
* Close the channel with cause.
*/
- private void close(Exception cause) {
+ private void close(@Nullable Exception cause) {
if (closed.compareAndSet(false, true)) {
// Disconnect can happen before we initialize the timer.
var timer = heartbeatTimer;
@@ -312,7 +323,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
* @param unpacker Unpacker.
* @return Exception.
*/
- private IgniteException readError(ClientMessageUnpacker unpacker) {
+ private static IgniteException readError(ClientMessageUnpacker unpacker) {
var traceId = unpacker.unpackUuid();
var code = unpacker.unpackInt();
var errClassName = unpacker.unpackString();
@@ -379,8 +390,14 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
try {
handshakeReq(ver);
- var res = connectTimeout > 0 ? fut.get(connectTimeout, TimeUnit.MILLISECONDS) : fut.get();
- handshakeRes(res, ver);
+ // handshakeRes must be called even in case of timeout to release the buffer.
+ var resFut = fut.thenAccept(res -> handshakeRes(res, ver));
+
+ if (connectTimeout > 0) {
+ resFut.get(connectTimeout, TimeUnit.MILLISECONDS);
+ } else {
+ resFut.get();
+ }
} catch (Throwable e) {
throw IgniteException.wrap(e);
}
@@ -498,7 +515,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
private final long interval;
/** Constructor. */
- public HeartbeatTask(long interval) {
+ HeartbeatTask(long interval) {
this.interval = interval;
}
@@ -506,7 +523,21 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
@Override public void run() {
try {
if (System.currentTimeMillis() - lastSendMillis > interval) {
- serviceAsync(ClientOp.HEARTBEAT, null, null);
+ var fut = serviceAsync(ClientOp.HEARTBEAT, null, null);
+
+ if (connectTimeout > 0) {
+ fut
+ .orTimeout(heartbeatTimeout, TimeUnit.MILLISECONDS)
+ .exceptionally(e -> {
+ if (e instanceof TimeoutException) {
+ log.warn("Heartbeat timeout, closing the channel");
+
+ close((TimeoutException) e);
+ }
+
+ return null;
+ });
+ }
}
} catch (Throwable ignored) {
// Ignore failed heartbeats.
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 2ccf7506af..efc3227dde 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -34,8 +34,6 @@ import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.client.table.ClientTables;
import org.apache.ignite.internal.client.tx.ClientTransactions;
import org.apache.ignite.internal.jdbc.proto.ClientMessage;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.lang.LoggerFactory;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.IgniteSql;
@@ -88,13 +86,7 @@ public class TcpIgniteClient implements IgniteClient {
this.cfg = cfg;
- var loggerFactory = cfg.loggerFactory() == null
- ? (LoggerFactory) System::getLogger
- : cfg.loggerFactory();
-
- var log = Loggers.forClass(TcpIgniteClient.class, loggerFactory);
-
- ch = new ReliableChannel(chFactory, cfg, log);
+ ch = new ReliableChannel(chFactory, cfg);
tables = new ClientTables(ch);
transactions = new ClientTransactions(ch);
compute = new ClientCompute(ch, tables);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 422e56993b..efb03a47f8 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -136,7 +136,7 @@ public abstract class AbstractClientTest {
Ignite ignite,
String nodeName
) {
- return new TestServer(port, portRange, idleTimeout, ignite, null, nodeName, clusterId);
+ return new TestServer(port, portRange, idleTimeout, ignite, null, null, nodeName, clusterId);
}
/**
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 22c23493fa..db8db0311d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -176,9 +176,9 @@ public class ClientComputeTest {
var clusterId = UUID.randomUUID();
- server1 = new TestServer(10900, 10, 0, ignite, shouldDropConnection, "s1", clusterId);
- server2 = new TestServer(10910, 10, 0, ignite, shouldDropConnection, "s2", clusterId);
- server3 = new TestServer(10920, 10, 0, ignite, shouldDropConnection, "s3", clusterId);
+ server1 = new TestServer(10900, 10, 0, ignite, shouldDropConnection, null, "s1", clusterId);
+ server2 = new TestServer(10910, 10, 0, ignite, shouldDropConnection, null, "s2", clusterId);
+ server3 = new TestServer(10920, 10, 0, ignite, shouldDropConnection, null, "s3", clusterId);
}
private Set<ClusterNode> getClusterNodes(String... names) {
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
index a6ee3677cf..7c6f03aba8 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConnectionTest.java
@@ -17,11 +17,17 @@
package org.apache.ignite.client;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import org.apache.ignite.client.IgniteClient.Builder;
+import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -70,7 +76,22 @@ public class ConnectionTest extends AbstractClientTest {
testConnection("[::1]:" + serverPort);
}
- private void testConnection(String... addrs) throws Exception {
+ @SuppressWarnings("ThrowableNotThrown")
+ @Test
+ public void testNoResponseFromServerWithinConnectTimeoutThrowsException() throws Exception {
+ Function<Integer, Integer> responseDelay = x -> 500;
+
+ try (var srv = new TestServer(10800, 10, 300, new FakeIgnite(), x -> false, responseDelay, null, UUID.randomUUID())) {
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + srv.port())
+ .retryPolicy(new RetryLimitPolicy().retryLimit(1))
+ .connectTimeout(50);
+
+ assertThrowsWithCause(builder::build, TimeoutException.class);
+ }
+ }
+
+ private static void testConnection(String... addrs) throws Exception {
AbstractClientTest.startClient(addrs).close();
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index c0cfe67c16..8a797702a9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -20,8 +20,11 @@ package org.apache.ignite.client;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import java.util.UUID;
+import java.util.function.Function;
import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.junit.jupiter.api.Test;
/**
@@ -75,4 +78,29 @@ public class HeartbeatTest {
assertEquals("Negative delay.", ex.getMessage());
}
}
+
+ @Test
+ public void testHeartbeatTimeoutClosesConnection() throws Exception {
+ Function<Integer, Integer> responseDelayFunc = requestCount -> requestCount > 1 ? 500 : 0;
+ var loggerFactory = new TestLoggerFactory("client");
+
+ try (var srv = new TestServer(10800, 10, 300, new FakeIgnite(), x -> false, responseDelayFunc, null, UUID.randomUUID())) {
+ int srvPort = srv.port();
+
+ Builder builder = IgniteClient.builder()
+ .addresses("127.0.0.1:" + srvPort)
+ .retryPolicy(new RetryLimitPolicy().retryLimit(1))
+ .heartbeatTimeout(30)
+ .reconnectThrottlingPeriod(5000)
+ .reconnectThrottlingRetries(0)
+ .heartbeatInterval(50)
+ .loggerFactory(loggerFactory);
+
+ try (var ignored = builder.build()) {
+ IgniteTestUtils.waitForCondition(
+ () -> loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Heartbeat timeout, closing the channel")),
+ 3000);
+ }
+ }
+ }
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
index 9af2164593..7955e58739 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/MultiClusterTest.java
@@ -49,8 +49,8 @@ public class MultiClusterTest {
@BeforeEach
void setUp() {
- server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s1", clusterId1);
- server2 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s2", clusterId2);
+ server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s1", clusterId1);
+ server2 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s2", clusterId2);
}
@AfterEach
@@ -91,7 +91,7 @@ public class MultiClusterTest {
client.tables().tables();
server1.close();
- server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, "s1", clusterId2);
+ server1 = new TestServer(10900, 10, 0, new FakeIgnite(), null, null, "s1", clusterId2);
IgniteClientConnectionException ex = (IgniteClientConnectionException) assertThrowsWithCause(
() -> client.tables().tables(), IgniteClientConnectionException.class, "Cluster ID mismatch");
diff --git a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 35b7ee1297..cbcde03218 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -216,7 +216,7 @@ public class RetryPolicyTest {
@Test
public void testRetryReadPolicyAllOperationsSupported() throws IllegalAccessException {
var plc = new RetryReadPolicy();
- var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, null, 0, null, null);
+ var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, null, 0, 0, null, null);
for (var op : ClientOperationType.values()) {
var ctx = new RetryPolicyContextImpl(cfg, op, 0, null);
@@ -265,6 +265,6 @@ public class RetryPolicyTest {
FakeIgnite ign = new FakeIgnite();
((FakeIgniteTables) ign.tables()).createTable("t");
- server = new TestServer(10900, 10, 0, ign, shouldDropConnection, null, UUID.randomUUID());
+ server = new TestServer(10900, 10, 0, ign, shouldDropConnection, null, null, UUID.randomUUID());
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
index bc23b933bc..6995ecd021 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java
@@ -60,6 +60,9 @@ public class TestClientHandlerModule implements IgniteComponent {
/** Connection drop condition. */
private final Function<Integer, Boolean> shouldDropConnection;
+ /** Server response delay function. */
+ private final Function<Integer, Integer> responseDelay;
+
/** Cluster service. */
private final ClusterService clusterService;
@@ -82,6 +85,7 @@ public class TestClientHandlerModule implements IgniteComponent {
* @param registry Configuration registry.
* @param bootstrapFactory Bootstrap factory.
* @param shouldDropConnection Connection drop condition.
+ * @param responseDelay Response delay, in milliseconds.
* @param clusterService Cluster service.
* @param compute Compute.
* @param clusterId Cluster id.
@@ -91,6 +95,7 @@ public class TestClientHandlerModule implements IgniteComponent {
ConfigurationRegistry registry,
NettyBootstrapFactory bootstrapFactory,
Function<Integer, Boolean> shouldDropConnection,
+ @Nullable Function<Integer, Integer> responseDelay,
ClusterService clusterService,
IgniteCompute compute,
UUID clusterId) {
@@ -102,6 +107,7 @@ public class TestClientHandlerModule implements IgniteComponent {
this.registry = registry;
this.bootstrapFactory = bootstrapFactory;
this.shouldDropConnection = shouldDropConnection;
+ this.responseDelay = responseDelay;
this.clusterService = clusterService;
this.compute = compute;
this.clusterId = clusterId;
@@ -166,6 +172,7 @@ public class TestClientHandlerModule implements IgniteComponent {
ch.pipeline().addLast(
new ClientMessageDecoder(),
new ConnectionDropHandler(requestCounter, shouldDropConnection),
+ new ResponseDelayHandler(responseDelay),
new ClientInboundMessageHandler(
(IgniteTablesInternal) ignite.tables(),
ignite.transactions(),
@@ -230,4 +237,32 @@ public class TestClientHandlerModule implements IgniteComponent {
}
}
}
+
+ private static class ResponseDelayHandler extends ChannelInboundHandlerAdapter {
+ /** Delay. */
+ private final Function<Integer, Integer> delay;
+
+ /** Counter. */
+ private final AtomicInteger cnt = new AtomicInteger();
+
+ /**
+ * Constructor.
+ *
+ * @param delay Delay.
+ */
+ private ResponseDelayHandler(@Nullable Function<Integer, Integer> delay) {
+ this.delay = delay;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ var delayMs = delay == null ? 0 : delay.apply(cnt.incrementAndGet());
+
+ if (delayMs > 0) {
+ Thread.sleep(delayMs);
+ }
+
+ super.channelRead(ctx, msg);
+ }
+ }
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
index df1e9d8045..432c777224 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/TestServer.java
@@ -74,7 +74,7 @@ public class TestServer implements AutoCloseable {
long idleTimeout,
Ignite ignite
) {
- this(port, portRange, idleTimeout, ignite, null, null, UUID.randomUUID());
+ this(port, portRange, idleTimeout, ignite, null, null, null, UUID.randomUUID());
}
/**
@@ -91,7 +91,8 @@ public class TestServer implements AutoCloseable {
long idleTimeout,
Ignite ignite,
@Nullable Function<Integer, Boolean> shouldDropConnection,
- String nodeName,
+ @Nullable Function<Integer, Integer> responseDelay,
+ @Nullable String nodeName,
UUID clusterId
) {
cfg = new ConfigurationRegistry(
@@ -131,7 +132,15 @@ public class TestServer implements AutoCloseable {
compute.executeColocated(anyString(), any(), anyString(), any())).thenReturn(CompletableFuture.completedFuture(nodeName));
module = shouldDropConnection != null
- ? new TestClientHandlerModule(ignite, cfg, bootstrapFactory, shouldDropConnection, clusterService, compute, clusterId)
+ ? new TestClientHandlerModule(
+ ignite,
+ cfg,
+ bootstrapFactory,
+ shouldDropConnection,
+ responseDelay,
+ clusterService,
+ compute,
+ clusterId)
: new ClientHandlerModule(
((FakeIgnite) ignite).queryEngine(),
(IgniteTablesInternal) ignite.tables(),