You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/19 12:33:21 UTC

[7/9] incubator-ignite git commit: ignite-752: added tests for client spi

ignite-752: added tests for client spi


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/af624eb4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/af624eb4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/af624eb4

Branch: refs/heads/ignite-752
Commit: af624eb4fb1db6563ea583b298b7471de2506ab2
Parents: 0cc31b2
Author: Denis Magda <dm...@gridgain.com>
Authored: Sun Jul 19 12:37:14 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Sun Jul 19 12:37:14 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   7 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   6 +-
 ...entDiscoverySpiFailureThresholdSelfTest.java |  83 ++++++
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  77 ++++--
 .../tcp/TcpDiscoverySpiConfigSelfTest.java      |   4 +
 ...TcpDiscoverySpiFailureThresholdSelfTest.java | 270 ++++++++++++++++---
 .../IgniteSpiDiscoverySelfTestSuite.java        |   1 +
 7 files changed, 385 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index f05d027..6e0d199 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -622,8 +622,11 @@ class ServerImpl extends TcpDiscoveryImpl {
     /** {@inheritDoc} */
     @Override protected void onDataReceived() {
         if (spi.failureDetectionThresholdEnabled()) {
-            locNode.lastDataReceivedTime(U.currentTimeMillis());
-            chkStatusSnd.onDataReceived();
+            if (locNode != null)
+                locNode.lastDataReceivedTime(U.currentTimeMillis());
+
+            if (chkStatusSnd != null)
+                chkStatusSnd.onDataReceived();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index f231c29..23166f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -158,13 +158,13 @@ import java.util.concurrent.atomic.*;
 @DiscoverySpiHistorySupport(true)
 public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, TcpDiscoverySpiMBean {
     /** Failure detection threshold feature major version. */
-    final static int FAILURE_DETECTION_MAJOR_VER = 1;
+    final static byte FAILURE_DETECTION_MAJOR_VER = 1;
 
     /** Failure detection threshold feature minor version. */
-    final static int FAILURE_DETECTION_MINOR_VER = 3;
+    final static byte FAILURE_DETECTION_MINOR_VER = 4;
 
     /** Failure detection threshold feature maintainance version. */
-    final static int FAILURE_DETECTION_MAINT_VER = 1;
+    final static byte FAILURE_DETECTION_MAINT_VER = 1;
 
     /** Node attribute that is mapped to node's external addresses (value is <tt>disc.tcp.ext-addrs</tt>). */
     public static final String ATTR_EXT_ADDRS = "disc.tcp.ext-addrs";

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
new file mode 100644
index 0000000..202b328
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.util.concurrent.*;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ * Client-based discovery SPI test with failure detection threshold enabled.
+ */
+public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDiscoverySpiSelfTest {
+    /** */
+    private final static int FAILURE_AWAIT_TIME = 7_000;
+
+    /** {@inheritDoc} */
+    @Override protected boolean useFailureDetectionThreshold() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long failureDetectionThreshold() {
+        return 10_000;
+    }
+
+    /** {@inheritDoc} */
+    protected void await(CountDownLatch latch) throws InterruptedException {
+        assertTrue("Latch count: " + latch.getCount(), latch.await(failureDetectionThreshold() +
+            FAILURE_AWAIT_TIME, MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception in case of error.
+     */
+    public void testFailureDetectionThresholdEnabled() throws Exception {
+        startServerNodes(1);
+        startClientNodes(1);
+
+        checkNodes(1, 1);
+
+        assertTrue(((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).
+            failureDetectionThresholdEnabled());
+        assertEquals(failureDetectionThreshold(),
+            ((TcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi())).failureDetectionThreshold());
+
+        assertTrue(((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).
+            failureDetectionThresholdEnabled());
+        assertEquals(failureDetectionThreshold(),
+                     ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionThreshold());
+    }
+
+    /** {@inheritDoc} */
+    public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
+        reconnectSegmentedAfterJoinTimeout(true, failureDetectionThreshold() + FAILURE_AWAIT_TIME);
+    }
+
+    /** {@inheritDoc} */
+    public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
+        reconnectSegmentedAfterJoinTimeout(false, failureDetectionThreshold() + FAILURE_AWAIT_TIME);
+    }
+
+    /** {@inheritDoc} */
+    public void testDisconnectAfterNetworkTimeout() throws Exception {
+        testDisconnectAfterNetworkTimeout(failureDetectionThreshold() + FAILURE_AWAIT_TIME);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index be442b5..458e545 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -114,6 +114,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /** */
     private boolean reconnectDisabled;
 
+    /** */
+    private boolean useFailureDetectionThreshold;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -154,13 +157,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         else
             throw new IllegalArgumentException();
 
-        if (longSockTimeouts) {
+        if (longSockTimeouts && !useFailureDetectionThreshold()) {
             disco.setAckTimeout(2000);
             disco.setSocketTimeout(2000);
         }
 
         disco.setJoinTimeout(joinTimeout);
-        disco.setNetworkTimeout(netTimeout);
+
+        if (!useFailureDetectionThreshold())
+            disco.setNetworkTimeout(netTimeout);
+        else
+            cfg.setFailureDetectionThreshold(failureDetectionThreshold());
 
         disco.setClientReconnectDisabled(reconnectDisabled);
 
@@ -205,6 +212,24 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks whether to use failure detection threshold instead of setting explicit timeouts.
+     *
+     * @return {@code true} if use.
+     */
+    protected boolean useFailureDetectionThreshold() {
+        return false;
+    }
+
+    /**
+     * Gets failure detection threshold to use.
+     *
+     * @return Failure detection threshold.
+     */
+    protected long failureDetectionThreshold() {
+        return 0;
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testJoinTimeout() throws Exception {
@@ -418,7 +443,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         Ignite srv1 = G.ignite("server-1");
         Ignite client = G.ignite("client-0");
 
-        ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
+        if (!useFailureDetectionThreshold())
+            ((TcpDiscoverySpi)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
 
         ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi()).pauseSocketWrite();
 
@@ -756,8 +782,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             @Override public void apply(TcpDiscoveryAbstractMessage msg) {
                 try {
                     Thread.sleep(1000000);
-                }
-                catch (InterruptedException ignored) {
+                } catch (InterruptedException ignored) {
                     Thread.interrupted();
                 }
             }
@@ -896,7 +921,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             startClientNodes(1);
 
             assertEquals(G.ignite("server-0").cluster().localNode().id(),
-                ((TcpDiscoveryNode) G.ignite("client-0").cluster().localNode()).clientRouterNodeId());
+                         ((TcpDiscoveryNode)G.ignite("client-0").cluster().localNode()).clientRouterNodeId());
 
             checkNodes(2, 1);
 
@@ -1460,21 +1485,21 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
-        reconnectSegmentedAfterJoinTimeout(true);
+        reconnectSegmentedAfterJoinTimeout(true, 10_000);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
-        reconnectSegmentedAfterJoinTimeout(false);
+        reconnectSegmentedAfterJoinTimeout(false, 10_000);
     }
 
     /**
      * @param failSrv If {@code true} fails server, otherwise server does not send join message.
      * @throws Exception If failed.
      */
-    private void reconnectSegmentedAfterJoinTimeout(boolean failSrv) throws Exception {
+    protected void reconnectSegmentedAfterJoinTimeout(boolean failSrv, long awaitTimeout) throws Exception {
         netTimeout = 4000;
         joinTimeout = 5000;
 
@@ -1542,9 +1567,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             clientSpi.brakeConnection();
         }
 
-        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(disconnectLatch.await(awaitTimeout, MILLISECONDS));
 
-        assertTrue(segmentedLatch.await(10_000, MILLISECONDS));
+        assertTrue(segmentedLatch.await(awaitTimeout, MILLISECONDS));
 
         waitSegmented(client);
 
@@ -1557,7 +1582,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                 @Override public boolean apply() {
                     return srv.cluster().nodes().size() == 1;
                 }
-            }, 10_000);
+            }, awaitTimeout);
 
             checkNodes(1, 0);
         }
@@ -1590,8 +1615,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                     assertEquals(1, disconnectLatch.getCount());
 
                     disconnectLatch.countDown();
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                     log.info("Reconnected event.");
 
                     assertEquals(1, reconnectLatch.getCount());
@@ -1599,8 +1623,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
                     assertFalse(err.get());
 
                     reconnectLatch.countDown();
-                }
-                else {
+                } else {
                     log.error("Unexpected event: " + evt);
 
                     err.set(true);
@@ -1639,6 +1662,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testDisconnectAfterNetworkTimeout() throws Exception {
+        testDisconnectAfterNetworkTimeout(10_000);
+    }
+
+    /**
+     * @param timeout Timeout to wait.
+     * @throws Exception if failed.
+     */
+    public void testDisconnectAfterNetworkTimeout(long timeout) throws Exception {
         netTimeout = 5000;
         joinTimeout = 60_000;
         maxMissedClientHbs = 2;
@@ -1695,7 +1726,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         clientSpi.brakeConnection();
 
-        assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(disconnectLatch.await(timeout, MILLISECONDS));
 
         log.info("Fail client connection2.");
 
@@ -1704,7 +1735,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         clientSpi.brakeConnection();
 
-        assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+        assertTrue(reconnectLatch.await(timeout, MILLISECONDS));
 
         clientNodeIds.clear();
 
@@ -1715,7 +1746,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             public boolean apply() {
                 return srv.cluster().nodes().size() == 2;
             }
-        }, 10_000);
+        }, timeout);
 
         checkNodes(1, 1);
 
@@ -1759,7 +1790,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param cnt Number of nodes.
      * @throws Exception In case of error.
      */
-    private void startServerNodes(int cnt) throws Exception {
+    protected void startServerNodes(int cnt) throws Exception {
         for (int i = 0; i < cnt; i++) {
             Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
 
@@ -1771,7 +1802,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param cnt Number of nodes.
      * @throws Exception In case of error.
      */
-    private void startClientNodes(int cnt) throws Exception {
+    protected void startClientNodes(int cnt) throws Exception {
         for (int i = 0; i < cnt; i++) {
             Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
 
@@ -1888,7 +1919,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param srvCnt Number of server nodes.
      * @param clientCnt Number of client nodes.
      */
-    private void checkNodes(int srvCnt, int clientCnt) {
+    protected void checkNodes(int srvCnt, int clientCnt) {
         long topVer = -1;
 
         for (int i = 0; i < srvCnt; i++) {
@@ -1950,7 +1981,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
      * @param latch Latch.
      * @throws InterruptedException If interrupted.
      */
-    private void await(CountDownLatch latch) throws InterruptedException {
+    protected void await(CountDownLatch latch) throws InterruptedException {
         assertTrue("Latch count: " + latch.getCount(), latch.await(10_000, MILLISECONDS));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
index 3e895be..91f4f9e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiConfigSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.testframework.junits.spi.*;
 
 /**
@@ -41,5 +42,8 @@ public class TcpDiscoverySpiConfigSelfTest extends GridSpiAbstractConfigTest<Tcp
         checkNegativeSpiProperty(new TcpDiscoverySpi(), "threadPriority", -1);
         checkNegativeSpiProperty(new TcpDiscoverySpi(), "maxMissedHeartbeats", 0);
         checkNegativeSpiProperty(new TcpDiscoverySpi(), "statisticsPrintFrequency", 0);
+        checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency", 0);
+        checkNegativeSpiProperty(new TcpDiscoverySpi(), "connectionCheckFrequency",
+            IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 1000);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
index db0d9c5..fab3628 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureThresholdSelfTest.java
@@ -20,11 +20,15 @@ package org.apache.ignite.spi.discovery.tcp;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.io.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.discovery.*;
+import org.apache.ignite.spi.discovery.tcp.internal.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
 import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.net.*;
@@ -34,26 +38,56 @@ import java.net.*;
  */
 public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySelfTest {
     /** */
-    private static TestTcpDiscoverySpi firstSpi;
+    private static final int SPI_COUNT = 7;
 
     /** */
-    private static TestTcpDiscoverySpi secondSpi;
+    private static final long CONN_CHECK_FREQ = 2000;
+
+    /** */
+    private static TestTcpDiscoverySpi spis[] = new TestTcpDiscoverySpi[SPI_COUNT];
 
     /** */
     private TcpDiscoveryIpFinder ipFinder =  new TcpDiscoveryVmIpFinder(true);
 
     /** {@inheritDoc} */
+    @Override protected int getSpiCount() {
+        return SPI_COUNT;
+    }
+
+    /** {@inheritDoc} */
     @Override protected DiscoverySpi getSpi(int idx) {
         TestTcpDiscoverySpi spi = new TestTcpDiscoverySpi();
 
-        if (idx == 0)
-            firstSpi = spi;
-        else
-            secondSpi = spi;
-
         spi.setMetricsProvider(createMetricsProvider());
         spi.setIpFinder(ipFinder);
 
+        spis[idx] = spi;
+
+        switch (idx) {
+            case 0:
+                spi.setConnectionCheckFrequency(CONN_CHECK_FREQ);
+                break;
+            case 1:
+                // Ignore
+                break;
+            case 2:
+                spi.setAckTimeout(3000);
+                break;
+            case 3:
+                spi.setSocketTimeout(4000);
+                break;
+            case 4:
+                spi.setReconnectCount(4);
+                break;
+            case 5:
+                spi.setMaxAckTimeout(10000);
+                break;
+            case 6:
+                spi.setNetworkTimeout(4000);
+                break;
+            default:
+                assert false;
+        }
         return spi;
     }
 
@@ -61,11 +95,21 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
      * @throws Exception In case of error.
      */
     public void testFailureDetectionThresholdEnabled() throws Exception {
-        assertTrue(firstSpi.failureDetectionThresholdEnabled());
-        assertTrue(secondSpi.failureDetectionThresholdEnabled());
+        assertTrue(firstSpi().failureDetectionThresholdEnabled());
+        assertTrue(secondSpi().failureDetectionThresholdEnabled());
+
+        assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi().failureDetectionThreshold());
+        assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi().failureDetectionThreshold());
+    }
 
-        assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, firstSpi.failureDetectionThreshold());
-        assertEquals(IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD, secondSpi.failureDetectionThreshold());
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testFailureDetectionThresholdDisabled() throws Exception {
+        for (int i = 2; i < spis.length; i++) {
+            assertFalse(spis[i].failureDetectionThresholdEnabled());
+            assertEquals(0, spis[i].failureDetectionThreshold());
+        }
     }
 
     /**
@@ -73,23 +117,23 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
      */
     public void testFailureDetectionOnSocketOpen() throws Exception {
         try {
-            ClusterNode node = secondSpi.getLocalNode();
+            ClusterNode node = secondSpi().getLocalNode();
 
-            firstSpi.openSocketTimeout = true;
+            firstSpi().openSocketTimeout = true;
 
-            assertFalse(firstSpi.pingNode(node.id()));
-            assertTrue(firstSpi.validTimeout);
-            assertTrue(firstSpi.err.getMessage().equals("Timeout: openSocketTimeout"));
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+            assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeout"));
 
-            firstSpi.openSocketTimeout = false;
-            firstSpi.openSocketTimeoutWait = true;
+            firstSpi().openSocketTimeout = false;
+            firstSpi().openSocketTimeoutWait = true;
 
-            assertFalse(firstSpi.pingNode(node.id()));
-            assertTrue(firstSpi.validTimeout);
-            assertTrue(firstSpi.err.getMessage().equals("Timeout: openSocketTimeoutWait"));
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+            assertTrue(firstSpi().err.getMessage().equals("Timeout: openSocketTimeoutWait"));
         }
         finally {
-            firstSpi.resetState();
+            firstSpi().resetState();
         }
     }
 
@@ -99,41 +143,176 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
      */
     public void testFailureDetectionOnSocketWrite() throws Exception {
         try {
-            ClusterNode node = secondSpi.getLocalNode();
+            ClusterNode node = secondSpi().getLocalNode();
+
+            firstSpi().writeToSocketTimeoutWait = true;
+
+            assertFalse(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+
+            firstSpi().writeToSocketTimeoutWait = false;
+
+            assertTrue(firstSpi().pingNode(node.id()));
+            assertTrue(firstSpi().validTimeout);
+        }
+        finally {
+            firstSpi().resetState();
+        }
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testConnectionCheckMessage() throws Exception {
+        TestTcpDiscoverySpi nextSpi = null;
+
+        try {
+            assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+            TcpDiscoveryNode nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+            assertNotNull(nextNode);
+
+            nextSpi = null;
+
+            for (int i = 1; i < spis.length; i++)
+                if (spis[i].getLocalNode().id().equals(nextNode.id())) {
+                    nextSpi = spis[i];
+                    break;
+                }
+
+            assertNotNull(nextSpi);
+
+            assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+            firstSpi().countConnCheckMsg = true;
+            nextSpi.countConnCheckMsg = true;
+
+            Thread.sleep(CONN_CHECK_FREQ * 5);
+
+            firstSpi().countConnCheckMsg = false;
+            nextSpi.countConnCheckMsg = false;
+
+            int sent = firstSpi().connCheckStatusMsgCntSent;
+            int received = nextSpi.connCheckStatusMsgCntReceived;
+
+            assert sent >= 3 && sent < 7 : "messages sent: " + sent;
+            assert received >= 3 && received < 7 : "messages received: " + received;
+        }
+        finally {
+            firstSpi().resetState();
+
+            if (nextSpi != null)
+                nextSpi.resetState();
+        }
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    public void testConnectionCheckMessageBackwardCompatibility() throws Exception {
+        TestTcpDiscoverySpi nextSpi = null;
+        TcpDiscoveryNode nextNode = null;
+
+        IgniteProductVersion nextNodeVer = null;
+
+        try {
+            assert firstSpi().connCheckStatusMsgCntSent == 0;
+
+            nextNode = ((ServerImpl)(firstSpi().impl)).ring().nextNode();
+
+            assertNotNull(nextNode);
 
-            firstSpi.writeToSocketTimeoutWait = true;
+            nextSpi = null;
 
-            assertFalse(firstSpi.pingNode(node.id()));
-            assertTrue(firstSpi.validTimeout);
+            for (int i = 1; i < spis.length; i++)
+                if (spis[i].getLocalNode().id().equals(nextNode.id())) {
+                    nextSpi = spis[i];
+                    break;
+                }
+
+            assertNotNull(nextSpi);
+
+            assert nextSpi.connCheckStatusMsgCntReceived == 0;
+
+            nextNodeVer = nextNode.version();
+
+            // Overriding the version of the next node. Connection check message must not been sent to it.
+            nextNode.version(new IgniteProductVersion(TcpDiscoverySpi.FAILURE_DETECTION_MAJOR_VER,
+                (byte)(TcpDiscoverySpi.FAILURE_DETECTION_MINOR_VER - 1), TcpDiscoverySpi.FAILURE_DETECTION_MAINT_VER,
+                0l, null));
+
+            firstSpi().countConnCheckMsg = true;
+            nextSpi.countConnCheckMsg = true;
 
-            firstSpi.writeToSocketTimeoutWait = false;
+            Thread.sleep(CONN_CHECK_FREQ * 5);
 
-            assertTrue(firstSpi.pingNode(node.id()));
-            assertTrue(firstSpi.validTimeout);
+            firstSpi().countConnCheckMsg = false;
+            nextSpi.countConnCheckMsg = false;
+
+            int sent = firstSpi().connCheckStatusMsgCntSent;
+            int received = nextSpi.connCheckStatusMsgCntReceived;
+
+            assert sent == 0 : "messages sent: " + sent;
+            assert received == 0 : "messages received: " + received;
         }
         finally {
-            firstSpi.resetState();
+            firstSpi().resetState();
+
+            if (nextSpi != null)
+                nextSpi.resetState();
+
+            if (nextNode != null && nextNodeVer != null)
+                nextNode.version(nextNodeVer);
         }
     }
 
     /**
+     * Returns the first spi with failure detection threshold enabled.
+     * 
+     * @return SPI.
+     */
+    private TestTcpDiscoverySpi firstSpi() {
+        return spis[0];
+    }
+
+
+    /**
+     * Returns the second spi with failure detection threshold enabled.
+     *
+     * @return SPI.
+     */
+    private TestTcpDiscoverySpi secondSpi() {
+        return spis[1];
+    }
+    
+    /**
      *
      */
     private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
         /** */
-        private boolean openSocketTimeout;
+        private volatile boolean openSocketTimeout;
+
+        /** */
+        private volatile boolean openSocketTimeoutWait;
+
+        /** */
+        private volatile boolean writeToSocketTimeoutWait;
+
+        /** */
+        private volatile boolean countConnCheckMsg;
 
         /** */
-        private boolean openSocketTimeoutWait;
+        private volatile int connCheckStatusMsgCntSent;
 
         /** */
-        private boolean writeToSocketTimeoutWait;
+        private volatile int connCheckStatusMsgCntReceived;
 
         /** */
-        private boolean validTimeout = true;
+        private volatile boolean validTimeout = true;
 
         /** */
-        private IgniteSpiOperationTimeoutException err;
+        private volatile IgniteSpiOperationTimeoutException err;
 
 
         /** {@inheritDoc} */
@@ -200,6 +379,24 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
                 super.writeToSocket(sock, msg, timeout);
         }
 
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, IgniteCheckedException {
+            if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+                connCheckStatusMsgCntSent++;
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+
+        /** {@inheritDoc} */
+        protected void writeToSocket(TcpDiscoveryAbstractMessage msg, Socket sock, int res, long timeout)
+            throws IOException {
+            if (countConnCheckMsg && msg instanceof TcpDiscoveryConnectionCheckMessage)
+                connCheckStatusMsgCntReceived++;
+
+            super.writeToSocket(msg, sock, res, timeout);
+        }
+
         /**
          *
          */
@@ -209,6 +406,9 @@ public class TcpDiscoverySpiFailureThresholdSelfTest extends AbstractDiscoverySe
             writeToSocketTimeoutWait = false;
             err = null;
             validTimeout = true;
+            connCheckStatusMsgCntSent = 0;
+            connCheckStatusMsgCntReceived = 0;
+            countConnCheckMsg = false;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/af624eb4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 357fd93..a78ab25 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -55,6 +55,7 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(TcpClientDiscoverySpiSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoveryMarshallerCheckSelfTest.class));
         suite.addTest(new TestSuite(TcpClientDiscoverySpiMulticastTest.class));
+        suite.addTest(new TestSuite(TcpClientDiscoverySpiFailureThresholdSelfTest.class));
 
         suite.addTest(new TestSuite(TcpDiscoveryNodeConsistentIdSelfTest.class));