You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/19 12:33:21 UTC
[7/9] incubator-ignite git commit: ignite-752: added tests for client
spi
ignite-752: added tests for client spi
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/af624eb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/af624eb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/af624eb4
Branch: refs/heads/ignite-752
Commit: af624eb4fb1db6563ea583b298b7471de2506ab2
Parents: 0cc31b2
Author: Denis Magda <dm...@gridgain.com>
Authored: Sun Jul 19 12:37:14 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Sun Jul 19 12:37:14 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 7 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 6 +-
...entDiscoverySpiFailureThresholdSelfTest.java | 83 ++++++
.../tcp/TcpClientDiscoverySpiSelfTest.java | 77 ++++--
.../tcp/TcpDiscoverySpiConfigSelfTest.java | 4 +
...TcpDiscoverySpiFailureThresholdSelfTest.java | 270 ++++++++++++++++---
.../IgniteSpiDiscoverySelfTestSuite.java | 1 +
7 files changed, 385 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f05d027..6e0d199 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -622,8 +622,11 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void onDataReceived() {
if (spi.failureDetectionThresholdEnabled()) {
- locNode.lastDataReceivedTime(U.currentTimeMillis());
- chkStatusSnd.onDataReceived();
+ if (locNode != null)
+ locNode.lastDataReceivedTime(U.currentTimeMillis());
+
+ if (chkStatusSnd != null)
+ chkStatusSnd.onDataReceived();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index f231c29..23166f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -158,13 +158,13 @@ import java.util.concurrent.atomic.*;
@DiscoverySpiHistorySupport(true)
public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
/** Failure detection threshold feature major version. */
- final static int FAILURE_DETECTION_MAJOR_VER = 1;
+ final static byte FAILURE_DETECTION_MAJOR_VER = 1;
/** Failure detection threshold feature minor version. */
- final static int FAILURE_DETECTION_MINOR_VER = 3;
+ final static byte FAILURE_DETECTION_MINOR_VER = 4;
/** Failure detection threshold feature maintainance version. */
- final static int FAILURE_DETECTION_MAINT_VER = 1;
+ final static byte FAILURE_DETECTION_MAINT_VER = 1;
/** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
new file mode 100644
index 0000000..202b328
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ * Client-based discovery SPI test with failure detection threshold enabled.
+ */
+public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDiscoverySpiSelfTest {
+ /** */
+ private final static int FAILURE_AWAIT_TIME = 7_000;
+
+ /** {@inheritDoc} */
+ @Override protected boolean useFailureDetectionThreshold() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long failureDetectionThreshold() {
+ return 10_000;
+ }
+
+ /** {@inheritDoc} */
+ protected void await(CountDownLatch latch) throws InterruptedException {
+ assertTrue("Latch count: " + latch.getCount(), latch.await(failureDetectionThreshold() +
+ FAILURE_AWAIT_TIME, MILLISECONDS));
+ }
+
+ /**
+ * @throws Exception in case of error.
+ */
+ public void testFailureDetectionThresholdEnabled() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ assertTrue(((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).
+ failureDetectionThresholdEnabled());
+ assertEquals(failureDetectionThreshold(),
+ ((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).failureDetectionThreshold());
+
+ assertTrue(((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).
+ failureDetectionThresholdEnabled());
+ assertEquals(failureDetectionThreshold(),
+ ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionThreshold());
+ }
+
+ /** {@inheritDoc} */
+ public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
+ reconnectSegmentedAfterJoinTimeout(true, failureDetectionThreshold() + FAILURE_AWAIT_TIME);
+ }
+
+ /** {@inheritDoc} */
+ public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
+ reconnectSegmentedAfterJoinTimeout(false, failureDetectionThreshold() + FAILURE_AWAIT_TIME);
+ }
+
+ /** {@inheritDoc} */
+ public void testDisconnectAfterNetworkTimeout() throws Exception {
+ testDisconnectAfterNetworkTimeout(failureDetectionThreshold() + FAILURE_AWAIT_TIME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index be442b5..458e545 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -114,6 +114,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private boolean reconnectDisabled;
+ /** */
+ private boolean useFailureDetectionThreshold;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -154,13 +157,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
else
throw new IllegalArgumentException();
- if (longSockTimeouts) {
+ if (longSockTimeouts && !useFailureDetectionThreshold()) {
disco.setAckTimeout(2000);
disco.setSocketTimeout(2000);
}
disco.setJoinTimeout(joinTimeout);
- disco.setNetworkTimeout(netTimeout);
+
+ if (!useFailureDetectionThreshold())
+ disco.setNetworkTimeout(netTimeout);
+ else
+ cfg.setFailureDetectionThreshold(failureDetectionThreshold());
disco.setClientReconnectDisabled(reconnectDisabled);
@@ -205,6 +212,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
}
/**
+ * Checks whether to use failure detection threshold instead of setting explicit timeouts.
+ *
+ * @return {@code true} if use.
+ */
+ protected boolean useFailureDetectionThreshold() {
+ return false;
+ }
+
+ /**
+ * Gets failure detection threshold to use.
+ *
+ * @return Failure detection threshold.
+ */
+ protected long failureDetectionThreshold() {
+ return 0;
+ }
+
+ /**
* @throws Exception If failed.
*/
public void testJoinTimeout() throws Exception {
@@ -418,7 +443,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
Ignite srv1 = G.ignite("server-1");
Ignite client = G.ignite("client-0");
- ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
+ if (!useFailureDetectionThreshold())
+ ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite();
@@ -756,8 +782,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override public void apply(TcpDiscoveryAbstractMessage msg) {
try {
Thread.sleep(1000000);
- }
- catch (InterruptedException ignored) {
+ } catch (InterruptedException ignored) {
Thread.interrupted();
}
}
@@ -896,7 +921,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
startClientNodes(1);
assertEquals(G.ignite("server-0").cluster().localNode().id(),
- ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId());
+ ((TcpDiscoveryNode)G.ignite("client-0").cluster().localNode()).clientRouterNodeId());
checkNodes(2, 1);
@@ -1460,21 +1485,21 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
- reconnectSegmentedAfterJoinTimeout(true);
+ reconnectSegmentedAfterJoinTimeout(true, 10_000);
}
/**
* @throws Exception If failed.
*/
public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
- reconnectSegmentedAfterJoinTimeout(false);
+ reconnectSegmentedAfterJoinTimeout(false, 10_000);
}
/**
* @param failSrv If {@code true} fails server, otherwise server does not send join message.
* @throws Exception If failed.
*/
- private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
+ protected void reconnectSegmentedAfterJoinTimeout(boolean failSrv, long awaitTimeout) throws Exception {
netTimeout = 4000;
joinTimeout = 5000;
@@ -1542,9 +1567,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientSpi.brakeConnection();
}
- assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(disconnectLatch.await(awaitTimeout, MILLISECONDS));
- assertTrue(segmentedLatch.await(10_000, MILLISECONDS));
+ assertTrue(segmentedLatch.await(awaitTimeout, MILLISECONDS));
waitSegmented(client);
@@ -1557,7 +1582,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override public boolean apply() {
return srv.cluster().nodes().size() == 1;
}
- }, 10_000);
+ }, awaitTimeout);
checkNodes(1, 0);
}
@@ -1590,8 +1615,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertEquals(1, disconnectLatch.getCount());
disconnectLatch.countDown();
- }
- else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
log.info("Reconnected event.");
assertEquals(1, reconnectLatch.getCount());
@@ -1599,8 +1623,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertFalse(err.get());
reconnectLatch.countDown();
- }
- else {
+ } else {
log.error("Unexpected event: " + evt);
err.set(true);
@@ -1639,6 +1662,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testDisconnectAfterNetworkTimeout() throws Exception {
+ testDisconnectAfterNetworkTimeout(10_000);
+ }
+
+ /**
+ * @param timeout Timeout to wait.
+ * @throws Exception if failed.
+ */
+ public void testDisconnectAfterNetworkTimeout(long timeout) throws Exception {
netTimeout = 5000;
joinTimeout = 60_000;
maxMissedClientHbs = 2;
@@ -1695,7 +1726,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientSpi.brakeConnection();
- assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(disconnectLatch.await(timeout, MILLISECONDS));
log.info("Fail client connection2.");
@@ -1704,7 +1735,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientSpi.brakeConnection();
- assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+ assertTrue(reconnectLatch.await(timeout, MILLISECONDS));
clientNodeIds.clear();
@@ -1715,7 +1746,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
public boolean apply() {
return srv.cluster().nodes().size() == 2;
}
- }, 10_000);
+ }, timeout);
checkNodes(1, 1);
@@ -1759,7 +1790,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param cnt Number of nodes.
* @throws Exception In case of error.
*/
- private void startServerNodes(int cnt) throws Exception {
+ protected void startServerNodes(int cnt) throws Exception {
for (int i = 0; i < cnt; i++) {
Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
@@ -1771,7 +1802,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param cnt Number of nodes.
* @throws Exception In case of error.
*/
- private void startClientNodes(int cnt) throws Exception {
+ protected void startClientNodes(int cnt) throws Exception {
for (int i = 0; i < cnt; i++) {
Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
@@ -1888,7 +1919,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param srvCnt Number of server nodes.
* @param clientCnt Number of client nodes.
*/
- private void checkNodes(int srvCnt, int clientCnt) {
+ protected void checkNodes(int srvCnt, int clientCnt) {
long topVer = -1;
for (int i = 0; i < srvCnt; i++) {
@@ -1950,7 +1981,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @param latch Latch.
* @throws InterruptedException If interrupted.
*/
- private void await(CountDownLatch latch) throws InterruptedException {
+ protected void await(CountDownLatch latch) throws InterruptedException {
assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
index 3e895be..91f4f9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery.tcp;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.testframework.junits.spi.*;
/**
@@ -41,5 +42,8 @@ public class TcpDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTest<Tcp
checkNegativeSpiProperty(new TcpDiscoverySpi(), "threadPriority", -1);
checkNegativeSpiProperty(new TcpDiscoverySpi(), "maxMissedHeartbeats", 0);
checkNegativeSpiProperty(new TcpDiscoverySpi(), "statisticsPrintFrequency", 0);
+ checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency", 0);
+ checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency",
+ IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 1000);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
index db0d9c5..fab3628 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -20,11 +20,15 @@ package org.apache.ignite.spi.discovery.tcp;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
import java.io.*;
import java.net.*;
@@ -34,26 +38,56 @@ import java.net.*;
*/
public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest {
/** */
- private static TestTcpDiscoverySpi firstSpi;
+ private static final int SPI_COUNT = 7;
/** */
- private static TestTcpDiscoverySpi secondSpi;
+ private static final long CONN_CHECK_FREQ = 2000;
+
+ /** */
+ private static TestTcpDiscoverySpi spis[] = new TestTcpDiscoverySpi[SPI_COUNT];
/** */
private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
/** {@inheritDoc} */
+ @Override protected int getSpiCount() {
+ return SPI_COUNT;
+ }
+
+ /** {@inheritDoc} */
@Override protected DiscoverySpi getSpi(int idx) {
TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();
- if (idx == 0)
- firstSpi = spi;
- else
- secondSpi = spi;
-
spi.setMetricsProvider(createMetricsProvider());
spi.setIpFinder(ipFinder);
+ spis[idx] = spi;
+
+ switch (idx) {
+ case 0:
+ spi.setConnectionCheckFrequency(CONN_CHECK_FREQ);
+ break;
+ case 1:
+ // Ignore
+ break;
+ case 2:
+ spi.setAckTimeout(3000);
+ break;
+ case 3:
+ spi.setSocketTimeout(4000);
+ break;
+ case 4:
+ spi.setReconnectCount(4);
+ break;
+ case 5:
+ spi.setMaxAckTimeout(10000);
+ break;
+ case 6:
+ spi.setNetworkTimeout(4000);
+ break;
+ default:
+ assert false;
+ }
return spi;
}
@@ -61,11 +95,21 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
* @throws Exception In case of error.
*/
public void testFailureDetectionThresholdEnabled() throws Exception {
- assertTrue(firstSpi.failureDetectionThresholdEnabled());
- assertTrue(secondSpi.failureDetectionThresholdEnabled());
+ assertTrue(firstSpi().failureDetectionThresholdEnabled());
+ assertTrue(secondSpi().failureDetectionThresholdEnabled());
+
+ assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi().failureDetectionThreshold());
+ assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi().failureDetectionThreshold());
+ }
- assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi.failureDetectionThreshold());
- assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi.failureDetectionThreshold());
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testFailureDetectionThresholdDisabled() throws Exception {
+ for (int i = 2; i < spis.length; i++) {
+ assertFalse(spis[i].failureDetectionThresholdEnabled());
+ assertEquals(0, spis[i].failureDetectionThreshold());
+ }
}
/**
@@ -73,23 +117,23 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
*/
public void testFailureDetectionOnSocketOpen() throws Exception {
try {
- ClusterNode node = secondSpi.getLocalNode();
+ ClusterNode node = secondSpi().getLocalNode();
- firstSpi.openSocketTimeout = true;
+ firstSpi().openSocketTimeout = true;
- assertFalse(firstSpi.pingNode(node.id()));
- assertTrue(firstSpi.validTimeout);
- assertTrue(firstSpi.err.getMessage().equals("Timeout: openSocketTimeout"));
+ assertFalse(firstSpi().pingNode(node.id()));
+ assertTrue(firstSpi().validTimeout);
+ assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeout"));
- firstSpi.openSocketTimeout = false;
- firstSpi.openSocketTimeoutWait = true;
+ firstSpi().openSocketTimeout = false;
+ firstSpi().openSocketTimeoutWait = true;
- assertFalse(firstSpi.pingNode(node.id()));
- assertTrue(firstSpi.validTimeout);
- assertTrue(firstSpi.err.getMessage().equals("Timeout: openSocketTimeoutWait"));
+ assertFalse(firstSpi().pingNode(node.id()));
+ assertTrue(firstSpi().validTimeout);
+ assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeoutWait"));
}
finally {
- firstSpi.resetState();
+ firstSpi().resetState();
}
}
@@ -99,41 +143,176 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
*/
public void testFailureDetectionOnSocketWrite() throws Exception {
try {
- ClusterNode node = secondSpi.getLocalNode();
+ ClusterNode node = secondSpi().getLocalNode();
+
+ firstSpi().writeToSocketTimeoutWait = true;
+
+ assertFalse(firstSpi().pingNode(node.id()));
+ assertTrue(firstSpi().validTimeout);
+
+ firstSpi().writeToSocketTimeoutWait = false;
+
+ assertTrue(firstSpi().pingNode(node.id()));
+ assertTrue(firstSpi().validTimeout);
+ }
+ finally {
+ firstSpi().resetState();
+ }
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testConnectionCheckMessage() throws Exception {
+ TestTcpDiscoverySpi nextSpi = null;
+
+ try {
+ assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+ TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+ assertNotNull(nextNode);
+
+ nextSpi = null;
+
+ for (int i = 1; i < spis.length; i++)
+ if (spis[i].getLocalNode().id().equals(nextNode.id())) {
+ nextSpi = spis[i];
+ break;
+ }
+
+ assertNotNull(nextSpi);
+
+ assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+ firstSpi().countConnCheckMsg = true;
+ nextSpi.countConnCheckMsg = true;
+
+ Thread.sleep(CONN_CHECK_FREQ * 5);
+
+ firstSpi().countConnCheckMsg = false;
+ nextSpi.countConnCheckMsg = false;
+
+ int sent = firstSpi().connCheckStatusMsgCntSent;
+ int received = nextSpi.connCheckStatusMsgCntReceived;
+
+ assert sent >= 3 && sent < 7 : "messages sent: " + sent;
+ assert received >= 3 && received < 7 : "messages received: " + received;
+ }
+ finally {
+ firstSpi().resetState();
+
+ if (nextSpi != null)
+ nextSpi.resetState();
+ }
+ }
+
+ /**
+ * @throws Exception In case of error.
+ */
+ public void testConnectionCheckMessageBackwardCompatibility() throws Exception {
+ TestTcpDiscoverySpi nextSpi = null;
+ TcpDiscoveryNode nextNode = null;
+
+ IgniteProductVersion nextNodeVer = null;
+
+ try {
+ assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+ nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+ assertNotNull(nextNode);
- firstSpi.writeToSocketTimeoutWait = true;
+ nextSpi = null;
- assertFalse(firstSpi.pingNode(node.id()));
- assertTrue(firstSpi.validTimeout);
+ for (int i = 1; i < spis.length; i++)
+ if (spis[i].getLocalNode().id().equals(nextNode.id())) {
+ nextSpi = spis[i];
+ break;
+ }
+
+ assertNotNull(nextSpi);
+
+ assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+ nextNodeVer = nextNode.version();
+
+ // Overriding the version of the next node. Connection check message must not been sent to it.
+ nextNode.version(new IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+ (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER,
+ 0l, null));
+
+ firstSpi().countConnCheckMsg = true;
+ nextSpi.countConnCheckMsg = true;
- firstSpi.writeToSocketTimeoutWait = false;
+ Thread.sleep(CONN_CHECK_FREQ * 5);
- assertTrue(firstSpi.pingNode(node.id()));
- assertTrue(firstSpi.validTimeout);
+ firstSpi().countConnCheckMsg = false;
+ nextSpi.countConnCheckMsg = false;
+
+ int sent = firstSpi().connCheckStatusMsgCntSent;
+ int received = nextSpi.connCheckStatusMsgCntReceived;
+
+ assert sent == 0 : "messages sent: " + sent;
+ assert received == 0 : "messages received: " + received;
}
finally {
- firstSpi.resetState();
+ firstSpi().resetState();
+
+ if (nextSpi != null)
+ nextSpi.resetState();
+
+ if (nextNode != null && nextNodeVer != null)
+ nextNode.version(nextNodeVer);
}
}
/**
+ * Returns the first spi with failure detection threshold enabled.
+ *
+ * @return SPI.
+ */
+ private TestTcpDiscoverySpi firstSpi() {
+ return spis[0];
+ }
+
+
+ /**
+ * Returns the second spi with failure detection threshold enabled.
+ *
+ * @return SPI.
+ */
+ private TestTcpDiscoverySpi secondSpi() {
+ return spis[1];
+ }
+
+ /**
*
*/
private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** */
- private boolean openSocketTimeout;
+ private volatile boolean openSocketTimeout;
+
+ /** */
+ private volatile boolean openSocketTimeoutWait;
+
+ /** */
+ private volatile boolean writeToSocketTimeoutWait;
+
+ /** */
+ private volatile boolean countConnCheckMsg;
/** */
- private boolean openSocketTimeoutWait;
+ private volatile int connCheckStatusMsgCntSent;
/** */
- private boolean writeToSocketTimeoutWait;
+ private volatile int connCheckStatusMsgCntReceived;
/** */
- private boolean validTimeout = true;
+ private volatile boolean validTimeout = true;
/** */
- private IgniteSpiOperationTimeoutException err;
+ private volatile IgniteSpiOperationTimeoutException err;
/** {@inheritDoc} */
@@ -200,6 +379,24 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
super.writeToSocket(sock, msg, timeout);
}
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+ if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+ connCheckStatusMsgCntSent++;
+
+ super.writeToSocket(sock, msg, bout, timeout);
+ }
+
+ /** {@inheritDoc} */
+ protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+ throws IOException {
+ if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+ connCheckStatusMsgCntReceived++;
+
+ super.writeToSocket(msg, sock, res, timeout);
+ }
+
/**
*
*/
@@ -209,6 +406,9 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
writeToSocketTimeoutWait = false;
err = null;
validTimeout = true;
+ connCheckStatusMsgCntSent = 0;
+ connCheckStatusMsgCntReceived = 0;
+ countConnCheckMsg = false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 357fd93..a78ab25 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -55,6 +55,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
+ suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureThresholdSelfTest.class));
suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));