You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/02/07 12:24:26 UTC
[ignite] branch master updated: IGNITE-7648 Fixed
IGNITE_ENABLE_FORCIBLE_NODE_KILL system property - Fixes #3501.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 26d141e IGNITE-7648 Fixed IGNITE_ENABLE_FORCIBLE_NODE_KILL system property - Fixes #3501.
26d141e is described below
commit 26d141e4ca66f717db0771ab8fdc83e301875595
Author: Pavel Voronkin <pv...@gridgain.com>
AuthorDate: Thu Feb 7 15:17:07 2019 +0300
IGNITE-7648 Fixed IGNITE_ENABLE_FORCIBLE_NODE_KILL system property - Fixes #3501.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../spi/ExponentialBackoffTimeoutStrategy.java | 138 +++++++++
.../spi/IgniteSpiOperationTimeoutHelper.java | 8 +-
.../org/apache/ignite/spi/TimeoutStrategy.java | 60 ++++
.../spi/communication/tcp/TcpCommunicationSpi.java | 322 ++++++++++-----------
.../tcp/messages/RecoveryLastReceivedMessage.java | 3 +
.../IgniteClientReconnectAbstractTest.java | 6 +-
.../cache/GridCachePutAllFailoverSelfTest.java | 12 -
.../cache/IgniteCachePutAllRestartTest.java | 14 +-
.../CacheGetInsideLockChangingTopologyTest.java | 6 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 14 +-
.../IgniteBinaryMetadataUpdateNodeRestartTest.java | 11 +-
.../distributed/IgniteCacheGetRestartTest.java | 6 +-
.../IgniteCacheNearRestartRollbackSelfTest.java | 12 -
...WriteSynchronizationModesMultithreadedTest.java | 1 +
.../dht/IgniteCachePutRetryAbstractSelfTest.java | 6 +-
.../GridServiceProxyClientReconnectSelfTest.java | 4 +-
.../ServiceDeploymentOnClientDisconnectTest.java | 5 +-
.../spi/ExponentialBackoffTimeoutStrategyTest.java | 102 +++++++
.../IgniteTcpCommunicationHandshakeWaitTest.java | 16 +-
.../tcp/TcpCommunicationSpiDropNodesTest.java | 139 +++------
.../TcpCommunicationSpiFaultyClientSslTest.java | 38 +++
.../tcp/TcpCommunicationSpiFaultyClientTest.java | 76 ++++-
.../tcp/TcpCommunicationSpiFreezingClientTest.java | 188 ++++++++++++
.../spi/discovery/tcp/IgniteClientConnectTest.java | 89 +++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 25 +-
.../IgniteSpiCommunicationSelfTestSuite.java | 6 +
.../IgniteSpiDiscoverySelfTestSuite.java | 3 +
.../ZookeeperDiscoveryClientDisconnectTest.java | 11 +
28 files changed, 966 insertions(+), 355 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/ExponentialBackoffTimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/spi/ExponentialBackoffTimeoutStrategy.java
new file mode 100644
index 0000000..41d34e5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/ExponentialBackoffTimeoutStrategy.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Timeout strategy for retriable network operations that enforces an overall {@code totalTimeout}.
+ * Each per-attempt timeout starts at {@code startTimeout} and grows by exponential backoff up to {@code maxTimeout}.
+ *
+ * The total budget is supplied by the caller: for example a failure detection timeout, or a value
+ * precomputed with {@link #totalBackoffTimeout(long, long, long)} from the connect timeout settings.
+ */
+public class ExponentialBackoffTimeoutStrategy implements TimeoutStrategy {
+ /** Default backoff coefficient: the current timeout is multiplied by it to get the next timeout. */
+ private static final double DLFT_BACKOFF_COEFF = 2.0;
+
+ /** Upper bound for the timeout of a single try, ms. */
+ private final long maxTimeout;
+
+ /** Total timeout for the whole operation, ms. */
+ private final long totalTimeout;
+
+ /** Timestamp of operation start, used to compute the time remaining until totalTimeout. */
+ private final long start;
+
+ /** Timeout that the next zero-argument style request will return (before capping), ms. */
+ private long currTimeout;
+
+ /**
+ * Compute expected total backoff timeout for {@code reconCnt} attempts, starting from {@code initTimeout}.
+ * Each step adds {@code min(2 * runningTotal, maxTimeout)}; the loop stops once the total reaches {@code maxTimeout}.
+ * @param initTimeout Initial timeout.
+ * @param maxTimeout Max Timeout per retry.
+ * @param reconCnt Reconnection count.
+ * @return Calculated total backoff timeout.
+ */
+ public static long totalBackoffTimeout(
+ long initTimeout,
+ long maxTimeout,
+ long reconCnt
+ ) {
+ long totalBackoffTimeout = initTimeout;
+
+ for (int i = 1; i < reconCnt && totalBackoffTimeout < maxTimeout; i++)
+ totalBackoffTimeout += backoffTimeout(totalBackoffTimeout, maxTimeout);
+
+ return totalBackoffTimeout;
+ }
+
+ /**
+ * Multiplies the given timeout by the backoff coefficient, capping the result at {@code maxTimeout}.
+ * @param timeout Timeout.
+ * @param maxTimeout Upper bound for the returned timeout.
+ * @return Next exponential backoff timeout.
+ */
+ public static long backoffTimeout(long timeout, long maxTimeout) {
+ return (long) Math.min(timeout * DLFT_BACKOFF_COEFF, maxTimeout);
+ }
+
+ /**
+ * Creates a strategy and records the current time as the start of the operation.
+ * @param totalTimeout Total timeout for the whole operation.
+ * @param startTimeout Initial connection timeout.
+ * @param maxTimeout Max connection Timeout.
+ *
+ */
+ public ExponentialBackoffTimeoutStrategy(
+ long totalTimeout,
+ long startTimeout,
+ long maxTimeout
+ ) {
+ this.totalTimeout = totalTimeout;
+
+ this.maxTimeout = maxTimeout;
+
+ currTimeout = startTimeout;
+
+ start = U.currentTimeMillis();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long nextTimeout(long timeout) throws IgniteSpiOperationTimeoutException {
+ long remainingTime = remainingTime(U.currentTimeMillis());
+
+ if (remainingTime <= 0)
+ throw new IgniteSpiOperationTimeoutException("Operation timed out [timeoutStrategy= " +this +"]");
+
+ /*
+ If timeout is zero we return the current timeout (capped by the remaining time) and advance the backoff.
+ A non-zero timeout is only re-checked against the remaining time so that totalTimeout is not breached.
+ */
+ if (timeout == 0) {
+ long prevTimeout = currTimeout;
+
+ currTimeout = backoffTimeout(currTimeout, maxTimeout);
+
+ return Math.min(prevTimeout, remainingTime);
+ } else
+ return Math.min(timeout, remainingTime);
+ }
+
+ /**
+ * Returns the time left before totalTimeout elapses, counted from the operation start; may be zero or negative.
+ *
+ * @param curTs Current timestamp.
+ * @return Remaining time in millis.
+ */
+ public long remainingTime(long curTs) {
+ return totalTimeout - (curTs - start);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean checkTimeout(long timeInFut) {
+ return remainingTime(U.currentTimeMillis() + timeInFut) <= 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ExponentialBackoffTimeoutStrategy.class, this);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
index b2432ce..9f1773b 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiOperationTimeoutHelper.java
@@ -27,8 +27,12 @@ import org.apache.ignite.internal.util.typedef.internal.U;
*
* A new instance of the class should be created for every complex network based operations that usually consists of
* request and response parts.
+ *
*/
public class IgniteSpiOperationTimeoutHelper {
+ // https://issues.apache.org/jira/browse/IGNITE-11221
+ // We need to reuse the new ExponentialBackoffTimeoutStrategy logic in TcpDiscovery instead of this class.
+
/** */
private long lastOperStartTs;
@@ -84,7 +88,7 @@ public class IgniteSpiOperationTimeoutHelper {
"'failureDetectionTimeout' configuration property [failureDetectionTimeout="
+ failureDetectionTimeout + ']');
}
-
+
return timeout;
}
@@ -103,4 +107,4 @@ public class IgniteSpiOperationTimeoutHelper {
return (timeout - (U.currentTimeMillis() - lastOperStartTs) <= 0);
}
-}
\ No newline at end of file
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/TimeoutStrategy.java b/modules/core/src/main/java/org/apache/ignite/spi/TimeoutStrategy.java
new file mode 100644
index 0000000..d5bfcfb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/TimeoutStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+/**
+ * Strategy to calculate the next timeout and to check whether the total timeout has been reached.
+ */
+public interface TimeoutStrategy {
+ /**
+ * Get next timeout based on the timeout previously calculated by the strategy.
+ * @param currTimeout Previously calculated timeout; {@link #nextTimeout()} passes {@code 0} here.
+ * @return Gets next timeout.
+ * @throws IgniteSpiOperationTimeoutException in case of total timeout already breached.
+ */
+ public long nextTimeout(long currTimeout) throws IgniteSpiOperationTimeoutException;
+
+ /**
+ * Get next timeout; delegates to {@link #nextTimeout(long)} with {@code 0}.
+ *
+ * @return Get next timeout.
+ * @throws IgniteSpiOperationTimeoutException In case of total timeout already breached.
+ */
+ public default long nextTimeout() throws IgniteSpiOperationTimeoutException {
+ return nextTimeout(0);
+ }
+
+ /**
+ * Check if total timeout will be reached in now() + timeInFut.
+ *
+ * If timeInFut is 0, will check that timeout already reached.
+ *
+ * @param timeInFut Some millis in future.
+ * @return {@code True} if total timeout will be reached.
+ */
+ public boolean checkTimeout(long timeInFut);
+
+ /**
+ * Check if total timeout is already reached; delegates to {@link #checkTimeout(long)} with {@code 0}.
+ *
+ * @return {@code True} if total timeout already reached.
+ */
+ public default boolean checkTimeout() {
+ return checkTimeout(0);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index ee2445d..91cfdeb 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -23,7 +23,6 @@ import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -127,6 +126,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.ExponentialBackoffTimeoutStrategy;
import org.apache.ignite.spi.IgnitePortProtocol;
import org.apache.ignite.spi.IgniteSpiAdapter;
import org.apache.ignite.spi.IgniteSpiConfiguration;
@@ -139,6 +139,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
+import org.apache.ignite.spi.TimeoutStrategy;
import org.apache.ignite.spi.communication.CommunicationListener;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionKey;
@@ -164,6 +165,7 @@ import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationC
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING;
+import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.UNKNOWN_NODE;
/**
* <tt>TcpCommunicationSpi</tt> is default communication SPI which uses
@@ -327,6 +329,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
*/
public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
+ /** Default initial connect/handshake timeout in case of failure detection enabled. */
+ private static final int DFLT_INITIAL_TIMEOUT = 500;
+
+ /** Default initial delay in case of target node is still out of topology. */
+ private static final int DFLT_NEED_WAIT_DELAY = 200;
+
+ /** Default delay between reconnects attempts in case of temporary network issues. */
+ private static final int DFLT_RECONNECT_DELAY = 50;
+
/**
* Version when client is ready to wait to connect to server (could be needed when client tries to open connection
* before it starts being visible for server)
@@ -539,7 +550,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (unknownNode) {
U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');
- ses.close();
+ ses.send(new RecoveryLastReceivedMessage(UNKNOWN_NODE)).listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ ses.close();
+ }
+ });
}
else {
ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(new CI1<IgniteInternalFuture<?>>() {
@@ -2989,6 +3004,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
// then we are likely to run on the same host and shared memory communication could be tried.
if (shmemPort != null && U.sameMacs(locNode, node)) {
try {
+ // https://issues.apache.org/jira/browse/IGNITE-11126 Rework failure detection logic.
GridCommunicationClient client = createShmemClient(
node,
connIdx,
@@ -3082,11 +3098,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
try {
safeShmemHandshake(client, node.id(), timeoutHelper.nextTimeoutChunk(connTimeout0));
}
- catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+ catch (IgniteSpiOperationTimeoutException e) {
client.forceClose();
- if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
- timeoutHelper.checkFailureTimeoutReached(e))) {
+ if (failureDetectionTimeoutEnabled() && timeoutHelper.checkFailureTimeoutReached(e)) {
if (log.isDebugEnabled())
log.debug("Handshake timed out (failure threshold reached) [failureDetectionTimeout=" +
failureDetectionTimeout() + ", err=" + e.getMessage() + ", client=" + client + ']');
@@ -3262,17 +3277,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
GridCommunicationClient client = null;
IgniteCheckedException errs = null;
- int connectAttempts = 1;
-
- for (InetSocketAddress addr : addrs) {
- long connTimeout0 = connTimeout;
-
- int attempt = 1;
+ long totalTimeout;
- IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this,
- !node.isClient());
+ if (failureDetectionTimeoutEnabled())
+ totalTimeout = node.isClient() ? clientFailureDetectionTimeout() : failureDetectionTimeout();
+ else {
+ totalTimeout = ExponentialBackoffTimeoutStrategy.totalBackoffTimeout(
+ connTimeout,
+ maxConnTimeout,
+ reconCnt
+ );
+ }
- long needWaitDelay0 = 200;
+ for (InetSocketAddress addr : addrs) {
+ TimeoutStrategy connTimeoutStgy = new ExponentialBackoffTimeoutStrategy(
+ totalTimeout,
+ failureDetectionTimeoutEnabled() ? DFLT_INITIAL_TIMEOUT : connTimeout,
+ maxConnTimeout
+ );
while (client == null) { // Reconnection on handshake timeout.
if (stopping)
@@ -3286,9 +3308,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
break;
}
- boolean needWait = false;
-
try {
+ if (getSpiContext().node(node.id()) == null)
+ throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
+
SocketChannel ch = SocketChannel.open();
ch.configureBlocking(true);
@@ -3302,13 +3325,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (sockSndBuf > 0)
ch.socket().setSendBufferSize(sockSndBuf);
- if (getSpiContext().node(node.id()) == null) {
- U.closeQuiet(ch);
-
- throw new ClusterTopologyCheckedException("Failed to send message " +
- "(node left topology): " + node);
- }
-
ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
@@ -3327,14 +3343,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
return null;
}
- Long rcvCnt;
+ long rcvCnt;
Map<Integer, Object> meta = new HashMap<>();
GridSslMeta sslMeta = null;
try {
- ch.socket().connect(addr, (int)timeoutHelper.nextTimeoutChunk(connTimeout));
+ long currTimeout = connTimeoutStgy.nextTimeout();
+
+ ch.socket().connect(addr, (int) currTimeout);
+
+ if (getSpiContext().node(node.id()) == null)
+ throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
if (isSslEnabled()) {
meta.put(SSL_META.ordinal(), sslMeta = new GridSslMeta());
@@ -3351,7 +3372,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
rcvCnt = safeTcpHandshake(ch,
recoveryDesc,
node.id(),
- timeoutHelper.nextTimeoutChunk(connTimeout0),
+ connTimeoutStgy.nextTimeout(currTimeout),
sslMeta,
handshakeConnIdx);
@@ -3359,11 +3380,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
return null;
else if (rcvCnt == NODE_STOPPING)
throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
+ else if (rcvCnt == UNKNOWN_NODE)
+ throw new ClusterTopologyCheckedException("Remote node does not observe current node " +
+ "in topology : " + node.id());
else if (rcvCnt == NEED_WAIT) {
- needWait = true;
+ // Check whether the total timeout would be reached after sleep(DFLT_NEED_WAIT_DELAY).
+ if (connTimeoutStgy.checkTimeout(DFLT_NEED_WAIT_DELAY)) {
+ U.warn(log, "Handshake NEED_WAIT timed out (will stop attempts to perform the handshake) " +
+ "[node=" + node.id() +
+ ", connTimeoutStgy=" + connTimeoutStgy +
+ ", addr=" + addr +
+ ", failureDetectionTimeoutEnabled" + failureDetectionTimeoutEnabled() +
+ ", totalTimeout" + totalTimeout + ']');
+
+ throw new ClusterTopologyCheckedException("Failed to connect to node " +
+ "(current or target node is out of topology on target node within timeout). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("NEED_WAIT received, handshake after delay [node = "
+ + node + ", outOfTopologyDelay = " + DFLT_NEED_WAIT_DELAY + "ms]");
- continue;
+ U.sleep(DFLT_NEED_WAIT_DELAY);
+
+ continue;
+ }
}
+ else if (rcvCnt < 0)
+ throw new IgniteCheckedException("Unsupported negative receivedCount [rcvCnt=" + rcvCnt +
+ ", senderNode=" + node + ']');
meta.put(CONN_IDX_META, connKey);
@@ -3383,93 +3431,49 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (recoveryDesc != null)
recoveryDesc.release();
-
- if (needWait) {
- /*
- https://issues.apache.org/jira/browse/IGNITE-7648
- We should add failure detection check here like exception case.
- */
- if (needWaitDelay0 < 60_000)
- needWaitDelay0 *= 2;
-
- if (log.isDebugEnabled())
- log.debug("NEED_WAIT received, reconnect after delay [node = "
- + node + ", delay = " + needWaitDelay0 + "ms]");
-
- U.sleep(needWaitDelay0);
- }
}
}
}
- catch (HandshakeTimeoutException | IgniteSpiOperationTimeoutException e) {
+ catch (IgniteSpiOperationTimeoutException e) { // Handshake is timed out.
if (client != null) {
client.forceClose();
client = null;
}
- if (failureDetectionTimeoutEnabled() && (e instanceof HandshakeTimeoutException ||
- X.hasCause(e, SocketException.class) ||
- timeoutHelper.checkFailureTimeoutReached(e))) {
-
- String msg = "Handshake timed out (failure detection timeout is reached) " +
- "[failureDetectionTimeout=" + failureDetectionTimeout() + ", addr=" + addr + ']';
-
- onException(msg, e);
-
- if (log.isDebugEnabled())
- log.debug(msg);
-
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
-
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
-
- break;
- }
-
- assert !failureDetectionTimeoutEnabled();
-
- onException("Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
+ onException("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
", addr=" + addr + ']', e);
if (log.isDebugEnabled())
- log.debug(
- "Handshake timed out (will retry with increased timeout) [timeout=" + connTimeout0 +
- ", addr=" + addr + ", err=" + e + ']');
-
- if (attempt == reconCnt || connTimeout0 > maxConnTimeout) {
- U.warn(log, "Handshake timedout (will stop attempts to perform the handshake) " +
- "[node=" + node.id() + ", timeout=" + connTimeout0 +
- ", maxConnTimeout=" + maxConnTimeout +
- ", attempt=" + attempt + ", reconCnt=" + reconCnt +
- ", err=" + e.getMessage() + ", addr=" + addr + ']');
+ log.debug("Handshake timed out (will retry with increased timeout) [connTimeoutStrategy=" + connTimeoutStgy +
+ ", addr=" + addr + ", err=" + e + ']'
+ );
+
+ if (connTimeoutStgy.checkTimeout()) {
+ U.warn(log, "Handshake timed out (will stop attempts to perform the handshake) " +
+ "[node=" + node.id() + ", connTimeoutStrategy=" + connTimeoutStgy +
+ ", err=" + e.getMessage() + ", addr=" + addr +
+ ", failureDetectionTimeoutEnabled" + failureDetectionTimeoutEnabled() +
+ ", totalTimeout" + totalTimeout + ']');
+
+ String msg = "Failed to connect to node (is node still alive?). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']';
if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
-
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address: " + addr, e));
+ errs = new IgniteCheckedException(msg, e);
+ else
+ errs.addSuppressed(new IgniteCheckedException(msg, e));
break;
}
- else {
- attempt++;
-
- connTimeout0 *= 2;
-
- // Continue loop.
- }
}
catch (ClusterTopologyCheckedException e) {
throw e;
}
catch (Exception e) {
+ // Most probably IO error on socket connect or handshake.
if (client != null) {
client.forceClose();
@@ -3481,41 +3485,42 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (log.isDebugEnabled())
log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
- boolean failureDetThrReached = timeoutHelper.checkFailureTimeoutReached(e);
-
- if (enableTroubleshootingLog)
- U.error(log, "Failed to establish connection to a remote node [node=" + node +
- ", addr=" + addr + ", connectAttempts=" + connectAttempts +
- ", failureDetThrReached=" + failureDetThrReached + ']', e);
+ // Check if timeout occurred in case of unrecoverable exception.
+ if (connTimeoutStgy.checkTimeout()) {
+ U.warn(log, "Connection timed out (will stop attempts to perform the connect) " +
+ "[node=" + node.id() + ", connTimeoutStgy=" + connTimeoutStgy +
+ ", failureDetectionTimeoutEnabled" + failureDetectionTimeoutEnabled() +
+ ", totalTimeout" + totalTimeout +
+ ", err=" + e.getMessage() + ", addr=" + addr + ']');
- if (failureDetThrReached)
- LT.warn(log, "Connect timed out (consider increasing 'failureDetectionTimeout' " +
- "configuration property) [addr=" + addr + ", failureDetectionTimeout=" +
- failureDetectionTimeout() + ']');
- else if (X.hasCause(e, SocketTimeoutException.class))
- LT.warn(log, "Connect timed out (consider increasing 'connTimeout' " +
- "configuration property) [addr=" + addr + ", connTimeout=" + connTimeout + ']');
+ String msg = "Failed to connect to node (is node still alive?). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ']';
- if (errs == null)
- errs = new IgniteCheckedException("Failed to connect to node (is node still alive?). " +
- "Make sure that each ComputeTask and cache Transaction has a timeout set " +
- "in order to prevent parties from waiting forever in case of network issues " +
- "[nodeId=" + node.id() + ", addrs=" + addrs + ']');
+ if (errs == null)
+ errs = new IgniteCheckedException(msg, e);
+ else
+ errs.addSuppressed(new IgniteCheckedException(msg, e));
- errs.addSuppressed(new IgniteCheckedException("Failed to connect to address " +
- "[addr=" + addr + ", err=" + e.getMessage() + ']', e));
+ break;
+ }
- // Reconnect for the second time, if connection is not established.
- if (!failureDetThrReached && connectAttempts < 5 &&
- (X.hasCause(e, ConnectException.class, HandshakeException.class, SocketTimeoutException.class))) {
- U.sleep(200);
+ if (isRecoverableException(e))
+ U.sleep(DFLT_RECONNECT_DELAY);
+ else {
+ String msg = "Failed to connect to node due to unrecoverable exception (is node still alive?). " +
+ "Make sure that each ComputeTask and cache Transaction has a timeout set " +
+ "in order to prevent parties from waiting forever in case of network issues " +
+ "[nodeId=" + node.id() + ", addrs=" + addrs + ", err= "+ e + ']';
- connectAttempts++;
+ if (errs == null)
+ errs = new IgniteCheckedException(msg, e);
+ else
+ errs.addSuppressed(new IgniteCheckedException(msg, e));
- continue;
+ break;
}
-
- break;
}
}
@@ -3558,17 +3563,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
) throws IgniteCheckedException {
assert errs != null;
- if (X.hasCause(errs, ConnectException.class))
- LT.warn(log, "Failed to connect to a remote node " +
- "(make sure that destination node is alive and " +
- "operating system firewall is disabled on local and remote hosts) " +
- "[addrs=" + addrs + ']');
-
boolean commErrResolve = false;
IgniteSpiContext ctx = getSpiContext();
- if (connectionError(errs) && ctx.communicationFailureResolveSupported()) {
+ if (isRecoverableException(errs) && ctx.communicationFailureResolveSupported()) {
commErrResolve = true;
ctx.resolveCommunicationFailure(node, errs);
@@ -3576,8 +3575,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (!commErrResolve && enableForcibleNodeKill) {
if (ctx.node(node.id()) != null
- && (node.isClient() || !getLocalNode().isClient()) &&
- connectionError(errs)) {
+ && node.isClient()
+ && !getLocalNode().isClient()
+ && isRecoverableException(errs)
+ ) {
+ // Only server can fail client for now, as in TcpDiscovery resolveCommunicationFailure() is not supported.
String msg = "TcpCommunicationSpi failed to establish connection to node, node will be dropped from " +
"cluster [" + "rmtNode=" + node + ']';
@@ -3593,21 +3595,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
- if (!X.hasCause(errs, SocketTimeoutException.class, HandshakeTimeoutException.class,
- IgniteSpiOperationTimeoutException.class))
- throw errs;
+ throw errs;
}
/**
* @param errs Error.
- * @return {@code True} if error was caused by some connection IO error.
+ * @return {@code True} if error was caused by some connection IO error or IgniteCheckedException due to timeout.
*/
- private static boolean connectionError(IgniteCheckedException errs) {
- return X.hasCause(errs, ConnectException.class,
+ private boolean isRecoverableException(Exception errs) {
+ return X.hasCause(
+ errs,
+ IOException.class,
HandshakeException.class,
- SocketTimeoutException.class,
- HandshakeTimeoutException.class,
- IgniteSpiOperationTimeoutException.class);
+ IgniteSpiOperationTimeoutException.class
+ );
+ }
+
+ /** */
+ private IgniteSpiOperationTimeoutException handshakeTimeoutException() {
+ return new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
+ "(consider increasing 'connectionTimeout' configuration property).");
}
/**
@@ -3633,16 +3640,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
client.doHandshake(new HandshakeClosure(rmtNodeId));
}
finally {
- boolean cancelled = obj.cancel();
-
- if (cancelled)
+ if (obj.cancel())
removeTimeoutObject(obj);
-
- // Ignoring whatever happened after timeout - reporting only timeout event.
- if (!cancelled)
- throw new HandshakeTimeoutException(
- new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
- "(consider increasing 'connectionTimeout' configuration property)."));
+ else
+ throw handshakeTimeoutException();
}
}
@@ -3890,16 +3891,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
throw new IgniteCheckedException("Failed to read from channel.", e);
}
finally {
- boolean cancelled = obj.cancel();
-
- if (cancelled)
+ if (obj.cancel())
removeTimeoutObject(obj);
-
- // Ignoring whatever happened after timeout - reporting only timeout event.
- if (!cancelled)
- throw new HandshakeTimeoutException(
- new IgniteSpiOperationTimeoutException("Failed to perform handshake due to timeout " +
- "(consider increasing 'connectionTimeout' configuration property)."));
+ else
+ throw handshakeTimeoutException();
}
return rcvCnt;
@@ -4120,19 +4115,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
- /** Internal exception class for proper timeout handling. */
- private static class HandshakeTimeoutException extends IgniteCheckedException {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * @param cause Exception cause
- */
- HandshakeTimeoutException(IgniteSpiOperationTimeoutException cause) {
- super(cause);
- }
- }
-
/**
* This worker takes responsibility to shut the server down when stopping,
* No other thread shall stop passed server.
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
index 95d43f6..fc9f628 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java
@@ -42,6 +42,9 @@ public class RecoveryLastReceivedMessage implements Message {
/** Need wait. */
public static final long NEED_WAIT = -3;
+ /** Initiator node is not in current topology. */
+ public static final long UNKNOWN_NODE = -4;
+
/** Message body size in bytes. */
private static final int MESSAGE_SIZE = 8;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index 4fb6787..892757b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -158,8 +158,6 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
-
int srvs = serverCount();
if (srvs > 0)
@@ -178,7 +176,9 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+ super.afterTestsStopped();
+
+ stopAllGrids();
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
index fde2a2b..3bfc0af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePutAllFailoverSelfTest.java
@@ -114,18 +114,6 @@ public class GridCachePutAllFailoverSelfTest extends GridCommonAbstractTest {
/** Test failover SPI. */
private MasterFailoverSpi failoverSpi = new MasterFailoverSpi((IgnitePredicate)workerNodesFilter);
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
-
- super.beforeTestsStarted();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
- }
-
/**
* @throws Exception If failed.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
index 5c34af3..03285bb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
@@ -73,19 +73,9 @@ public class IgniteCachePutAllRestartTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
-
- super.beforeTestsStarted();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
- }
-
- /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ super.afterTest();
+
stopAllGrids();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
index 33efbd1..3b46232 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
@@ -114,8 +114,6 @@ public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTe
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
-
super.beforeTestsStarted();
startGridsMultiThreaded(SRVS);
@@ -144,7 +142,9 @@ public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTe
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+ stopAllGrids();
+
+ super.afterTestsStopped();
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
index 327c549..9333a1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java
@@ -152,19 +152,9 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs
protected abstract CacheConfiguration cacheConfiguration();
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
-
- super.beforeTestsStarted();
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
- }
-
- /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
+ super.afterTest();
+
stopAllGrids();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
index 09461c8..78c9119 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateNodeRestartTest.java
@@ -83,15 +83,10 @@ public class IgniteBinaryMetadataUpdateNodeRestartTest extends GridCommonAbstrac
}
/** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
- }
-
- /** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+ stopAllGrids();
+
+ super.afterTestsStopped();
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
index d63f731..701e9f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheGetRestartTest.java
@@ -84,8 +84,6 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
-
super.beforeTestsStarted();
startGrids(SRVS);
@@ -101,7 +99,9 @@ public class IgniteCacheGetRestartTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+ super.afterTestsStopped();
+
+ stopAllGrids();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
index 9e57512..ce9d7c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheNearRestartRollbackSelfTest.java
@@ -84,18 +84,6 @@ public class IgniteCacheNearRestartRollbackSelfTest extends GridCommonAbstractTe
return cfg;
}
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- super.beforeTestsStarted();
-
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL,"true");
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
- }
-
/**
* @param igniteInstanceName Ignite instance name.
* @return Cache configuration.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
index 3e7e9a5..cff41c6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCacheWriteSynchronizationModesMultithreadedTest.java
@@ -32,6 +32,7 @@ import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index f7219c7..725c835 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -129,8 +129,6 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
- System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
-
super.beforeTestsStarted();
startGridsMultiThreaded(GRID_CNT);
@@ -138,7 +136,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
- System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+ super.afterTestsStopped();
+
+ stopAllGrids();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProxyClientReconnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProxyClientReconnectSelfTest.java
index 87d0f61..1d88acd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProxyClientReconnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProxyClientReconnectSelfTest.java
@@ -76,7 +76,7 @@ public class GridServiceProxyClientReconnectSelfTest extends GridCommonAbstractT
startGrid("server");
- assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertTrue(latch.await(12, TimeUnit.SECONDS));
client.services().deployClusterSingleton("my-service", new MyServiceImpl());
@@ -113,7 +113,7 @@ public class GridServiceProxyClientReconnectSelfTest extends GridCommonAbstractT
startGrid("server");
- assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertTrue(latch.await(12, TimeUnit.SECONDS));
client.services().deployClusterSingleton("my-service", new MyLongInitServiceImpl());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java
index de2ea20..ba61cd1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ServiceDeploymentOnClientDisconnectTest.java
@@ -204,7 +204,10 @@ public class ServiceDeploymentOnClientDisconnectTest extends GridCommonAbstractT
server().close();
- assertTrue(latch.await(CLIENT_FAILURE_DETECTION_TIMEOUT * 2, TimeUnit.MILLISECONDS));
+ assertTrue(latch.await(
+ CLIENT_FAILURE_DETECTION_TIMEOUT * 2 + CLIENT_FAILURE_DETECTION_TIMEOUT/10,
+ TimeUnit.MILLISECONDS
+ ));
task.run();
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/ExponentialBackoffTimeoutStrategyTest.java b/modules/core/src/test/java/org/apache/ignite/spi/ExponentialBackoffTimeoutStrategyTest.java
new file mode 100644
index 0000000..e535541
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/ExponentialBackoffTimeoutStrategyTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi;
+
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Test for {@link ExponentialBackoffTimeoutStrategy}.
+ */
+@RunWith(JUnit4.class)
+public class ExponentialBackoffTimeoutStrategyTest extends GridCommonAbstractTest {
+ /** */
+ @Test
+ public void checkTimeout() {
+ ExponentialBackoffTimeoutStrategy helper = new ExponentialBackoffTimeoutStrategy(
+ 5_000L,
+ 1000L,
+ 3000L
+ );
+
+ checkTimeout(helper, 5_000L);
+ }
+
+ /** */
+ @Test
+ public void backoff() throws IgniteSpiOperationTimeoutException {
+ ExponentialBackoffTimeoutStrategy strategy = new ExponentialBackoffTimeoutStrategy(
+ 25_000L,
+ 1000L,
+ 3_000L
+ );
+
+ assertEquals(1000L, strategy.nextTimeout());
+
+ assertEquals(1000L, strategy.nextTimeout(1000L));
+
+ assertEquals(2000L, strategy.nextTimeout());
+
+ assertEquals(1000L, strategy.nextTimeout(1000L));
+
+ assertEquals(2000L, strategy.nextTimeout(2000L));
+
+ assertEquals(3000L, strategy.nextTimeout());
+
+ assertEquals(3000L, strategy.nextTimeout());
+
+ assertEquals(100L, strategy.nextTimeout(100L));
+ }
+
+ /** */
+ @Test
+ public void totalBackoffTimeout() {
+ assertEquals(8_000, ExponentialBackoffTimeoutStrategy.totalBackoffTimeout(1000, 5000, 3));
+ assertEquals(45_000, ExponentialBackoffTimeoutStrategy.totalBackoffTimeout(5_000, 60_000, 3));
+ }
+
+ /** */
+ private void checkTimeout(
+ ExponentialBackoffTimeoutStrategy strategy,
+ long timeout
+ ) {
+ long start = System.currentTimeMillis();
+
+ while (true) {
+ boolean timedOut = strategy.checkTimeout();
+
+ if (timedOut) {
+ assertTrue( (System.currentTimeMillis() + 100 - start) >= timeout);
+
+ try {
+ strategy.nextTimeout();
+
+ fail("Should fail with IgniteSpiOperationTimeoutException");
+ } catch (IgniteSpiOperationTimeoutException ignored) {
+ //No-op
+ }
+
+ return;
+ }
+ }
+
+
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitTest.java
index e1db298..f480a69 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationHandshakeWaitTest.java
@@ -69,8 +69,8 @@ public class IgniteTcpCommunicationHandshakeWaitTest extends GridCommonAbstractT
TcpCommunicationSpi commSpi = new TcpCommunicationSpi();
commSpi.setConnectTimeout(COMMUNICATION_TIMEOUT);
- commSpi.setMaxConnectTimeout(COMMUNICATION_TIMEOUT);
- commSpi.setReconnectCount(1);
+ commSpi.setMaxConnectTimeout(4 * COMMUNICATION_TIMEOUT);
+ commSpi.setReconnectCount(3);
cfg.setCommunicationSpi(commSpi);
@@ -94,7 +94,7 @@ public class IgniteTcpCommunicationHandshakeWaitTest extends GridCommonAbstractT
slowNet.set(true);
IgniteInternalFuture fut = GridTestUtils.runAsync(() -> {
- latch.await(2 * COMMUNICATION_TIMEOUT, TimeUnit.MILLISECONDS);
+ latch.await(expectedTimeout(), TimeUnit.MILLISECONDS);
Collection<ClusterNode> nodes = ignite.context().discovery().aliveServerNodes();
@@ -108,6 +108,16 @@ public class IgniteTcpCommunicationHandshakeWaitTest extends GridCommonAbstractT
fut.get();
}
+ /** */
+ private long expectedTimeout() {
+ long maxBackoffTimeout = COMMUNICATION_TIMEOUT;
+
+ for (int i = 1; i < 3 && maxBackoffTimeout < 3 * COMMUNICATION_TIMEOUT; i++)
+ maxBackoffTimeout += Math.min(2 * maxBackoffTimeout, 3 * COMMUNICATION_TIMEOUT);
+
+ return maxBackoffTimeout;
+ }
+
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
index 1448403..0659db7 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java
@@ -21,22 +21,23 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -45,7 +46,7 @@ import org.junit.Test;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
/**
- *
+ * Tests grid node kicking on communication failure.
*/
public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
/** Nodes count. */
@@ -100,6 +101,8 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
}
/**
+ * Server node shouldn't be failed by other server node if IGNITE_ENABLE_FORCIBLE_NODE_KILL=true.
+ *
* @throws Exception If failed.
*/
@Test
@@ -112,12 +115,11 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
startGrids(NODES_CNT);
- final CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger evts = new AtomicInteger();
grid(0).events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event event) {
- latch.countDown();
+ @Override public boolean apply(Event evt) {
+ evts.incrementAndGet();
return true;
}
@@ -127,56 +129,26 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
block = true;
- grid(0).compute().broadcast(new IgniteRunnable() {
- @Override public void run() {
- // No-op.
- }
- });
-
- assertTrue(latch.await(15, TimeUnit.SECONDS));
-
- assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return grid(3).cluster().topologyVersion() == NODES_CNT + 1;
- }
- }, 5000));
-
- for (int i = 0; i < 10; i++) {
- U.sleep(1000);
-
- assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size());
-
- int liveNodesCnt = 0;
-
- for (int j = 0; j < NODES_CNT; j++) {
- IgniteEx ignite;
-
- try {
- ignite = grid(j);
-
- log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
-
- ClusterNode locNode = ignite.localNode();
-
- if (locNode.order() != 3) {
- assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size());
-
- for (ClusterNode node : ignite.cluster().nodes())
- assertTrue(node.order() != 3);
-
- liveNodesCnt++;
- }
- }
- catch (Exception e) {
- log.info("Checking topology for grid(" + j + "): no grid in topology.");
+ try {
+ grid(0).compute().broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ // No-op.
}
- }
+ });
- assertEquals(NODES_CNT - 1, liveNodesCnt);
+ fail("Should have exception here.");
+ } catch (IgniteException e) {
+ assertTrue(e.getCause() instanceof IgniteSpiException);
}
+
+ block = false;
+
+ assertEquals(NODES_CNT, grid(0).cluster().nodes().size());
+ assertEquals(0, evts.get());
}
/**
+ * Servers shouldn't fail each other if IGNITE_ENABLE_FORCIBLE_NODE_KILL=true.
* @throws Exception If failed.
*/
@Test
@@ -190,11 +162,11 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
startGrids(NODES_CNT);
- final CountDownLatch latch = new CountDownLatch(1);
+ AtomicInteger evts = new AtomicInteger();
grid(0).events().localListen(new IgnitePredicate<Event>() {
@Override public boolean apply(Event evt) {
- latch.countDown();
+ evts.incrementAndGet();
return true;
}
@@ -234,65 +206,40 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest {
}
});
- assertTrue(latch.await(5, TimeUnit.SECONDS));
-
- GridTestUtils.waitForCondition(new GridAbsPredicate() {
- @Override public boolean apply() {
- return grid(2).cluster().nodes().size() == NODES_CNT - 1;
- }
- }, 5000);
-
try {
fut1.get();
+
+ fail("Should fail with SpiException");
}
catch (IgniteCheckedException e) {
- // No-op.
+ assertTrue(e.getCause().getCause() instanceof IgniteSpiException);
}
try {
fut2.get();
+
+ fail("Should fail with SpiException");
}
catch (IgniteCheckedException e) {
- // No-op.
+ assertTrue(e.getCause().getCause() instanceof IgniteSpiException);
}
- long failedNodeOrder = 1 + 2 + 3 + 4;
-
- for (ClusterNode node : grid(0).cluster().nodes())
- failedNodeOrder -= node.order();
-
- for (int i = 0; i < 10; i++) {
- U.sleep(1000);
+ assertEquals(NODES_CNT , grid(0).cluster().nodes().size());
+ assertEquals(0, evts.get());
- assertEquals(NODES_CNT - 1, grid(0).cluster().nodes().size());
+ for (int j = 0; j < NODES_CNT; j++) {
+ IgniteEx ignite;
- int liveNodesCnt = 0;
+ try {
+ ignite = grid(j);
- for (int j = 0; j < NODES_CNT; j++) {
- IgniteEx ignite;
+ log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
- try {
- ignite = grid(j);
-
- log.info("Checking topology for grid(" + j + "): " + ignite.cluster().nodes());
-
- ClusterNode locNode = ignite.localNode();
-
- if (locNode.order() != failedNodeOrder) {
- assertEquals(NODES_CNT - 1, ignite.cluster().nodes().size());
-
- for (ClusterNode node : ignite.cluster().nodes())
- assertTrue(node.order() != failedNodeOrder);
-
- liveNodesCnt++;
- }
- }
- catch (Exception e) {
- log.info("Checking topology for grid(" + j + "): no grid in topology.");
- }
+ assertEquals(NODES_CNT, ignite.cluster().nodes().size());
+ }
+ catch (Exception e) {
+ log.info("Checking topology for grid(" + j + "): no grid in topology.");
}
-
- assertEquals(NODES_CNT - 1, liveNodesCnt);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientSslTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientSslTest.java
new file mode 100644
index 0000000..6510475
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientSslTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests that faulty client will be failed if connection can't be established.
+ */
+@RunWith(JUnit4.class)
+public class TcpCommunicationSpiFaultyClientSslTest extends TcpCommunicationSpiFaultyClientTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setSslContextFactory(GridTestUtils.sslFactory());
+
+ return cfg;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
index 530b898..3e9ee6b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java
@@ -66,15 +66,33 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
/** Block. */
private static volatile boolean block;
+ /** */
+ private int failureDetectionTimeout = 3000;
+
+ /** */
+ private int connectTimeout = -1;
+
+ /** */
+ private int maxConnectTimeout = -1;
+
+ /** */
+ private int reconnectCnt = -1;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- cfg.setFailureDetectionTimeout(1000);
+ cfg.setFailureDetectionTimeout(failureDetectionTimeout);
cfg.setClientMode(clientMode);
TestCommunicationSpi spi = new TestCommunicationSpi();
+ if (connectTimeout != -1) {
+ spi.setConnectTimeout(connectTimeout);
+ spi.setMaxConnectTimeout(maxConnectTimeout);
+ spi.setReconnectCount(reconnectCnt);
+ }
+
spi.setIdleConnectionTimeout(100);
spi.setSharedMemoryPort(-1);
@@ -93,7 +111,9 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
}
/** {@inheritDoc} */
- @Override protected void afterTestsStopped() {
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
}
@@ -111,12 +131,37 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
stopAllGrids();
}
+ /** */
+ private long computeExpectedDelay() {
+ if (connectTimeout == -1)
+ return failureDetectionTimeout;
+
+ long expDelay = 0;
+
+ for (int i = 1; i < reconnectCnt && expDelay < maxConnectTimeout; i++)
+ expDelay += Math.min(connectTimeout * 2, maxConnectTimeout);
+
+ return expDelay;
+ }
+
/**
* @throws Exception If failed.
*/
@Test
public void testNoServerOnHost() throws Exception {
- testFailClient(null);
+ testFailClient(null, computeExpectedDelay());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoServerOnHostCustomFailureDetection() throws Exception {
+ connectTimeout = 3000;
+ maxConnectTimeout = 6000;
+ reconnectCnt = 3;
+
+ testFailClient(null, computeExpectedDelay());
}
/**
@@ -124,14 +169,27 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
*/
@Test
public void testNotAcceptedConnection() throws Exception {
- testFailClient(new FakeServer());
+ testFailClient(new FakeServer(), computeExpectedDelay());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNotAcceptedConnectionCustomFailureDetection() throws Exception {
+ connectTimeout = 3000;
+ maxConnectTimeout = 6000;
+ reconnectCnt = 3;
+
+ testFailClient(new FakeServer(), computeExpectedDelay());
}
/**
* @param srv Server.
+ * @param expDelay Expected delay until client is gone while trying to establish connection.
* @throws Exception If failed.
*/
- private void testFailClient(FakeServer srv) throws Exception {
+ private void testFailClient(FakeServer srv, long expDelay) throws Exception {
IgniteInternalFuture<Long> fut = null;
try {
@@ -179,6 +237,8 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
block = true;
+ long t1 = U.currentTimeMillis();
+
try {
grid(0).compute(grid(0).cluster().forClients()).withNoFailover().broadcast(new IgniteRunnable() {
@Override public void run() {
@@ -190,7 +250,11 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest
// No-op.
}
- assertTrue(latch.await(3, TimeUnit.SECONDS));
+ final long time = U.currentTimeMillis() - t1;
+
+ assertTrue("Must try longer than expected delay", time >= expDelay);
+
+ assertTrue(latch.await(expDelay + 1000, TimeUnit.MILLISECONDS));
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java
new file mode 100644
index 0000000..2a6cb00
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFreezingClientTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.lang.management.ManagementFactory;
+import java.util.Iterator;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * Tests that a client frozen by JVM stop-the-world (STW) pauses is failed if a connection to it can't be established.
+ */
+@RunWith(JUnit4.class)
+public class TcpCommunicationSpiFreezingClientTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setFailureDetectionTimeout(120000);
+ cfg.setClientFailureDetectionTimeout(120000);
+ cfg.setClientMode("client".equals(gridName));
+
+ TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+ spi.setConnectTimeout(3000);
+ spi.setMaxConnectTimeout(6000);
+ spi.setReconnectCount(3);
+ spi.setIdleConnectionTimeout(100);
+ spi.setSharedMemoryPort(-1);
+
+ TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setCommunicationSpi(spi);
+ cfg.setDiscoverySpi(discoSpi);
+
+ cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME).setWriteSynchronizationMode(FULL_SYNC).
+ setCacheMode(PARTITIONED).setAtomicityMode(ATOMIC));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ System.setProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL, "true");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ System.clearProperty(IgniteSystemProperties.IGNITE_ENABLE_FORCIBLE_NODE_KILL);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean isMultiJvm() {
+ return true;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFreezingClient() throws Exception {
+ try {
+ final IgniteEx srv = startGrid(0);
+
+ final IgniteEx client = startGrid("client");
+
+ final int keysCnt = 100_000;
+
+ try (IgniteDataStreamer<Integer, byte[]> streamer = srv.dataStreamer(DEFAULT_CACHE_NAME)) {
+ for (int i = 0; i < keysCnt; i++)
+ streamer.addData(i, new byte[512]);
+ }
+
+ // Wait for connections to go idle.
+ doSleep(1000);
+
+ srv.compute(srv.cluster().forNode(client.localNode())).withNoFailover().call(new ClientClosure());
+
+ fail("Client node must be kicked from topology");
+ }
+ catch (ClusterTopologyException e) {
+ // Expected.
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ /** */
+ public static class ClientClosure implements IgniteCallable<Integer> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ @IgniteInstanceResource
+ private transient Ignite ignite;
+
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ Thread loadThread = new Thread() {
+ @Override public void run() {
+ log.info("result = " + simulateLoad());
+ }
+ };
+
+ loadThread.setName("load-thread");
+ loadThread.start();
+
+ int cnt = 0;
+
+ final Iterator<Cache.Entry<Integer, byte[]>> it = ignite.cache(DEFAULT_CACHE_NAME).
+ query(new ScanQuery<Integer, byte[]>().setPageSize(100000)).iterator();
+
+ while (it.hasNext()) {
+ Cache.Entry<Integer, byte[]> entry = it.next();
+
+ // Trigger STW.
+ final long[] tids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
+
+ cnt++;
+ }
+
+ loadThread.join();
+
+ return cnt;
+ }
+
+ /**
+ *
+ */
+ public static double simulateLoad() {
+ double d = 0;
+
+ for (int i = 0; i < 1000000000; i++)
+ d += Math.log(Math.PI * i);
+
+ return d;
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
index 9928f7e..5ae26d2 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
@@ -26,14 +26,17 @@ import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
@@ -52,7 +55,22 @@ public class IgniteClientConnectTest extends GridCommonAbstractTest {
/** Start client flag. */
private final AtomicBoolean clientJustStarted = new AtomicBoolean(false);
-
+
+ /** Failure detection timeout. */
+ private int failureDetectionTimeout = -1;
+
+ /** Node add finished delay. */
+ private int nodeAddFinishedDelay = 5_000;
+
+ /** Connection timeout. */
+ private long connTimeout = -1;
+
+ /** Max connection timeout. */
+ private long maxxConnTimeout = -1;
+
+ /** Reconnect count. */
+ private int reconCnt = -1;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -65,10 +83,25 @@ public class IgniteClientConnectTest extends GridCommonAbstractTest {
ipFinder.registerAddresses(Collections.singleton(new InetSocketAddress(InetAddress.getLoopbackAddress(), 47501)));
disco.setIpFinder(ipFinder);
+
+
+ if (failureDetectionTimeout != -1)
+ cfg.setFailureDetectionTimeout(failureDetectionTimeout);
+
+ if (connTimeout != -1) {
+ TcpCommunicationSpi tcpCommSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+ tcpCommSpi.setConnectTimeout(connTimeout);
+ tcpCommSpi.setMaxConnectTimeout(maxxConnTimeout);
+ tcpCommSpi.setReconnectCount(reconCnt);
+ }
}
- else
+ else {
disco.setIpFinder(sharedStaticIpFinder);
+ cfg.setFailureDetectionTimeout(60_000);
+ }
+
disco.setJoinTimeout(2 * 60_000);
disco.setSocketTimeout(1000);
disco.setNetworkTimeout(6_000);
@@ -94,6 +127,54 @@ public class IgniteClientConnectTest extends GridCommonAbstractTest {
*/
@Test
public void testClientConnectToBigTopology() throws Exception {
+ failureDetectionTimeout = -1;
+ connTimeout = -1;
+
+ testClientConnectToBigTopology0();
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailureDetectionTimeoutReached() throws Exception {
+ failureDetectionTimeout = 1000;
+ connTimeout = -1;
+
+ try {
+ testClientConnectToBigTopology0();
+ }
+ catch (CacheException e) {
+ assertTrue(e.getCause().getMessage().contains("Failed to send message"));
+ }
+ }
+
+ /**
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testCustomTimeoutReached() throws Exception {
+ failureDetectionTimeout = 1000;
+
+ connTimeout = 1000;
+ maxxConnTimeout = 3000;
+ reconCnt = 3;
+
+ try {
+ testClientConnectToBigTopology0();
+ }
+ catch (CacheException e) {
+ assertTrue(e.getCause().getMessage().contains("Failed to send message"));
+ }
+ }
+
+ /**
+ *
+ * @throws Exception In case of error.
+ */
+ public void testClientConnectToBigTopology0() throws Exception {
Ignite ignite = startGrids(3);
IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
@@ -140,9 +221,9 @@ public class IgniteClientConnectTest extends GridCommonAbstractTest {
try {
latch.await();
- Thread.sleep(5_000);
+ Thread.sleep(nodeAddFinishedDelay);
} catch (InterruptedException e) {
- e.printStackTrace();
+ fail("Unexpected interrupt on nodeAddFinishedDelay");
}
super.writeToSocket(sock, out, msg, timeout);
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index db196bc..39dc7f1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
@@ -63,6 +64,7 @@ import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
@@ -163,6 +165,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
cfg.setClientFailureDetectionTimeout(clientFailureDetectionTimeout());
+ // Override default settings to speed up reconnection.
+ cfg.setCommunicationSpi(
+ new TcpCommunicationSpi()
+ .setConnectTimeout(500)
+ .setMaxConnectTimeout(1000)
+ .setReconnectCount(2)
+ );
+
TcpDiscoverySpi disco = getDiscoverySpi();
if (igniteInstanceName.startsWith("server"))
@@ -550,10 +560,19 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).resumeAll();
- Thread.sleep(2000);
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ try {
+ boolean ping1 = ((IgniteEx) srv1).context().discovery().pingNode(client.cluster().localNode().id());
- assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
- assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+ boolean ping2 = ((IgniteEx) srv0).context().discovery().pingNode(client.cluster().localNode().id());
+
+ return ping1 && ping2;
+ } catch (IgniteClientDisconnectedException | IgniteClientDisconnectedCheckedException e) {
+ return false;
+ }
+ }
+ }, 5_000);
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 64a3590..a3ff567 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -39,7 +39,9 @@ import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationHandshakeWa
import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationHandshakeWaitTest;
import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAckClosureSelfTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientSslTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFreezingClientTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiSkipMessageSendTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest;
@@ -80,6 +82,10 @@ import org.junit.runners.Suite;
TcpCommunicationSpiSkipMessageSendTest.class,
TcpCommunicationSpiFaultyClientTest.class,
+ TcpCommunicationSpiFaultyClientSslTest.class,
+
+ TcpCommunicationSpiFreezingClientTest.class,
+
TcpCommunicationSpiDropNodesTest.class,
TcpCommunicationSpiHalfOpenedConnectionTest.class,
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index d37baa6..e3ffc79 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import org.apache.ignite.internal.IgniteDiscoveryMassiveNodeFailTest;
+import org.apache.ignite.spi.ExponentialBackoffTimeoutStrategyTest;
import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest;
import org.apache.ignite.spi.discovery.AuthenticationRestartTest;
import org.apache.ignite.spi.discovery.FilterDataForClientNodeDiscoveryTest;
@@ -95,6 +96,8 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OVERRIDE_MCAST_GRP
GridTcpSpiForwardingSelfTest.class,
+ ExponentialBackoffTimeoutStrategyTest.class,
+
TcpClientDiscoverySpiSelfTest.class,
LongClientConnectToClusterTest.class,
TcpClientDiscoveryMarshallerCheckSelfTest.class,
diff --git a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java
index 3b14413..9a52e56 100644
--- a/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java
+++ b/modules/zookeeper/src/test/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryClientDisconnectTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
@@ -60,6 +61,16 @@ import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_RECONNECTED;
* Tests for Zookeeper SPI discovery.
*/
public class ZookeeperDiscoveryClientDisconnectTest extends ZookeeperDiscoverySpiTestBase {
+ /** */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ // Reduce the failure detection timeout to speed up failure detection in the catch(Exception) clause of createTcpClient().
+ cfg.setFailureDetectionTimeout(2000);
+
+ return cfg;
+ }
+
/**
* Test reproduces failure in case of client resolution failure
* {@link org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi#createTcpClient} from server side, further