You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/14 09:36:37 UTC

[1/2] incubator-ignite git commit: ignite-1229: interrupt ping queries if remote node leaves or fails

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-1229 2159d79e2 -> 23dc6558e


ignite-1229: interrupt ping queries if remote node leaves or fails


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4381bf7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4381bf7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4381bf7f

Branch: refs/heads/ignite-1229
Commit: 4381bf7fbbcf6faa906cc9ed465d3feb2a302224
Parents: 2159d79
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Aug 13 15:54:58 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Aug 13 15:54:58 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 57 ++++++++++++++++++--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 41 +++++++++++---
 2 files changed, 87 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4381bf7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 76144e3..1f0266e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -114,7 +114,7 @@ class ServerImpl extends TcpDiscoveryImpl {
     protected TcpDiscoverySpiState spiState = DISCONNECTED;
 
     /** Map with proceeding ping requests. */
-    private final ConcurrentMap<InetSocketAddress, IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>>> pingMap =
+    private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap =
         new ConcurrentHashMap8<>();
 
     /**
@@ -497,9 +497,9 @@ class ServerImpl extends TcpDiscoveryImpl {
             return F.t(getLocalNodeId(), clientPingRes);
         }
 
-        GridFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridFutureAdapter<>();
+        GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = new GridPingFutureAdapter<>();
 
-        IgniteInternalFuture<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
+        GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> oldFut = pingMap.putIfAbsent(addr, fut);
 
         if (oldFut != null)
             return oldFut.get();
@@ -520,7 +520,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                         long tstamp = U.currentTimeMillis();
 
-                        sock = spi.openSocket(addr, timeoutHelper);
+                        sock = spi.createSocket();
+
+                        fut.sock = sock;
+
+                        sock = spi.openSocket(sock, addr, timeoutHelper);
 
                         openedSock = true;
 
@@ -597,6 +601,21 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
+    /**
+     * Interrupts all existing 'ping' requests to the given node by closing their sockets.
+     *
+     * @param node Node that may be pinged.
+     */
+    private void interruptPing(TcpDiscoveryNode node) {
+        for (InetSocketAddress addr : spi.getNodeAddresses(node)) {
+            GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>> fut = pingMap.get(addr);
+
+            if (fut != null && fut.sock != null)
+                // The socket reference is never reset to null once set, so re-reading the field is safe.
+                U.closeQuiet(fut.sock);
+        }
+    }
+
     /** {@inheritDoc} */
     @Override public void disconnect() throws IgniteSpiException {
         spiStop0(true);
@@ -3366,6 +3385,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (msg.verified() && !locNodeId.equals(leavingNodeId)) {
                 TcpDiscoveryNode leftNode = ring.removeNode(leavingNodeId);
 
+                interruptPing(leavingNode);
+
                 assert leftNode != null;
 
                 if (log.isDebugEnabled())
@@ -3533,6 +3554,8 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (msg.verified()) {
                 node = ring.removeNode(nodeId);
 
+                interruptPing(node);
+
                 assert node != null;
 
                 long topVer;
@@ -5142,4 +5165,30 @@ class ServerImpl extends TcpDiscoveryImpl {
             spi.writeToSocket(sock, msg, bout, timeout);
         }
     }
+
+    /**
+     * Ping future that remembers the socket used by the ping so that the ping can be interrupted.
+     */
+    private static class GridPingFutureAdapter<R> extends GridFutureAdapter<R> {
+        /** Socket. Volatile: set while pinging, read by interruptPing(), which may run on another thread. */
+        private volatile Socket sock;
+
+        /**
+         * Returns socket associated with this ping future.
+         *
+         * @return Socket or {@code null} if no socket associated.
+         */
+        public Socket sock() {
+            return sock;
+        }
+
+        /**
+         * Associates socket with this ping future.
+         *
+         * @param sock Socket.
+         */
+        public void sock(Socket sock) {
+            this.sock = sock;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4381bf7f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 18a540c..65ab8fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1167,18 +1167,49 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @param timeoutHelper Timeout helper.
      * @return Opened socket.
      * @throws IOException If failed.
+     * @throws IgniteSpiOperationTimeoutException In case of timeout.
      */
     protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
         throws IOException, IgniteSpiOperationTimeoutException {
-        assert sockAddr != null;
+        return openSocket(createSocket(), sockAddr, timeoutHelper);
+    }
+
+    /**
+     * Connects to remote address sending {@code U.IGNITE_HEADER} when connection is established.
+     *
+     * @param sock Socket bound to a local host address.
+     * @param remAddr Remote address.
+     * @param timeoutHelper Timeout helper.
+     * @return Connected socket.
+     * @throws IOException If failed.
+     * @throws IgniteSpiOperationTimeoutException In case of timeout.
+     */
+    Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+        throws IOException, IgniteSpiOperationTimeoutException {
+
+        assert remAddr != null;
 
-        InetSocketAddress resolved = sockAddr.isUnresolved() ?
-            new InetSocketAddress(InetAddress.getByName(sockAddr.getHostName()), sockAddr.getPort()) : sockAddr;
+        InetSocketAddress resolved = remAddr.isUnresolved() ?
+            new InetSocketAddress(InetAddress.getByName(remAddr.getHostName()), remAddr.getPort()) : remAddr;
 
         InetAddress addr = resolved.getAddress();
 
         assert addr != null;
 
+        sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
+
+        writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
+
+        return sock;
+    }
+
+    /**
+     * Creates socket binding it to a local host address. This operation is not blocking.
+     *
+     * @return Created socket.
+     * @throws IOException If failed.
+     */
+    Socket createSocket() throws IOException {
         Socket sock;
 
         if (isSslEnabled())
@@ -1190,10 +1221,6 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
 
         sock.setTcpNoDelay(true);
 
-        sock.connect(resolved, (int)timeoutHelper.nextTimeoutChunk(sockTimeout));
-
-        writeToSocket(sock, U.IGNITE_HEADER, timeoutHelper.nextTimeoutChunk(sockTimeout));
-
         return sock;
     }
 


[2/2] incubator-ignite git commit: ignite-1229: added tests that check ping interruption

Posted by sb...@apache.org.
ignite-1229: added tests that check ping interruption


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/23dc6558
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/23dc6558
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/23dc6558

Branch: refs/heads/ignite-1229
Commit: 23dc6558e771382a244cd5d3ec7abe5ddbad1f61
Parents: 4381bf7
Author: Denis Magda <dm...@gridgain.com>
Authored: Fri Aug 14 10:35:57 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Fri Aug 14 10:35:57 2015 +0300

----------------------------------------------------------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   6 +-
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 151 ++++++++++++++++++-
 .../TcpDiscoverySpiFailureTimeoutSelfTest.java  |   5 +-
 3 files changed, 156 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23dc6558/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 65ab8fd..2f3d410 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1184,7 +1184,7 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @throws IOException If failed.
      * @throws IgniteSpiOperationTimeoutException In case of timeout.
      */
-    Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+    protected Socket openSocket(Socket sock, InetSocketAddress remAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
         throws IOException, IgniteSpiOperationTimeoutException {
 
         assert remAddr != null;
@@ -1277,8 +1277,8 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
      * @throws IOException If IO failed or write timed out.
      * @throws IgniteCheckedException If marshalling failed.
      */
-    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
-        throws IOException, IgniteCheckedException {
+    protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+        IgniteCheckedException {
         writeToSocket(sock, msg, new GridByteArrayOutputStream(8 * 1024), timeout); // 8K.
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23dc6558/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 9a44c24..e5118e3 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -70,7 +70,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+        TcpDiscoverySpi spi = gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
+            new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
 
         discoMap.put(gridName, spi);
 
@@ -128,6 +129,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
             if (U.isMacOs())
                 spi.setLocalAddress(F.first(U.allLocalIps()));
         }
+        else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode"))
+            cfg.setFailureDetectionTimeout(30_000);
 
         return cfg;
     }
@@ -339,6 +342,152 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If any error occurs.
      */
+    public void testPingInterruptedOnNodeFailed() throws Exception {
+        try {
+            // Grid names matter: getConfiguration() picks the test SPI or a custom timeout based on them.
+            final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode");
+            final Ignite failedNode = startGrid("testPingInterruptedOnNodeFailedFailingNode");
+            startGrid("testPingInterruptedOnNodeFailedSimpleNode");
+
+            // From now on the node to be failed does not write ping responses.
+            ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse = true;
+
+            // Released by the pinging thread right before it calls pingNode().
+            final CountDownLatch pingLatch = new CountDownLatch(1);
+
+            // Released when the pinging node receives EVT_NODE_FAILED for the failed node.
+            final CountDownLatch eventLatch = new CountDownLatch(1);
+
+            final AtomicBoolean pingRes = new AtomicBoolean(true);
+
+            final AtomicBoolean failRes = new AtomicBoolean(false);
+
+            long startTs = System.currentTimeMillis();
+
+            pingingNode.events().localListen(
+                new IgnitePredicate<Event>() {
+                    @Override public boolean apply(Event event) {
+                        if (((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id())) {
+                            failRes.set(true);
+                            eventLatch.countDown();
+                        }
+
+                        return true;
+                    }
+                },
+                EventType.EVT_NODE_FAILED);
+
+            // Pings the node from a separate thread and records the ping result.
+            IgniteInternalFuture<?> pingFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        pingLatch.countDown();
+
+                        pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
+                            failedNode.cluster().localNode().id()));
+
+                        return null;
+                    }
+                }, 1);
+
+            // Waits until the pinging thread has started, then 3 more seconds, and simulates the node failure.
+            IgniteInternalFuture<?> failingFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        pingLatch.await();
+
+                        Thread.sleep(3000);
+
+                        ((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).simulateNodeFailure();
+
+                        return null;
+                    }
+                }, 1);
+
+            failingFut.get();
+            pingFut.get();
+
+            // The ping must report the node as unreachable...
+            assertFalse(pingRes.get());
+
+            // ...and both threads must be done in less than half of the pinging node's failure detection timeout.
+            assertTrue(System.currentTimeMillis() - startTs <
+                pingingNode.configuration().getFailureDetectionTimeout() / 2);
+
+            // The pinging node must also be notified about the failure within 7 seconds.
+            assertTrue(eventLatch.await(7, TimeUnit.SECONDS));
+            assertTrue(failRes.get());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Checks that a ping of a node that does not write ping responses returns {@code false} in less than half of
+     * the pinging node's failure detection timeout when the pinged node is stopped.
+     *
+     * @throws Exception If any error occurs.
+     */
+    public void testPingInterruptedOnNodeLeft() throws Exception {
+        try {
+            // The "...NodeFailed..." grid names are reused here; getConfiguration() selects the test SPI and the
+            // custom failure detection timeout by these names (presumably reused on purpose - confirm).
+            final Ignite pingingNode = startGrid("testPingInterruptedOnNodeFailedPingingNode");
+            final Ignite leftNode = startGrid("testPingInterruptedOnNodeFailedFailingNode");
+            startGrid("testPingInterruptedOnNodeFailedSimpleNode");
+
+            // From now on the node to be stopped does not write ping responses.
+            ((TestTcpDiscoverySpi)leftNode.configuration().getDiscoverySpi()).ignorePingResponse = true;
+
+            // Released by the pinging thread right before it calls pingNode().
+            final CountDownLatch pingLatch = new CountDownLatch(1);
+
+            final AtomicBoolean pingRes = new AtomicBoolean(true);
+
+            long startTs = System.currentTimeMillis();
+
+            // Pings the node from a separate thread and records the ping result.
+            IgniteInternalFuture<?> pingFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        pingLatch.countDown();
+
+                        pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
+                            leftNode.cluster().localNode().id()));
+
+                        return null;
+                    }
+                }, 1);
+
+            // Waits until the pinging thread has started, then 3 more seconds, and stops the pinged node.
+            IgniteInternalFuture<?> stoppingFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        pingLatch.await();
+
+                        Thread.sleep(3000);
+
+                        stopGrid("testPingInterruptedOnNodeFailedFailingNode");
+
+                        return null;
+                    }
+                }, 1);
+
+            stoppingFut.get();
+            pingFut.get();
+
+            // The ping must report the node as unreachable...
+            assertFalse(pingRes.get());
+
+            // ...and both threads must be done in less than half of the pinging node's failure detection timeout.
+            assertTrue(System.currentTimeMillis() - startTs <
+                pingingNode.configuration().getFailureDetectionTimeout() / 2);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Discovery SPI that can be switched to silently drop outgoing ping responses.
+     */
+    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** Drop ping responses flag. Volatile: set by the test and read in {@code writeToSocket}. */
+        private volatile boolean ignorePingResponse;
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout)
+            throws IOException, IgniteCheckedException {
+            // Ping responses are swallowed while the flag is set; any other message is sent as usual.
+            if (!(msg instanceof TcpDiscoveryPingResponse && ignorePingResponse))
+                super.writeToSocket(sock, msg, timeout);
+        }
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
     public void testNodeAdded() throws Exception {
         try {
             final Ignite g1 = startGrid(1);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/23dc6558/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
index fbea187..630f2fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiFailureTimeoutSelfTest.java
@@ -305,7 +305,8 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
 
 
         /** {@inheritDoc} */
-        @Override protected Socket openSocket(InetSocketAddress sockAddr, IgniteSpiOperationTimeoutHelper timeoutHelper)
+        @Override protected Socket openSocket(Socket sock, InetSocketAddress sockAddr,
+            IgniteSpiOperationTimeoutHelper timeoutHelper)
             throws IOException, IgniteSpiOperationTimeoutException {
 
             if (openSocketTimeout) {
@@ -330,7 +331,7 @@ public class TcpDiscoverySpiFailureTimeoutSelfTest extends AbstractDiscoverySelf
                 }
             }
 
-            Socket sock = super.openSocket(sockAddr, timeoutHelper);
+            super.openSocket(sock, sockAddr, timeoutHelper);
 
             try {
                 Thread.sleep(1500);