You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/22 10:43:58 UTC

[2/4] incubator-ignite git commit: ignite-752: added new tests, fixed bugs

ignite-752: added new tests, fixed bugs


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f2a2dcf6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f2a2dcf6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f2a2dcf6

Branch: refs/heads/ignite-752
Commit: f2a2dcf6baac19cef0cbb61592decd2135c6646e
Parents: 6d5e7d3
Author: Denis Magda <dm...@gridgain.com>
Authored: Wed Jul 22 10:48:56 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Wed Jul 22 10:48:56 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  19 ++-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   3 +-
 ...entDiscoverySpiFailureThresholdSelfTest.java | 120 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  22 ++--
 4 files changed, 152 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 877d53c..56472aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -501,6 +501,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 int reconCnt = 0;
 
+                boolean openedSock = false;
+
                 while (true) {
                     try {
                         if (addr.isUnresolved())
@@ -510,6 +512,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         sock = spi.openSocket(addr, timeoutHelper);
 
+                        openedSock = true;
+
                         spi.writeToSocket(sock, new TcpDiscoveryPingRequest(locNodeId, clientNodeId),
                             timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
@@ -537,9 +541,14 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         errs.add(e);
 
+                        reconCnt++;
+
+                        if (!openedSock && reconCnt == 2)
+                            break;
+
                         if (timeoutHelper.checkThresholdReached(e))
                             break;
-                        else if (!spi.failureDetectionThresholdEnabled() && ++reconCnt == spi.getReconnectCount())
+                        else if (!spi.failureDetectionThresholdEnabled() && reconCnt == spi.getReconnectCount())
                             break;
                     }
                     finally {
@@ -1846,6 +1855,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /** {@inheritDoc} */
         @Override protected void noMessageLoop() {
+            if (locNode == null)
+                return;
+
             checkConnection();
 
             sendHeartbeatMessage();
@@ -1919,11 +1931,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                         if (debugMode)
                             debugLog("No next node in topology.");
 
-                        /*if (ring.hasRemoteNodes()) {
+                        if (ring.hasRemoteNodes() && !(msg instanceof TcpDiscoveryConnectionCheckMessage) &&
+                            !(msg instanceof TcpDiscoveryStatusCheckMessage && msg.creatorNodeId().equals(locNodeId))) {
                             msg.senderNodeId(locNodeId);
 
                             addMessage(msg);
-                        }*/
+                        }
 
                         break;
                     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 588ff98..3821a0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1369,7 +1369,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @throws IOException If IO failed or read timed out.
      * @throws IgniteCheckedException If unmarshalling failed.
      */
-    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException {
+    protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException,
+        IgniteCheckedException {
         assert sock != null;
 
         int oldTimeout = sock.getSoTimeout();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
index 8e80047..939286d 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureThresholdSelfTest.java
@@ -17,7 +17,14 @@
 
 package org.apache.ignite.spi.discovery.tcp;
 
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
 
 /**
  * Client-based discovery SPI test with failure detection threshold enabled.
@@ -26,6 +33,9 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
     /** */
     private final static int FAILURE_AWAIT_TIME = 7_000;
 
+    /** */
+    private static boolean useTestSpi;
+
     /** {@inheritDoc} */
     @Override protected boolean useFailureDetectionThreshold() {
         return true;
@@ -33,7 +43,7 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
 
     /** {@inheritDoc} */
     @Override protected long failureDetectionThreshold() {
-        return 10_000;
+        return useTestSpi ? 5000 : 10_000;
     }
 
     /** {@inheritDoc} */
@@ -41,6 +51,18 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
         return failureDetectionThreshold() + FAILURE_AWAIT_TIME;
     }
 
+    /** Uses {@link TestTcpDiscoverySpi} while {@code useTestSpi} is set and the parent's SPI otherwise. */
+    @Override protected TcpDiscoverySpi getDiscoverySpi() {
+        return !useTestSpi ? super.getDiscoverySpi() : new TestTcpDiscoverySpi();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        return cfg; // NOTE(review): returned unmodified, so this override currently adds nothing.
+    }
+
     /**
      * @throws Exception in case of error.
      */
@@ -60,4 +82,100 @@ public class TcpClientDiscoverySpiFailureThresholdSelfTest extends TcpClientDisc
         assertEquals(failureDetectionThreshold(),
             ((TcpDiscoverySpi)(G.ignite("client-0").configuration().getDiscoverySpi())).failureDetectionThreshold());
     }
+
+    /**
+     * Checks that a ping fails while the pinged node forces read timeouts and succeeds after a reset.
+     * @throws Exception in case of error.
+     */
+    public void testFailureThresholdWorkability() throws Exception {
+        TestTcpDiscoverySpi pingerSpi = null;
+        TestTcpDiscoverySpi targetSpi = null;
+
+        useTestSpi = true;
+
+        try {
+            startServerNodes(2);
+            checkNodes(2, 0);
+
+            pingerSpi = (TestTcpDiscoverySpi)G.ignite("server-0").configuration().getDiscoverySpi();
+            targetSpi = (TestTcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi();
+
+            assert pingerSpi.err == null;
+
+            // Make the target throw a forced timeout on every ping request it reads.
+            targetSpi.readDelay = failureDetectionThreshold() + 5000;
+
+            assertFalse(pingerSpi.pingNode(targetSpi.getLocalNodeId()));
+
+            Thread.sleep(failureDetectionThreshold());
+
+            assertTrue(pingerSpi.err != null && X.hasCause(pingerSpi.err, SocketTimeoutException.class));
+
+            targetSpi.reset();
+            pingerSpi.reset();
+
+            assertTrue(pingerSpi.pingNode(targetSpi.getLocalNodeId()));
+            assertTrue(null == pingerSpi.err);
+        }
+        finally {
+            useTestSpi = false;
+
+            if (targetSpi != null)
+                targetSpi.reset();
+
+            if (pingerSpi != null)
+                pingerSpi.reset();
+        }
+    }
+
+    /**
+     * Test SPI that records failed reads in {@code err} and can force a timeout on incoming ping requests.
+     */
+    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** When at least the failure detection threshold, incoming ping requests get a forced timeout. */
+        private volatile long readDelay; // volatile: set by the test, presumably read on SPI threads
+
+        /** Exception rethrown by the last failed pass-through read, or {@code null}. */
+        private volatile Exception err; // volatile: presumably written on SPI threads, read by the test
+
+        /** {@inheritDoc} */
+        @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout)
+            throws IOException, IgniteCheckedException {
+
+            if (readDelay < failureDetectionThreshold()) {
+                try {
+                    T msg = super.readMessage(sock, in, timeout);
+
+                    return msg;
+                }
+                catch (Exception e) {
+                    err = e;
+
+                    throw e;
+                }
+            }
+            else {
+                T msg = super.readMessage(sock, in, timeout);
+
+                if (msg instanceof TcpDiscoveryPingRequest) {
+                    try {
+                        Thread.sleep(2000);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt(); // Keep the interrupt visible to the caller.
+                    }
+                    throw new SocketTimeoutException("Forced timeout");
+                }
+
+                return msg;
+            }
+        }
+
+        /**
+         * Resets testing state.
+         */
+        private void reset() {
+            readDelay = 0;
+            err = null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f2a2dcf6/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 9df7bd9..a67b5cf 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -118,7 +118,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+        TcpDiscoverySpi disco = getDiscoverySpi();
 
         disco.setMaxMissedClientHeartbeats(maxMissedClientHbs);
 
@@ -167,7 +167,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         disco.setClientReconnectDisabled(reconnectDisabled);
 
-        disco.afterWrite(afterWrite);
+        if (disco instanceof TestTcpDiscoverySpi)
+            ((TestTcpDiscoverySpi)disco).afterWrite(afterWrite);
 
         cfg.setDiscoverySpi(disco);
 
@@ -177,6 +178,14 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
         return cfg;
     }
 
+    /**
+     * Creates the TCP discovery SPI that {@code getConfiguration} then configures and installs.
+     * @return New {@code TestTcpDiscoverySpi}; subclasses may override to return a different SPI.
+     */
+    protected TcpDiscoverySpi getDiscoverySpi() {
+        return new TestTcpDiscoverySpi();
+    }
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
@@ -411,12 +420,12 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
 
         final CountDownLatch latch = new CountDownLatch(1);
 
-        ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+        ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure
+            <Socket>() {
             @Override public void apply(Socket sock) {
                 try {
                     latch.await();
-                }
-                catch (InterruptedException e) {
+                } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
             }
@@ -778,8 +787,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             @Override public void apply(TcpDiscoveryAbstractMessage msg) {
                 try {
                     Thread.sleep(1000000);
-                }
-                catch (InterruptedException ignored) {
+                } catch (InterruptedException ignored) {
                     Thread.interrupted();
                 }
             }