You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/19 12:33:22 UTC

[8/9] incubator-ignite git commit: ignite-752: tests for tcp communication spi

ignite-752: tests for tcp communication spi


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/123efaff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/123efaff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/123efaff

Branch: refs/heads/ignite-752
Commit: 123efafffd7fda44cff2d32fda0a43954b7f07f0
Parents: af624eb
Author: Denis Magda <dm...@gridgain.com>
Authored: Sun Jul 19 13:30:13 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Sun Jul 19 13:30:13 2015 +0300

----------------------------------------------------------------------
 .../communication/tcp/TcpCommunicationSpi.java  | 17 ++++-
 .../GridTcpCommunicationSpiAbstractTest.java    |  2 +-
 ...tionSpiRecoveryFailureDetectionSelfTest.java | 54 ++++++++++++++
 ...GridTcpCommunicationSpiRecoverySelfTest.java | 23 ++++--
 ...unicationSpiTcpFailureDetectionSelfTest.java | 78 ++++++++++++++++++++
 ...entDiscoverySpiFailureThresholdSelfTest.java |  8 +-
 .../IgniteSpiCommunicationSelfTestSuite.java    |  3 +
 7 files changed, 169 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index be75ab2..96c8770 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -236,12 +236,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         new GridNioServerListenerAdapter<Message>() {
             @Override public void onSessionWriteTimeout(GridNioSession ses) {
                 LT.warn(log, null, "Communication SPI Session write timed out (consider increasing " +
+                    (!failureDetectionThresholdEnabled() ?
                     "'socketWriteTimeout' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
-                    ", writeTimeout=" + sockWriteTimeout + ']');
+                    ", writeTimeout=" + sockWriteTimeout :
+                    "'failureDetectionThreshold' " + "configuration property) [remoteAddr=" + ses.remoteAddress() +
+                    ", failureDetectionThreshold=" + failureDetectionThreshold()) + ']'
+                    ); // ']' is appended once above, outside the ternary, so neither branch adds its own.
 
                 if (log.isDebugEnabled())
                     log.debug("Closing communication SPI session on write timeout [remoteAddr=" + ses.remoteAddress() +
-                        ", writeTimeout=" + sockWriteTimeout + ']');
+                        (!failureDetectionThresholdEnabled() ?
+                        ", writeTimeout=" + sockWriteTimeout :
+                        ", failureDetectionThreshold=" + failureDetectionThreshold()) + ']');
 
                 ses.close();
             }
@@ -916,6 +922,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @IgniteSpiConfiguration(optional = true)
     public void setSocketWriteTimeout(long sockWriteTimeout) {
         this.sockWriteTimeout = sockWriteTimeout;
+
+        failureDetectionThresholdEnabled(false); // An explicit write timeout switches failure detection off.
     }
 
     /** {@inheritDoc} */
@@ -1286,9 +1294,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             assertParameter(reconCnt > 0, "reconnectCnt > 0");
             assertParameter(connTimeout >= 0, "connTimeout >= 0");
             assertParameter(maxConnTimeout >= connTimeout, "maxConnTimeout >= connTimeout");
+            assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
         }
 
-        assertParameter(sockWriteTimeout >= 0, "sockWriteTimeout >= 0");
         assertParameter(ackSndThreshold > 0, "ackSndThreshold > 0");
         assertParameter(unackedMsgsBufSize >= 0, "unackedMsgsBufSize >= 0");
 
@@ -1526,7 +1534,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .sendQueueLimit(msgQueueLimit)
                         .directMode(true)
                         .metricsListener(metricsLsnr)
-                        .writeTimeout(sockWriteTimeout)
+                        .writeTimeout(failureDetectionThresholdEnabled() ? failureDetectionThreshold() :
+                            sockWriteTimeout)
                         .filters(new GridNioCodecFilter(parser, log, true),
                             new GridConnectionBytesVerifyFilter(log))
                         .messageFormatter(msgFormatter)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index eee38a5..538ead5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -79,7 +79,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
         for (CommunicationSpi spi : spis.values()) {
             ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
 
-            assertEquals(2, clients.size());
+            assertEquals(getSpiCount() - 1, clients.size());
 
             clients.put(UUID.randomUUID(), F.first(clients.values()));
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
new file mode 100644
index 0000000..7d10316
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Recovery tests run with SPIs that are expected to keep the failure detection threshold enabled at its default.
+ */
+public class GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest extends GridTcpCommunicationSpiRecoverySelfTest {
+    /** {@inheritDoc} */
+    @Override protected TcpCommunicationSpi getSpi(int idx) {
+        TcpCommunicationSpi spi = new TcpCommunicationSpi();
+
+        spi.setSharedMemoryPort(-1);
+        spi.setLocalPort(port++);
+        spi.setIdleConnectionTimeout(10_000);
+        spi.setAckSendThreshold(5);
+        spi.setSocketSendBuffer(512);
+        spi.setSocketReceiveBuffer(512);
+
+        return spi;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long awaitForSocketWriteTimeout() {
+        return IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD + 5_000; // Default threshold plus extra slack.
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testFailureDetectionEnabled() throws Exception {
+        for (TcpCommunicationSpi spi: spis) {
+            assertTrue(spi.failureDetectionThresholdEnabled());
+            assertTrue(spi.failureDetectionThreshold() == IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index 5d3afd9..67d42d3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -60,7 +60,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
     private static final int ITERS = 10;
 
     /** */
-    private static int port = 30_000;
+    protected static int port = 30_000;
 
     /**
      *
@@ -163,6 +163,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
     }
 
     /**
+     * Wait limit used by the tests while waiting for conditions such as a session being closed.
+     *
+     * @return Wait limit; subclasses may override it to match their effective write timeout.
+     */
+    protected long awaitForSocketWriteTimeout() {
+        return 5000;
+    }
+
+    /**
      * @throws Exception If failed.
      */
     public void testBlockListener() throws Exception {
@@ -245,7 +254,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
             @Override public boolean apply() {
                 return lsnr0.rcvCnt.get() >= expMsgs && lsnr1.rcvCnt.get() >= expMsgs;
             }
-        }, 5000);
+        }, awaitForSocketWriteTimeout());
 
         assertEquals(expMsgs, lsnr0.rcvCnt.get());
         assertEquals(expMsgs, lsnr1.rcvCnt.get());
@@ -301,7 +310,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         @Override public boolean apply() {
                             return ses0.closeTime() != 0;
                         }
-                    }, 5000);
+                    }, awaitForSocketWriteTimeout());
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
@@ -411,7 +420,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         @Override public boolean apply() {
                             return ses0.closeTime() != 0;
                         }
-                    }, 5000);
+                    }, awaitForSocketWriteTimeout());
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
@@ -423,7 +432,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         public boolean apply() {
                             return ses1.closeTime() != 0;
                         }
-                    }, 5000);
+                    }, awaitForSocketWriteTimeout());
 
                     assertTrue("Failed to wait for session close", ses1.closeTime() != 0);
 
@@ -528,7 +537,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
                         @Override public boolean apply() {
                             return ses0.closeTime() != 0;
                         }
-                    }, 5000);
+                    }, awaitForSocketWriteTimeout());
 
                     assertTrue("Failed to wait for session close", ses0.closeTime() != 0);
 
@@ -592,7 +601,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi>
 
                 return !sessions.isEmpty();
             }
-        }, 5000);
+        }, awaitForSocketWriteTimeout());
 
         Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions");
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
new file mode 100644
index 0000000..8b85227
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiTcpFailureDetectionSelfTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.spi.communication.*;
+
+/**
+ * TCP SPI tests: SPI 0 keeps failure detection enabled, SPIs 1-4 each set one property expected to disable it.
+ */
+public class GridTcpCommunicationSpiTcpFailureDetectionSelfTest extends GridTcpCommunicationSpiTcpSelfTest {
+    /** Number of SPIs requested from the base test via {@link #getSpiCount()}. */
+    private static final int SPI_COUNT = 5;
+
+    private TcpCommunicationSpi[] spis = new TcpCommunicationSpi[SPI_COUNT]; // Filled by getSpi(int), by index.
+
+    /** {@inheritDoc} */
+    @Override protected int getSpiCount() {
+        return SPI_COUNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CommunicationSpi getSpi(int idx) {
+        TcpCommunicationSpi spi = (TcpCommunicationSpi)super.getSpi(idx);
+
+        switch (idx) {
+            case 0:
+                // Nothing is set explicitly: failure detection is expected to stay enabled.
+                break;
+            case 1:
+                spi.setConnectTimeout(4000);
+                break;
+            case 2:
+                spi.setMaxConnectTimeout(TcpCommunicationSpi.DFLT_MAX_CONN_TIMEOUT);
+                break;
+            case 3:
+                spi.setReconnectCount(2);
+                break;
+            case 4:
+                spi.setSocketWriteTimeout(5000);
+                break;
+            default:
+                assert false : "Unexpected SPI index: " + idx;
+        }
+
+        spis[idx] = spi;
+
+        return spi;
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    public void testFailureDetectionEnabled() throws Exception {
+        assertTrue(spis[0].failureDetectionThresholdEnabled());
+        assertTrue(spis[0].failureDetectionThreshold() == IgniteConfiguration.DFLT_FAILURE_DETECTION_THRESHOLD);
+
+        for (int i = 1; i < SPI_COUNT; i++) {
+            assertFalse(spis[i].failureDetectionThresholdEnabled());
+            assertEquals(0, spis[i].failureDetectionThreshold());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
index 202b328..8145fd1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
@@ -41,7 +41,7 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
     }
 
     /** {@inheritDoc} */
-    protected void await(CountDownLatch latch) throws InterruptedException {
+    @Override protected void await(CountDownLatch latch) throws InterruptedException {
         assertTrue("Latch count: " + latch.getCount(), latch.await(failureDetectionThreshold() +
             FAILURE_AWAIT_TIME, MILLISECONDS));
     }
@@ -67,17 +67,17 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
     }
 
     /** {@inheritDoc} */
-    public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
+    @Override public void testReconnectSegmentedAfterJoinTimeoutServerFailed() throws Exception {
         reconnectSegmentedAfterJoinTimeout(true, failureDetectionThreshold() + FAILURE_AWAIT_TIME);
     }
 
     /** {@inheritDoc} */
-    public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
+    @Override public void testReconnectSegmentedAfterJoinTimeoutNetworkError() throws Exception {
         reconnectSegmentedAfterJoinTimeout(false, failureDetectionThreshold() + FAILURE_AWAIT_TIME);
     }
 
     /** {@inheritDoc} */
-    public void testDisconnectAfterNetworkTimeout() throws Exception {
+    @Override public void testDisconnectAfterNetworkTimeout() throws Exception {
         testDisconnectAfterNetworkTimeout(failureDetectionThreshold() + FAILURE_AWAIT_TIME);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/123efaff/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index ff86bda..3f71d7d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -45,6 +45,9 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedSelfTest.class));
         suite.addTest(new TestSuite(GridTcpCommunicationSpiMultithreadedShmemTest.class));
 
+        suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryFailureDetectionSelfTest.class));
+        suite.addTest(new TestSuite(GridTcpCommunicationSpiTcpFailureDetectionSelfTest.class));
+
         suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class));
 
         return suite;