You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/08/31 23:24:28 UTC
[05/50] [abbrv] ignite git commit: master: back merge from ignite-752
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index b7d6e3f..6130bd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -65,12 +65,20 @@ import java.util.concurrent.atomic.*;
* and then this info goes to coordinator. When coordinator processes join request
* and issues node added messages and all other nodes then receive info about new node.
* <h1 class="header">Failure Detection</h1>
- * Configuration defaults (see Configuration section below for details)
- * are chosen to make possible for discovery SPI work reliably on
- * most of hardware and virtual deployments, but this has made failure detection time worse.
+ * Configuration defaults (see Configuration section below and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()} for details) are chosen to make it possible for discovery
+ * SPI to work reliably on most hardware and virtual deployments, but this has made failure detection time worse.
* <p>
- * For stable low-latency networks the following more aggressive settings are recommended
- * (which allows failure detection time ~200ms):
+ * If it's needed to tune failure detection then it's highly recommended to do this using
+ * {@link IgniteConfiguration#setFailureDetectionTimeout(long)}. This failure timeout automatically controls the
+ * following parameters: {@link #getSocketTimeout()}, {@link #getAckTimeout()}, {@link #getMaxAckTimeout()},
+ * {@link #getReconnectCount()}. If any of those parameters is set explicitly, then the failure timeout setting will be
+ * ignored.
+ * <p>
+ * If it's required to perform advanced settings of failure detection and
+ * {@link IgniteConfiguration#getFailureDetectionTimeout()} is unsuitable then various {@code TcpDiscoverySpi}
+ * configuration parameters may be used. As an example, for stable low-latency networks the following more aggressive
+ * settings are recommended (which allows failure detection time ~200ms):
* <ul>
* <li>Heartbeat frequency (see {@link #setHeartbeatFrequency(long)}) - 100ms</li>
* <li>Socket timeout (see {@link #setSocketTimeout(long)}) - 200ms</li>
@@ -157,6 +165,15 @@ import java.util.concurrent.atomic.*;
@DiscoverySpiOrderSupport(true)
@DiscoverySpiHistorySupport(true)
public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
+ /** Failure detection timeout feature major version. */
+ final static byte FAILURE_DETECTION_MAJOR_VER = 1;
+
+ /** Failure detection timeout feature minor version. */
+ final static byte FAILURE_DETECTION_MINOR_VER = 4;
+
+ /** Failure detection timeout feature maintenance version. */
+ final static byte FAILURE_DETECTION_MAINT_VER = 1;
+
/** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
@@ -221,10 +238,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
protected TcpDiscoveryIpFinder ipFinder;
/** Socket operations timeout. */
- protected long sockTimeout; // Must be initialized in the constructor of child class.
+ private long sockTimeout; // Must be initialized in the constructor of child class.
/** Message acknowledgement timeout. */
- protected long ackTimeout; // Must be initialized in the constructor of child class.
+ private long ackTimeout; // Must be initialized in the constructor of child class.
/** Network timeout. */
protected long netTimeout = DFLT_NETWORK_TIMEOUT;
@@ -286,14 +303,14 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** Reconnect attempts count. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- protected int reconCnt = DFLT_RECONNECT_CNT;
+ private int reconCnt = DFLT_RECONNECT_CNT;
/** Statistics print frequency. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized", "RedundantFieldInitialization"})
protected long statsPrintFreq = DFLT_STATS_PRINT_FREQ;
/** Maximum message acknowledgement timeout. */
- protected long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
+ private long maxAckTimeout = DFLT_MAX_ACK_TIMEOUT;
/** Max heartbeats count node can miss without initiating status check. */
protected int maxMissedHbs = DFLT_MAX_MISSED_HEARTBEATS;
@@ -521,6 +538,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* on every retry.
* <p>
* If not specified, default is {@link #DFLT_RECONNECT_CNT}.
+ * <p>
+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param reconCnt Number of retries during message sending.
* @see #setAckTimeout(long)
@@ -529,6 +548,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
public TcpDiscoverySpi setReconnectCount(int reconCnt) {
this.reconCnt = reconCnt;
+ failureDetectionTimeoutEnabled(false);
+
return this;
}
@@ -548,6 +569,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* If not specified, default is {@link #DFLT_MAX_ACK_TIMEOUT}.
* <p>
* Affected server nodes only.
+ * <p>
+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param maxAckTimeout Maximum acknowledgement timeout.
*/
@@ -555,6 +578,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
public TcpDiscoverySpi setMaxAckTimeout(long maxAckTimeout) {
this.maxAckTimeout = maxAckTimeout;
+ failureDetectionTimeoutEnabled(false);
+
return this;
}
@@ -701,7 +726,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/**
* Sets IP finder for IP addresses sharing and storing.
* <p>
- * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will be used by default.
+ * If not provided {@link org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder} will
+ * be used by default.
*
* @param ipFinder IP finder.
*/
@@ -720,6 +746,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* significantly greater than the default (e.g. to {@code 30000}).
* <p>
* If not specified, default is {@link #DFLT_SOCK_TIMEOUT} or {@link #DFLT_SOCK_TIMEOUT_CLIENT}.
+ * <p>
+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param sockTimeout Socket connection timeout.
*/
@@ -727,6 +755,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
public TcpDiscoverySpi setSocketTimeout(long sockTimeout) {
this.sockTimeout = sockTimeout;
+ failureDetectionTimeoutEnabled(false);
+
return this;
}
@@ -737,6 +767,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* and SPI tries to repeat message sending.
* <p>
* If not specified, default is {@link #DFLT_ACK_TIMEOUT} or {@link #DFLT_ACK_TIMEOUT_CLIENT}.
+ * <p>
+ * When this property is explicitly set {@link IgniteConfiguration#getFailureDetectionTimeout()} is ignored.
*
* @param ackTimeout Acknowledgement timeout.
*/
@@ -744,6 +776,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
public TcpDiscoverySpi setAckTimeout(long ackTimeout) {
this.ackTimeout = ackTimeout;
+ failureDetectionTimeoutEnabled(false);
+
return this;
}
@@ -1123,10 +1157,12 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/**
* @param sockAddr Remote address.
+ * @param timeoutHelper Timeout helper.
* @return Opened socket.
* @throws IOException If failed.
*/
- protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+ throws IOException, IgniteSpiOperationTimeoutException {
assert sockAddr != null;
InetSocketAddress resolved = sockAddr.isUnresolved() ?
@@ -1142,9 +1178,9 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
sock.setTcpNoDelay(true);
- sock.connect(resolved, (int)sockTimeout);
+ sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
- writeToSocket(sock, U.IGNITE_HEADER);
+ writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
return sock;
}
@@ -1154,14 +1190,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
*
* @param sock Socket.
* @param data Raw data to write.
+ * @param timeout Socket write timeout.
* @throws IOException If IO failed or write timed out.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- protected void writeToSocket(Socket sock, byte[] data) throws IOException {
+ private void writeToSocket(Socket sock, byte[] data, long timeout) throws IOException {
assert sock != null;
assert data != null;
- SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
@@ -1197,11 +1234,13 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
*
* @param sock Socket.
* @param msg Message.
+ * @param timeout Socket write timeout.
* @throws IOException If IO failed or write timed out.
* @throws IgniteCheckedException If marshalling failed.
*/
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg) throws IOException, IgniteCheckedException {
- writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024)); // 8K.
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+ throws IOException, IgniteCheckedException {
+ writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
}
/**
@@ -1214,8 +1253,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* @throws IgniteCheckedException If marshalling failed.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout)
- throws IOException, IgniteCheckedException {
+ protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, GridByteArrayOutputStream bout,
+ long timeout) throws IOException, IgniteCheckedException {
assert sock != null;
assert msg != null;
assert bout != null;
@@ -1223,7 +1262,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
// Marshall message first to perform only write after.
marsh.marshal(msg, bout);
- SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
@@ -1260,13 +1299,15 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* @param msg Received message.
* @param sock Socket.
* @param res Integer response.
+ * @param timeout Socket timeout.
* @throws IOException If IO failed or write timed out.
*/
@SuppressWarnings("ThrowFromFinallyBlock")
- protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+ protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+ throws IOException {
assert sock != null;
- SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + sockTimeout);
+ SocketTimeoutObject obj = new SocketTimeoutObject(sock, U.currentTimeMillis() + timeout);
addTimeoutObject(obj);
@@ -1307,7 +1348,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
* @throws IOException If IO failed or read timed out.
* @throws IgniteCheckedException If unmarshalling failed.
*/
- protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
+ protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException,
+ IgniteCheckedException {
assert sock != null;
int oldTimeout = sock.getSoTimeout();
@@ -1315,7 +1357,11 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
try {
sock.setSoTimeout((int)timeout);
- return marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+ T res = marsh.unmarshal(in == null ? sock.getInputStream() : in, U.gridClassLoader());
+
+ impl.onDataReceived();
+
+ return res;
}
catch (IOException | IgniteCheckedException e) {
if (X.hasCause(e, SocketTimeoutException.class))
@@ -1356,6 +1402,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
if (res == -1)
throw new EOFException();
+ impl.onDataReceived();
+
return res;
}
catch (SocketTimeoutException e) {
@@ -1570,6 +1618,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
/** {@inheritDoc} */
@Override public void spiStart(@Nullable String gridName) throws IgniteSpiException {
+ initFailureDetectionTimeout();
+
if (!forceSrvMode && (Boolean.TRUE.equals(ignite.configuration().isClientMode()))) {
if (ackTimeout == 0)
ackTimeout = DFLT_ACK_TIMEOUT_CLIENT;
@@ -1591,18 +1641,21 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
impl = new ServerImpl(this);
}
+ if (!failureDetectionTimeoutEnabled()) {
+ assertParameter(sockTimeout > 0, "sockTimeout > 0");
+ assertParameter(ackTimeout > 0, "ackTimeout > 0");
+ assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
+ assertParameter(reconCnt > 0, "reconnectCnt > 0");
+ }
+
+ assertParameter(netTimeout > 0, "networkTimeout > 0");
assertParameter(ipFinder != null, "ipFinder != null");
assertParameter(hbFreq > 0, "heartbeatFreq > 0");
- assertParameter(netTimeout > 0, "networkTimeout > 0");
- assertParameter(sockTimeout > 0, "sockTimeout > 0");
- assertParameter(ackTimeout > 0, "ackTimeout > 0");
assertParameter(ipFinderCleanFreq > 0, "ipFinderCleanFreq > 0");
assertParameter(locPort > 1023, "localPort > 1023");
assertParameter(locPortRange >= 0, "localPortRange >= 0");
assertParameter(locPort + locPortRange <= 0xffff, "locPort + locPortRange <= 0xffff");
- assertParameter(maxAckTimeout > ackTimeout, "maxAckTimeout > ackTimeout");
- assertParameter(reconCnt > 0, "reconnectCnt > 0");
assertParameter(maxMissedHbs > 0, "maxMissedHeartbeats > 0");
assertParameter(maxMissedClientHbs > 0, "maxMissedClientHeartbeats > 0");
assertParameter(threadPri > 0, "threadPri > 0");
@@ -1620,11 +1673,20 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
log.debug(configInfo("localPort", locPort));
log.debug(configInfo("localPortRange", locPortRange));
log.debug(configInfo("threadPri", threadPri));
- log.debug(configInfo("networkTimeout", netTimeout));
- log.debug(configInfo("sockTimeout", sockTimeout));
- log.debug(configInfo("ackTimeout", ackTimeout));
- log.debug(configInfo("maxAckTimeout", maxAckTimeout));
- log.debug(configInfo("reconnectCount", reconCnt));
+
+ if (!failureDetectionTimeoutEnabled()) {
+ log.debug("Failure detection timeout is ignored because at least one of the parameters from this list" +
+ " has been set explicitly: 'sockTimeout', 'ackTimeout', 'maxAckTimeout', 'reconnectCount'.");
+
+ log.debug(configInfo("networkTimeout", netTimeout));
+ log.debug(configInfo("sockTimeout", sockTimeout));
+ log.debug(configInfo("ackTimeout", ackTimeout));
+ log.debug(configInfo("maxAckTimeout", maxAckTimeout));
+ log.debug(configInfo("reconnectCount", reconCnt));
+ }
+ else
+ log.debug(configInfo("failureDetectionTimeout", failureDetectionTimeout()));
+
log.debug(configInfo("ipFinder", ipFinder));
log.debug(configInfo("ipFinderCleanFreq", ipFinderCleanFreq));
log.debug(configInfo("heartbeatFreq", hbFreq));
@@ -1837,7 +1899,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
U.closeQuiet(sock);
LT.warn(log, null, "Socket write has timed out (consider increasing " +
- "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']');
+ (failureDetectionTimeoutEnabled() ?
+ "'IgniteConfiguration.failureDetectionTimeout' configuration property) [" +
+ "failureDetectionTimeout=" + failureDetectionTimeout() + ']' :
+ "'sockTimeout' configuration property) [sockTimeout=" + sockTimeout + ']'));
stats.onSocketTimeout();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
index 142dbea..44e9006 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryNode.java
@@ -89,6 +89,9 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
@GridToStringExclude
private volatile long lastUpdateTime = U.currentTimeMillis();
+ /** The most recent time when a data chunk was received from a node. */
+ private volatile long lastDataReceivedTime = U.currentTimeMillis();
+
/** Metrics provider (transient). */
@GridToStringExclude
private DiscoveryMetricsProvider metricsProvider;
@@ -390,6 +393,24 @@ public class TcpDiscoveryNode extends GridMetadataAwareAdapter implements Cluste
}
/**
+ * Gets the last time a node received a data chunk from a remote node.
+ *
+ * @return Time in milliseconds.
+ */
+ public long lastDataReceivedTime() {
+ return lastDataReceivedTime;
+ }
+
+ /**
+ * Sets the last time a node received a data chunk from a remote node in a topology.
+ *
+ * @param lastDataReceivedTime Time in milliseconds.
+ */
+ public void lastDataReceivedTime(long lastDataReceivedTime) {
+ this.lastDataReceivedTime = lastDataReceivedTime;
+ }
+
+ /**
* Gets visible flag.
*
* @return {@code true} if node is in visible state.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
new file mode 100644
index 0000000..c7e99c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryConnectionCheckMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp.messages;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+
+import java.io.*;
+
+/**
+ * Message used to check whether a node is still connected to the topology.
+ * The difference from {@link TcpDiscoveryStatusCheckMessage} is that this message is sent to the next node
+ * which directly replies to the sender without relaying the message to the coordinator.
+ */
+public class TcpDiscoveryConnectionCheckMessage extends TcpDiscoveryAbstractMessage implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Default no-arg constructor for {@link Externalizable} interface.
+ */
+ public TcpDiscoveryConnectionCheckMessage() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param creatorNode Node that created this message.
+ */
+ public TcpDiscoveryConnectionCheckMessage(TcpDiscoveryNode creatorNode) {
+ super(creatorNode.id());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ // This method has been left empty intentionally to keep message size at min.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // This method has been left empty intentionally to keep message size at min.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(TcpDiscoveryConnectionCheckMessage.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
index fbaea11..7247d54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectAbstractTest.java
@@ -281,7 +281,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
volatile CountDownLatch writeLatch;
/** {@inheritDoc} */
- @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg)
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
throws IOException, IgniteCheckedException {
if (msg instanceof TcpDiscoveryJoinRequestMessage) {
CountDownLatch writeLatch0 = writeLatch;
@@ -293,7 +293,7 @@ public abstract class IgniteClientReconnectAbstractTest extends GridCommonAbstra
}
}
- super.writeToSocket(sock, msg);
+ super.writeToSocket(sock, msg, timeout);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index eee38a5..538ead5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -79,7 +79,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
for (CommunicationSpi spi : spis.values()) {
ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
- assertEquals(2, clients.size());
+ assertEquals(getSpiCount() - 1, clients.size());
clients.put(UUID.randomUUID(), F.first(clients.values()));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 1a4ba22..b4090d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -166,7 +166,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
@Override public boolean apply() {
return recoveryDesc.messagesFutures().isEmpty();
}
- }, 10_000);
+ }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 :
+ 10_000);
assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
recoveryDesc.messagesFutures().size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
new file mode 100644
index 0000000..a6bfe00
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ *
+ */
+public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends GridTcpCommunicationSpiRecoverySelfTest {
+ /** {@inheritDoc} */
+ @Override protected TcpCommunicationSpi getSpi(int idx) {
+ TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+ spi.setSharedMemoryPort(-1);
+ spi.setLocalPort(port++);
+ spi.setIdleConnectionTimeout(10_000);
+ spi.setAckSendThreshold(5);
+ spi.setSocketSendBuffer(512);
+ spi.setSocketReceiveBuffer(512);
+
+ return spi;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long awaitForSocketWriteTimeout() {
+ return IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT + 5_000;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testFailureDetectionEnabled() throws Exception {
+ for (TcpCommunicationSpi spi: spis) {
+ assertTrue(spi.failureDetectionTimeoutEnabled());
+ assertTrue(spi.failureDetectionTimeout() == IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 5d3afd9..67d42d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -60,7 +60,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
private static final int ITERS = 10;
/** */
- private static int port = 30_000;
+ protected static int port = 30_000;
/**
*
@@ -163,6 +163,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
}
/**
+ * Time to wait for socket write timeout.
+ *
+ * @return Timeout.
+ */
+ protected long awaitForSocketWriteTimeout() {
+ return 5000;
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testBlockListener() throws Exception {
@@ -245,7 +254,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
@Override public boolean apply() {
return lsnr0.rcvCnt.get() >= expMsgs && lsnr1.rcvCnt.get() >= expMsgs;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertEquals(expMsgs, lsnr0.rcvCnt.get());
assertEquals(expMsgs, lsnr1.rcvCnt.get());
@@ -301,7 +310,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
@Override public boolean apply() {
return ses0.closeTime() != 0;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
@@ -411,7 +420,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
@Override public boolean apply() {
return ses0.closeTime() != 0;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
@@ -423,7 +432,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
public boolean apply() {
return ses1.closeTime() != 0;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertTrue("Failed to wait for session close", ses1.closeTime() != 0);
@@ -528,7 +537,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
@Override public boolean apply() {
return ses0.closeTime() != 0;
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
@@ -592,7 +601,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
return !sessions.isEmpty();
}
- }, 5000);
+ }, awaitForSocketWriteTimeout());
Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
new file mode 100644
index 0000000..56873d1
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.communication.*;
+
+/**
+ * Tests that the SPI reports failure detection timeout as enabled only when no connection setting is set explicitly.
+ */
+public class GridTcpCommunicationSpiTcpFailureDetectionSelfTest extends GridTcpCommunicationSpiTcpSelfTest {
+ /** Number of SPI instances to create, one per configuration variant handled in getSpi(). */
+ private final static int SPI_COUNT = 4;
+
+ private TcpCommunicationSpi spis[] = new TcpCommunicationSpi[SPI_COUNT]; // SPIs returned by getSpi(), by index.
+
+ /** {@inheritDoc} */
+ @Override protected int getSpiCount() {
+ return SPI_COUNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CommunicationSpi getSpi(int idx) {
+ TcpCommunicationSpi spi = (TcpCommunicationSpi)super.getSpi(idx);
+
+ switch (idx) {
+ case 0:
+ // No explicit settings: the test expects failure detection timeout to stay enabled for this SPI.
+ break;
+ case 1:
+ spi.setConnectTimeout(4000);
+ break;
+ case 2:
+ spi.setMaxConnectTimeout(TcpCommunicationSpi.DFLT_MAX_CONN_TIMEOUT);
+ break;
+ case 3:
+ spi.setReconnectCount(2);
+ break;
+ default:
+ assert false;
+ }
+
+ spis[idx] = spi;
+
+ return spi;
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testFailureDetectionEnabled() throws Exception {
+ assertTrue(spis[0].failureDetectionTimeoutEnabled());
+ assertTrue(spis[0].failureDetectionTimeout() == IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT);
+
+ for (int i = 1; i < SPI_COUNT; i++) { // SPIs 1..3 each had one setting overridden in getSpi().
+ assertFalse(spis[i].failureDetectionTimeoutEnabled());
+ assertEquals(0, spis[i].failureDetectionTimeout());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 61bb944..892d87d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -43,12 +43,18 @@ import static org.apache.ignite.lang.IgniteProductVersion.*;
@SuppressWarnings({"JUnitAbstractTestClassNamingConvention"})
public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends GridSpiAbstractTest<T> {
/** */
- private static final List<DiscoverySpi> spis = new ArrayList<>();
+ private static final String HTTP_ADAPTOR_MBEAN_NAME = "mbeanAdaptor:protocol=HTTP";
+
+ /** */
+ protected static final List<DiscoverySpi> spis = new ArrayList<>();
/** */
private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>();
/** */
+ private static final List<HttpAdaptor> httpAdaptors = new ArrayList<>();
+
+ /** */
private static long spiStartTime;
/** */
@@ -424,10 +430,12 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
adaptor.setPort(Integer.valueOf(GridTestProperties.getProperty("discovery.mbeanserver.selftest.baseport")) +
idx);
- srv.registerMBean(adaptor, new ObjectName("mbeanAdaptor:protocol=HTTP"));
+ srv.registerMBean(adaptor, new ObjectName(HTTP_ADAPTOR_MBEAN_NAME));
adaptor.start();
+ httpAdaptors.add(adaptor);
+
return srv;
}
@@ -442,12 +450,21 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
spi.spiStop();
}
- for (IgniteTestResources rscrs : spiRsrcs)
+ for (IgniteTestResources rscrs : spiRsrcs) {
+ MBeanServer mBeanServer = rscrs.getMBeanServer();
+
+ mBeanServer.unregisterMBean(new ObjectName(HTTP_ADAPTOR_MBEAN_NAME));
+
rscrs.stopThreads();
+ }
+
+ for (HttpAdaptor adaptor : httpAdaptors)
+ adaptor.stop();
// Clear.
spis.clear();
spiRsrcs.clear();
+ httpAdaptors.clear();
spiStartTime = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
new file mode 100644
index 0000000..3cf44f2
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Client-based discovery SPI test with failure detection timeout enabled.
+ */
+public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscoverySpiSelfTest {
+ /** Extra time, in milliseconds, that awaitTime() adds on top of the failure detection timeout. */
+ private final static int FAILURE_AWAIT_TIME = 7_000;
+
+ /** Default failure detection timeout, in milliseconds. */
+ private final static long FAILURE_THRESHOLD = 10_000;
+
+ /** Value returned by failureDetectionTimeout(); individual tests lower it and restore the default afterwards. */
+ private static long failureThreshold = FAILURE_THRESHOLD;
+
+ /** When {@code true}, getDiscoverySpi() returns this class's TestTcpDiscoverySpi instead of the parent's SPI. */
+ private static boolean useTestSpi;
+
+ /** {@inheritDoc} */
+ @Override protected boolean useFailureDetectionTimeout() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long failureDetectionTimeout() {
+ return failureThreshold;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long awaitTime() {
+ return failureDetectionTimeout() + FAILURE_AWAIT_TIME;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected TcpDiscoverySpi getDiscoverySpi() {
+ return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi();
+ }
+
+ /**
+ * @throws Exception in case of error.
+ */
+ public void testFailureDetectionTimeoutEnabled() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ assertTrue(((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).
+ failureDetectionTimeoutEnabled());
+ assertEquals(failureDetectionTimeout(),
+ ((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).failureDetectionTimeout());
+
+ assertTrue(((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).
+ failureDetectionTimeoutEnabled());
+ assertEquals(failureDetectionTimeout(),
+ ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionTimeout());
+ }
+
+ /**
+ * @throws Exception in case of error.
+ */
+ public void testFailureTimeoutWorkabilityAvgTimeout() throws Exception {
+ failureThreshold = 3000;
+
+ try {
+ checkFailureThresholdWorkability();
+ }
+ finally {
+ failureThreshold = FAILURE_THRESHOLD;
+ }
+ }
+
+ /**
+ * @throws Exception in case of error.
+ */
+ public void testFailureTimeoutWorkabilitySmallTimeout() throws Exception {
+ failureThreshold = 500;
+
+ try {
+ checkFailureThresholdWorkability();
+ }
+ finally {
+ failureThreshold = FAILURE_THRESHOLD;
+ }
+ }
+
+ /**
+ * @throws Exception in case of error.
+ */
+ private void checkFailureThresholdWorkability() throws Exception {
+ useTestSpi = true;
+
+ TestTcpDiscoverySpi firstSpi = null;
+ TestTcpDiscoverySpi secondSpi = null;
+
+ try {
+ startServerNodes(2);
+
+ checkNodes(2, 0);
+
+ firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi());
+ secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi());
+
+ assert firstSpi.err == null;
+
+ secondSpi.readDelay = failureDetectionTimeout() + 5000; // Puts the second SPI's readMessage() into its forced-timeout branch.
+
+ assertFalse(firstSpi.pingNode(secondSpi.getLocalNodeId()));
+
+ Thread.sleep(failureDetectionTimeout());
+
+ assertTrue(firstSpi.err != null && X.hasCause(firstSpi.err, SocketTimeoutException.class));
+
+ firstSpi.reset();
+ secondSpi.reset();
+
+ assertTrue(firstSpi.pingNode(secondSpi.getLocalNodeId())); // With readDelay reset, the ping is expected to succeed.
+
+ assertTrue(firstSpi.err == null);
+ }
+ finally {
+ useTestSpi = false;
+
+ if (firstSpi != null)
+ firstSpi.reset();
+
+ if (secondSpi != null)
+ secondSpi.reset();
+ }
+ }
+
+ /**
+ * Discovery SPI that records read failures and can force a timeout on incoming ping requests.
+ */
+ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** While not less than failureDetectionTimeout(), ping requests read by this SPI fail with a forced timeout. */
+ private long readDelay;
+
+ /** Last exception thrown by readMessage() in its non-delayed branch; cleared by reset(). */
+ private Exception err;
+
+ /** {@inheritDoc} */
+ @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout)
+ throws IOException, IgniteCheckedException {
+
+ if (readDelay < failureDetectionTimeout()) {
+ try {
+ return super.readMessage(sock, in, timeout);
+ }
+ catch (Exception e) {
+ err = e;
+
+ throw e;
+ }
+ }
+ else {
+ T msg = super.readMessage(sock, in, timeout);
+
+ if (msg instanceof TcpDiscoveryPingRequest) {
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ // Ignore. NOTE(review): the interrupt status is not restored here - confirm this is intended.
+ }
+ throw new SocketTimeoutException("Forced timeout");
+ }
+
+ return msg;
+ }
+ }
+
+ /**
+ * Resets testing state.
+ */
+ private void reset() {
+ readDelay = 0;
+ err = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 63db0c1..69a5f13 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -118,7 +118,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+ TcpDiscoverySpi disco = getDiscoverySpi();
disco.setMaxMissedClientHeartbeats(maxMissedClientHbs);
@@ -154,9 +154,19 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
else
throw new IllegalArgumentException();
- if (longSockTimeouts) {
- disco.setAckTimeout(2000);
- disco.setSocketTimeout(2000);
+ if (useFailureDetectionTimeout())
+ cfg.setFailureDetectionTimeout(failureDetectionTimeout());
+ else {
+ if (longSockTimeouts) {
+ disco.setAckTimeout(2000);
+ disco.setSocketTimeout(2000);
+ }
+ else {
+ disco.setAckTimeout(gridName.startsWith("client") ? TcpDiscoverySpi.DFLT_ACK_TIMEOUT_CLIENT :
+ TcpDiscoverySpi.DFLT_ACK_TIMEOUT);
+ disco.setSocketTimeout(gridName.startsWith("client") ? TcpDiscoverySpi.DFLT_SOCK_TIMEOUT_CLIENT :
+ TcpDiscoverySpi.DFLT_SOCK_TIMEOUT);
+ }
}
disco.setJoinTimeout(joinTimeout);
@@ -164,7 +174,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
disco.setClientReconnectDisabled(reconnectDisabled);
- disco.afterWrite(afterWrite);
+ if (disco instanceof TestTcpDiscoverySpi)
+ ((TestTcpDiscoverySpi)disco).afterWrite(afterWrite);
cfg.setDiscoverySpi(disco);
@@ -174,6 +185,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /**
+ * Creates the TCP Discovery SPI instance to use in a test; subclasses may override it to supply another SPI.
+ * @return TCP Discovery SPI.
+ */
+ protected TcpDiscoverySpi getDiscoverySpi() {
+ return new TestTcpDiscoverySpi();
+ }
+
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
@@ -205,6 +224,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * Checks whether to use failure detection timeout instead of setting explicit timeouts.
+ *
+ * @return {@code true} if failure detection timeout should be used.
+ */
+ protected boolean useFailureDetectionTimeout() {
+ return false;
+ }
+
+ /**
+ * Gets failure detection timeout to use.
+ *
+ * @return Failure detection timeout.
+ */
+ protected long failureDetectionTimeout() {
+ return 0;
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testJoinTimeout() throws Exception {
@@ -390,12 +427,12 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(1);
- ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+ ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure
+ <Socket>() {
@Override public void apply(Socket sock) {
try {
latch.await();
- }
- catch (InterruptedException e) {
+ } catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -414,11 +451,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
startServerNodes(2);
startClientNodes(1);
+ checkNodes(2, 1);
+
Ignite srv0 = G.ignite("server-0");
Ignite srv1 = G.ignite("server-1");
Ignite client = G.ignite("client-0");
- ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
+ if (!useFailureDetectionTimeout())
+ ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite();
@@ -756,8 +796,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override public void apply(TcpDiscoveryAbstractMessage msg) {
try {
Thread.sleep(1000000);
- }
- catch (InterruptedException ignored) {
+ } catch (InterruptedException ignored) {
Thread.interrupted();
}
}
@@ -1405,8 +1444,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
latch.countDown();
- assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
- assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS));
+ assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS));
clientNodeIds.add(client.cluster().localNode().id());
@@ -1474,7 +1513,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param failSrv If {@code true} fails server, otherwise server does not send join message.
* @throws Exception If failed.
*/
- private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
+ protected void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
netTimeout = 4000;
joinTimeout = 5000;
@@ -1542,9 +1581,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientSpi.brakeConnection();
}
- assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS));
- assertTrue(segmentedLatch.await(10_000, MILLISECONDS));
+ assertTrue(segmentedLatch.await(awaitTime(), MILLISECONDS));
waitSegmented(client);
@@ -1557,7 +1596,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override public boolean apply() {
return srv.cluster().nodes().size() == 1;
}
- }, 10_000);
+ }, awaitTime());
checkNodes(1, 0);
}
@@ -1614,7 +1653,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
srv.close();
- assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS));
srvNodeIds.clear();
srvIdx.set(0);
@@ -1625,7 +1664,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
startServerNodes(1);
- assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS));
clientNodeIds.clear();
clientNodeIds.add(client.cluster().localNode().id());
@@ -1695,7 +1734,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientSpi.brakeConnection();
- assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(disconnectLatch.await(awaitTime(), MILLISECONDS));
log.info("Fail client connection2.");
@@ -1704,7 +1743,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientSpi.brakeConnection();
- assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(reconnectLatch.await(awaitTime(), MILLISECONDS));
clientNodeIds.clear();
@@ -1715,7 +1754,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
public boolean apply() {
return srv.cluster().nodes().size() == 2;
}
- }, 10_000);
+ }, awaitTime());
checkNodes(1, 1);
@@ -1759,7 +1798,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param cnt Number of nodes.
* @throws Exception In case of error.
*/
- private void startServerNodes(int cnt) throws Exception {
+ protected void startServerNodes(int cnt) throws Exception {
for (int i = 0; i < cnt; i++) {
Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
@@ -1771,7 +1810,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param cnt Number of nodes.
* @throws Exception In case of error.
*/
- private void startClientNodes(int cnt) throws Exception {
+ protected void startClientNodes(int cnt) throws Exception {
for (int i = 0; i < cnt; i++) {
Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
@@ -1888,7 +1927,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param srvCnt Number of server nodes.
* @param clientCnt Number of client nodes.
*/
- private void checkNodes(int srvCnt, int clientCnt) {
+ protected void checkNodes(int srvCnt, int clientCnt) {
long topVer = -1;
for (int i = 0; i < srvCnt; i++) {
@@ -1950,8 +1989,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param latch Latch.
* @throws InterruptedException If interrupted.
*/
- private void await(CountDownLatch latch) throws InterruptedException {
- assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
+ protected void await(CountDownLatch latch) throws InterruptedException {
+ assertTrue("Latch count: " + latch.getCount(), latch.await(awaitTime(), MILLISECONDS));
+ }
+
+ /**
+ * Time to wait for operation completion.
+ *
+ * @return Time in milliseconds.
+ */
+ protected long awaitTime() {
+ return 10_000;
}
/**
@@ -2072,7 +2120,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
- GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
waitFor(writeLock);
boolean fail = false;
@@ -2097,17 +2145,18 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
sock.close();
}
- super.writeToSocket(sock, msg, bout);
+ super.writeToSocket(sock, msg, bout, timeout);
if (afterWrite != null)
afterWrite.apply(msg, sock);
}
/** {@inheritDoc} */
- @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ @Override protected Socket openSocket(InetSocketAddress sockAddr,
+ IgniteSpiOperationTimeoutHelper timeoutHelper) throws IOException, IgniteSpiOperationTimeoutException {
waitFor(openSockLock);
- return super.openSocket(sockAddr);
+ return super.openSocket(sockAddr, new IgniteSpiOperationTimeoutHelper(this));
}
/**
@@ -2137,7 +2186,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res) throws IOException {
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+ throws IOException {
if (delayJoinAckFor != null && msg instanceof TcpDiscoveryJoinRequestMessage) {
TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
@@ -2155,7 +2205,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
}
- super.writeToSocket(msg, sock, res);
+ super.writeToSocket(msg, sock, res, timeout);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
index 3e895be..8ab2116 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.tcp;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.testframework.junits.spi.*;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
new file mode 100644
index 0000000..fbea187
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Tests {@link TcpDiscoverySpi} behavior with failure detection timeout enabled and disabled.
+ */
+public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelfTest {
+ /** Number of SPI instances created for the test. */
+ private static final int SPI_COUNT = 6;
+
+ /** IP finder shared by all SPI instances. */
+ private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected int getSpiCount() {
+ return SPI_COUNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected DiscoverySpi getSpi(int idx) {
+ TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();
+
+ spi.setMetricsProvider(createMetricsProvider());
+ spi.setIpFinder(ipFinder);
+
+ switch (idx) {
+ case 0:
+ case 1:
+ // No explicit settings: the tests expect failure detection timeout to be enabled for SPIs 0 and 1.
+ break;
+ case 2:
+ spi.setAckTimeout(3000);
+ break;
+ case 3:
+ spi.setSocketTimeout(4000);
+ break;
+ case 4:
+ spi.setReconnectCount(4);
+ break;
+ case 5:
+ spi.setMaxAckTimeout(10000);
+ break;
+ default:
+ assert false;
+ }
+
+ return spi;
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testFailureDetectionTimeoutEnabled() throws Exception {
+ assertTrue(firstSpi().failureDetectionTimeoutEnabled());
+ assertTrue(secondSpi().failureDetectionTimeoutEnabled());
+
+ assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(),
+ firstSpi().failureDetectionTimeout());
+ assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT.longValue(),
+ secondSpi().failureDetectionTimeout());
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testFailureDetectionTimeoutDisabled() throws Exception {
+ for (int i = 2; i < spis.size(); i++) { // SPIs 2..5 each had one setting overridden in getSpi().
+ assertFalse(((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeoutEnabled());
+ assertEquals(0, ((TcpDiscoverySpi)spis.get(i)).failureDetectionTimeout());
+ }
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testFailureDetectionOnSocketOpen() throws Exception {
+ try {
+ ClusterNode node = secondSpi().getLocalNode();
+
+ firstSpi().openSocketTimeout = true;
+
+ assertFalse(firstSpi().pingNode(node.id()));
+ assertTrue(firstSpi().validTimeout);
+ assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeout"));
+
+ firstSpi().openSocketTimeout = false;
+ firstSpi().openSocketTimeoutWait = true;
+
+ assertFalse(firstSpi().pingNode(node.id()));
+ assertTrue(firstSpi().validTimeout);
+ assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeoutWait"));
+ }
+ finally {
+ firstSpi().resetState();
+ }
+ }
+
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testFailureDetectionOnSocketWrite() throws Exception {
+ try {
+ ClusterNode node = secondSpi().getLocalNode();
+
+ firstSpi().writeToSocketTimeoutWait = true;
+
+ assertFalse(firstSpi().pingNode(node.id()));
+ assertTrue(firstSpi().validTimeout);
+
+ firstSpi().writeToSocketTimeoutWait = false;
+
+ assertTrue(firstSpi().pingNode(node.id()));
+ assertTrue(firstSpi().validTimeout);
+ }
+ finally {
+ firstSpi().resetState();
+ }
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testConnectionCheckMessage() throws Exception {
+ TestTcpDiscoverySpi nextSpi = null;
+
+ try {
+ assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+ TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+ assertNotNull(nextNode);
+
+ nextSpi = null;
+
+ for (int i = 1; i < spis.size(); i++)
+ if (spis.get(i).getLocalNode().id().equals(nextNode.id())) {
+ nextSpi = (TestTcpDiscoverySpi)spis.get(i);
+ break;
+ }
+
+ assertNotNull(nextSpi);
+
+ assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+ firstSpi().countConnCheckMsg = true;
+ nextSpi.countConnCheckMsg = true;
+
+ Thread.sleep(firstSpi().failureDetectionTimeout());
+
+ firstSpi().countConnCheckMsg = false;
+ nextSpi.countConnCheckMsg = false;
+
+ int sent = firstSpi().connCheckStatusMsgCntSent;
+ int received = nextSpi.connCheckStatusMsgCntReceived;
+
+ assert sent >= 3 && sent < 7 : "messages sent: " + sent;
+ assert received >= 3 && received < 7 : "messages received: " + received;
+ }
+ finally {
+ firstSpi().resetState();
+
+ if (nextSpi != null)
+ nextSpi.resetState();
+ }
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testConnectionCheckMessageBackwardCompatibility() throws Exception {
+ TestTcpDiscoverySpi nextSpi = null;
+ TcpDiscoveryNode nextNode = null;
+
+ IgniteProductVersion nextNodeVer = null;
+
+ try {
+ assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+ nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+ assertNotNull(nextNode);
+
+ nextSpi = null;
+
+ for (int i = 1; i < spis.size(); i++)
+ if (spis.get(i).getLocalNode().id().equals(nextNode.id())) {
+ nextSpi = (TestTcpDiscoverySpi)spis.get(i);
+ break;
+ }
+
+ assertNotNull(nextSpi);
+
+ assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+ nextNodeVer = nextNode.version();
+
+ // Overriding the version of the next node. Connection check message must not be sent to it.
+ nextNode.version(new IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+ (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER,
+ 0L, null));
+
+ firstSpi().countConnCheckMsg = true;
+ nextSpi.countConnCheckMsg = true;
+
+ Thread.sleep(firstSpi().failureDetectionTimeout() / 2);
+
+ firstSpi().countConnCheckMsg = false;
+ nextSpi.countConnCheckMsg = false;
+
+ int sent = firstSpi().connCheckStatusMsgCntSent;
+ int received = nextSpi.connCheckStatusMsgCntReceived;
+
+ assert sent == 0 : "messages sent: " + sent;
+ assert received == 0 : "messages received: " + received;
+ }
+ finally {
+ firstSpi().resetState();
+
+ if (nextSpi != null)
+ nextSpi.resetState();
+
+ if (nextNode != null && nextNodeVer != null)
+ nextNode.version(nextNodeVer);
+ }
+ }
+
+ /**
+ * Returns the first spi with failure detection timeout enabled.
+ *
+ * @return SPI.
+ */
+ private TestTcpDiscoverySpi firstSpi() {
+ return (TestTcpDiscoverySpi)spis.get(0);
+ }
+
+
+ /**
+ * Returns the second spi with failure detection timeout enabled.
+ *
+ * @return SPI.
+ */
+ private TestTcpDiscoverySpi secondSpi() {
+ return (TestTcpDiscoverySpi)spis.get(1);
+ }
+
+ /**
+ * Discovery SPI with test hooks for simulating timeouts and counting connection check messages.
+ */
+ private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ /** If set, openSocket() fails immediately with a timeout exception. */
+ private volatile boolean openSocketTimeout;
+
+ /** If set, openSocket() sleeps past the next timeout chunk and then re-checks the timeout helper. */
+ private volatile boolean openSocketTimeoutWait;
+
+ /** If set, ping requests are not written; the writer sleeps for the given timeout instead. */
+ private volatile boolean writeToSocketTimeoutWait;
+
+ /** Enables counting of connection check messages. */
+ private volatile boolean countConnCheckMsg;
+
+ /** Connection check messages seen by writeToSocket(sock, msg, bout, timeout) while counting is enabled. */
+ private volatile int connCheckStatusMsgCntSent;
+
+ /** Connection check messages seen by writeToSocket(msg, sock, res, timeout) while counting is enabled. */
+ private volatile int connCheckStatusMsgCntReceived;
+
+ /** Cleared when a ping request is written with a timeout not less than the default failure detection timeout. */
+ private volatile boolean validTimeout = true;
+
+ /** Last timeout exception raised by openSocket(); cleared by resetState(). */
+ private volatile IgniteSpiOperationTimeoutException err;
+
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+ throws IOException, IgniteSpiOperationTimeoutException {
+
+ if (openSocketTimeout) {
+ err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeout");
+ throw err;
+ }
+ else if (openSocketTimeoutWait) {
+ long timeout = timeoutHelper.nextTimeoutChunk(0);
+
+ try {
+ Thread.sleep(timeout + 1000); // Outlast the chunk so the next nextTimeoutChunk() call is expected to throw.
+ }
+ catch (InterruptedException e) {
+ // Ignore
+ }
+
+ try {
+ timeoutHelper.nextTimeoutChunk(0);
+ }
+ catch (IgniteSpiOperationTimeoutException e) {
+ throw (err = new IgniteSpiOperationTimeoutException("Timeout: openSocketTimeoutWait"));
+ }
+ }
+
+ Socket sock = super.openSocket(sockAddr, timeoutHelper);
+
+ try {
+ Thread.sleep(1500);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+
+ return sock;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+ throws IOException, IgniteCheckedException {
+ if (!(msg instanceof TcpDiscoveryPingRequest)) {
+ super.writeToSocket(sock, msg, timeout);
+ return;
+ }
+
+ if (timeout >= IgniteConfiguration.DFLT_FAILURE_DETECTION_TIMEOUT) { // A ping write must get less than the full default.
+ validTimeout = false;
+
+ throw new IgniteCheckedException("Invalid timeout: " + timeout);
+ }
+
+ if (writeToSocketTimeoutWait) {
+ try {
+ Thread.sleep(timeout);
+ }
+ catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ else
+ super.writeToSocket(sock, msg, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+ if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+ connCheckStatusMsgCntSent++; // NOTE(review): non-atomic increment of a volatile field - assumes a single writer.
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+ throws IOException {
+ if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+ connCheckStatusMsgCntReceived++; // NOTE(review): non-atomic increment of a volatile field - assumes a single writer.
+
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+
+ /**
+ * Restores all test hooks and counters to their initial values.
+ */
+ private void resetState() {
+ openSocketTimeout = false;
+ openSocketTimeoutWait = false;
+ writeToSocketTimeoutWait = false;
+ err = null;
+ validTimeout = true;
+ connCheckStatusMsgCntSent = 0;
+ connCheckStatusMsgCntReceived = 0;
+ countConnCheckMsg = false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index ff86bda..3f71d7d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -45,6 +45,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedSelfTest.class));
suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedShmemTest.class));
+ suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.class));
+ suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpFailureDetectionSelfTest.class));
+
suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
return suite;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cff25e91/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index b7014ad..d77c432 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -44,6 +44,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpDiscoverySelfTest.class));
suite.addTest(new TestSuite(TcpDiscoverySpiSelfTest.class));
+ suite.addTest(new TestSuite(TcpDiscoverySpiFailureTimeoutSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoverySpiStartStopSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoverySpiConfigSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoveryMarshallerCheckSelfTest.class));
@@ -54,6 +55,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
+ suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureTimeoutSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoveryNodeConfigConsistentIdSelfTest.class));