You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/23 10:17:36 UTC

ignite git commit: ignite-6700

Repository: ignite
Updated Branches:
  refs/heads/ignite-6700 [created] a30dad5d1


ignite-6700


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a30dad5d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a30dad5d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a30dad5d

Branch: refs/heads/ignite-6700
Commit: a30dad5d104d74063fd4be8ba4e6bfe9da2b2242
Parents: 026254c
Author: sboikov <sb...@gridgain.com>
Authored: Mon Oct 23 13:16:57 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Oct 23 13:16:57 2017 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  45 ++++++--
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  18 +++
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 110 +++++++++++++++++++
 3 files changed, 163 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a30dad5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 60f9d4e..7d5edbd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1958,8 +1958,36 @@ class ServerImpl extends TcpDiscoveryImpl {
      * @param msg Message.
      */
     private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
-        if (msg.failedNodes() != null) {
-            for (UUID nodeId : msg.failedNodes()) {
+        Collection<UUID> msgFailedNodes = msg.failedNodes();
+
+        if (msgFailedNodes != null) {
+            UUID sndId = msg.senderNodeId();
+
+            if (sndId != null) {
+                if (ring.node(sndId) == null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Ignore message failed nodes, sender node is not alive [nodeId=" + sndId +
+                            ", failedNodes=" + msgFailedNodes + ']');
+                    }
+
+                    return;
+                }
+
+                synchronized (mux) {
+                    for (TcpDiscoveryNode failedNode : failedNodes.keySet()) {
+                        if (failedNode.id().equals(sndId)) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Ignore message failed nodes, sender node is in fail list [nodeId=" + sndId +
+                                    ", failedNodes=" + msgFailedNodes + ']');
+                            }
+
+                            return;
+                        }
+                    }
+                }
+            }
+
+            for (UUID nodeId : msgFailedNodes) {
                 TcpDiscoveryNode failedNode = ring.node(nodeId);
 
                 if (failedNode != null) {
@@ -2816,9 +2844,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                     log.trace("Next node remains the same [nextId=" + next.id() +
                         ", nextOrder=" + next.internalOrder() + ']');
 
-                // Flag that shows whether next node exists and accepts incoming connections.
-                boolean nextNodeExists = sock != null;
-
                 final boolean sameHost = U.sameMacs(locNode, next);
 
                 List<InetSocketAddress> locNodeAddrs = U.arrayList(locNode.socketAddresses());
@@ -2843,8 +2868,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                             if (timeoutHelper == null)
                                 timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi, true);
 
-                            nextNodeExists = false;
-
                             boolean success = false;
 
                             boolean openSock = false;
@@ -2973,8 +2996,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     sock = null;
                                 }
                                 else {
-                                    // Next node exists and accepts incoming messages.
-                                    nextNodeExists = true;
                                     // Resetting timeout control object to let the code below to use a new one
                                     // for the next bunch of operations.
                                     timeoutHelper = null;
@@ -3066,7 +3087,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 if (latencyCheck && log.isInfoEnabled())
                                     log.info("Latency check message has been written to socket: " + msg.id());
 
-                                spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
+                                spi.writeToSocket(newNextNode ? newNext : next,
+                                    sock,
+                                    out,
+                                    msg,
+                                    timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                                 long tstamp0 = U.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a30dad5d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index eb8ee30..922e787 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1478,6 +1478,24 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
     }
 
     /**
+     * @param node Target node.
+     * @param sock Socket.
+     * @param out Stream to write to.
+     * @param msg Message.
+     * @param timeout Timeout.
+     * @throws IOException If IO failed or write timed out.
+     * @throws IgniteCheckedException If marshalling failed.
+     */
+    protected void writeToSocket(
+        ClusterNode node,
+        Socket sock,
+        OutputStream out,
+        TcpDiscoveryAbstractMessage msg,
+        long timeout) throws IOException, IgniteCheckedException {
+        writeToSocket(sock, out, msg, timeout);
+    }
+
+    /**
      * Writes message to the socket.
      *
      * @param sock Socket.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a30dad5d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index bf48fcc..d6d484c 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -52,6 +53,7 @@ import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage;
@@ -77,6 +79,7 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
@@ -2069,6 +2072,42 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testFailedNodeRestoreConnection() throws Exception {
+        try {
+            TestRestoreConnectedSpi.startTest = false;
+
+            for (int i = 1; i < 5; i++) {
+                nodeSpi.set(new TestRestoreConnectedSpi(3));
+
+                startGrid(i);
+            }
+
+            awaitPartitionMapExchange();
+
+            info("Start fail test");
+
+            TestRestoreConnectedSpi.startTest = true;
+
+            waitNodeStop(getTestIgniteInstanceName(3));
+
+            U.sleep(5000);
+
+            for (int i = 1; i < 5; i++) {
+                if (i != 3) {
+                    Ignite node = ignite(i);
+
+                    assertEquals(3, node.cluster().nodes().size());
+                }
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
      * @param nodeName Node name.
      * @throws Exception If failed.
      */
@@ -2171,6 +2210,77 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
     /**
      *
      */
+    private static class TestRestoreConnectedSpi extends TcpDiscoverySpi {
+        /** */
+        static volatile boolean startTest;
+
+        /** */
+        private long sleepEndTime;
+
+        /** */
+        private long errNodeOrder;
+
+        /** */
+        private ClusterNode errNext;
+
+        /**
+         * @param errNodeOrder
+         */
+        TestRestoreConnectedSpi(long errNodeOrder) {
+            this.errNodeOrder = errNodeOrder;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(ClusterNode node,
+            Socket sock,
+            OutputStream out,
+            TcpDiscoveryAbstractMessage msg,
+            long timeout) throws IOException, IgniteCheckedException {
+            if (startTest && !(msg instanceof TcpDiscoveryConnectionCheckMessage)) {
+                if (node.order() == errNodeOrder) {
+                    log.info("Fail write on message send [node=" + node.id() + ", msg=" + msg + ']');
+
+                    throw new SocketTimeoutException();
+                }
+                else if (locNode.order() == errNodeOrder) {
+                    if (sleepEndTime == 0) {
+                        errNext = node;
+
+                        sleepEndTime = System.currentTimeMillis() + 3000;
+                    }
+
+                    long sleepTime = sleepEndTime - System.currentTimeMillis();
+
+                    if (sleepTime > 0) {
+                        log.info("Start sleep on message send: " + msg);
+
+                        try {
+                            U.sleep(sleepTime);
+                        }
+                        catch (IgniteInterruptedCheckedException e) {
+                            log.error("Interrupted on socket write: " + e, e);
+
+                            throw new IOException(e);
+                        }
+
+                        log.info("Stop sleep on message send: " + msg);
+
+                        if (node.equals(errNext)) {
+                            log.info("Fail write after sleep [node=" + node.id() + ", msg=" + msg + ']');
+
+                            throw new SocketTimeoutException();
+                        }
+                    }
+                }
+            }
+
+            super.writeToSocket(node, sock, out, msg, timeout);
+        }
+    }
+
+    /**
+     *
+     */
     private static class TestDiscoveryDataDuplicateSpi extends TcpDiscoverySpi {
         /** */
         static volatile boolean fail;