You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/08/18 16:17:04 UTC

ignite git commit: IGNITE-5943 Communication. Server node may reject client connection during massive clients join. This closes #2423

Repository: ignite
Updated Branches:
  refs/heads/master 45708b972 -> 153931534


IGNITE-5943 Communication. Server node may reject client connection during massive clients join. This closes #2423


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/15393153
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/15393153
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/15393153

Branch: refs/heads/master
Commit: 1539315342d09150a4af942fce2f8147e390f85e
Parents: 45708b9
Author: EdShangGG <es...@gridgain.com>
Authored: Fri Aug 18 18:52:59 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Fri Aug 18 19:13:58 2017 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicCache.java          |  32 ++--
 .../communication/tcp/TcpCommunicationSpi.java  |  60 ++++++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  16 ++
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |  10 ++
 .../discovery/tcp/IgniteClientConnectTest.java  | 163 +++++++++++++++++++
 .../IgniteSpiDiscoverySelfTestSuite.java        |   6 +
 6 files changed, 268 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index be4aace..0b7a243 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -609,21 +609,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         final boolean skipStore = opCtx != null && opCtx.skipStore();
 
-        if (asyncOp) {return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
-            @Override public IgniteInternalFuture<Map<K, V>> apply() {
-                return getAllAsync0(ctx.cacheKeysView(keys),
-                    forcePrimary,
-                    subjId0,
-                    taskName,
-                    deserializeBinary,
-                    recovery,
-                    expiryPlc,
-                    skipVals,
-                    skipStore,
-                    canRemap,
-                    needVer);
-            }
-        });}
+        if (asyncOp) {
+            return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() {
+                @Override public IgniteInternalFuture<Map<K, V>> apply() {
+                    return getAllAsync0(ctx.cacheKeysView(keys),
+                        forcePrimary,
+                        subjId0,
+                        taskName,
+                        deserializeBinary,
+                        recovery,
+                        expiryPlc,
+                        skipVals,
+                        skipStore,
+                        canRemap,
+                        needVer);
+                }
+            });
+        }
         else {
             return getAllAsync0(ctx.cacheKeysView(keys),
                 forcePrimary,

http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 1b00b5d..bab9cfa 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -109,6 +109,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -132,6 +133,8 @@ import org.apache.ignite.spi.IgniteSpiThread;
 import org.apache.ignite.spi.IgniteSpiTimeoutObject;
 import org.apache.ignite.spi.communication.CommunicationListener;
 import org.apache.ignite.spi.communication.CommunicationSpi;
+import org.apache.ignite.spi.discovery.DiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -141,6 +144,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
 import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
+import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NEED_WAIT;
 import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.RecoveryLastReceivedMessage.NODE_STOPPING;
 
 /**
@@ -296,6 +300,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
      */
     public static final int DFLT_SELECTORS_CNT = Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
 
+    /**
+     * Version since which a client is able to wait before connecting to a server (may be needed when the client tries
+     * to open a connection before it becomes visible to the server).
+     */
+    private static final IgniteProductVersion VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT = IgniteProductVersion.fromString("2.1.4");
+
     /** Connection index meta for session. */
     private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey();
 
@@ -442,7 +452,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
              * @param ses Session.
              * @param msg Message.
              */
-            private void onFirstMessage(GridNioSession ses, Message msg) {
+            private void onFirstMessage(final GridNioSession ses, Message msg) {
                 UUID sndId;
 
                 ConnectionKey connKey;
@@ -466,10 +476,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 final ClusterNode rmtNode = getSpiContext().node(sndId);
 
                 if (rmtNode == null) {
-                    U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId +
-                        ", ses=" + ses + ']');
+                    DiscoverySpi discoverySpi = ignite().configuration().getDiscoverySpi();
+
+                    assert discoverySpi instanceof TcpDiscoverySpi;
+
+                    TcpDiscoverySpi tcpDiscoverySpi = (TcpDiscoverySpi) discoverySpi;
+
+                    ClusterNode node0 = tcpDiscoverySpi.getNode0(sndId);
+
+                    boolean unknownNode = true;
 
-                    ses.close();
+                    if (node0 != null) {
+                        assert node0.isClient() : node0;
+
+                        if (node0.version().compareTo(VERSION_SINCE_CLIENT_COULD_WAIT_TO_CONNECT) >= 0)
+                            unknownNode = false;
+                    }
+
+                    if (unknownNode) {
+                        U.warn(log, "Close incoming connection, unknown node [nodeId=" + sndId + ", ses=" + ses + ']');
+
+                        ses.close();
+                    }
+                    else {
+                        ses.send(new RecoveryLastReceivedMessage(NEED_WAIT)).listen(new CI1<IgniteInternalFuture<?>>() {
+                            @Override public void apply(IgniteInternalFuture<?> fut) {
+                                ses.close();
+                            }
+                        });
+                    }
 
                     return;
                 }
@@ -3031,6 +3066,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
             IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this,
                 !node.isClient());
 
+            int lastWaitingTimeout = 1;
+
             while (!conn) { // Reconnection on handshake timeout.
                 try {
                     SocketChannel ch = SocketChannel.open();
@@ -3101,6 +3138,18 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
 
                             throw new ClusterTopologyCheckedException("Remote node started stop procedure: " + node.id());
                         }
+                        else if (rcvCnt == NEED_WAIT) {
+                            recoveryDesc.release();
+
+                            U.closeQuiet(ch);
+
+                            if (lastWaitingTimeout < 60000)
+                                lastWaitingTimeout *= 2;
+
+                            U.sleep(lastWaitingTimeout);
+
+                            continue;
+                        }
                     }
                     finally {
                         if (recoveryDesc != null && rcvCnt == null)
@@ -4559,6 +4608,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
         /** */
         static final long NODE_STOPPING = -2;
 
+        /** Need wait: the connecting side closes the channel, sleeps and then retries the connection. */
+        static final long NEED_WAIT = -3;
+
         /** Message body size in bytes. */
         private static final int MESSAGE_SIZE = 8;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a6de34b..ca7dd4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1805,6 +1805,22 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * Looks up a node by ID in any state (visible or not): the local node is returned as is, others come from the ring.
+     * @param nodeId Node id, must not be {@code null}.
+     */
+    ClusterNode getNode0(UUID nodeId) {
+        assert nodeId != null;
+
+        UUID locNodeId0 = getLocalNodeId();
+
+        if (locNodeId0 != null && locNodeId0.equals(nodeId))
+            // Return local node directly.
+            return locNode;
+
+        return ring.node(nodeId);
+    }
+
+    /**
      * Thread that cleans IP finder and keeps it in the correct state, unregistering
      * addresses of the nodes that has left the topology.
      * <p>

http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index c988d7e..e6eaa8e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -433,6 +433,16 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
         return impl.getNode(nodeId);
     }
 
+    /** Resolves a node via {@code ServerImpl.getNode0} when the implementation is a {@code ServerImpl}, else via {@link #getNode(UUID)}.
+     * @param id Node id.
+     */
+    public ClusterNode getNode0(UUID id) {
+        if (impl instanceof ServerImpl)
+            return ((ServerImpl)impl).getNode0(id);
+
+        return getNode(id);
+    }
+
     /** {@inheritDoc} */
     @Override public boolean pingNode(UUID nodeId) {
         return impl.pingNode(nodeId);

http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
new file mode 100644
index 0000000..1a89987
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientConnectTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+
+/**
+ * Emulates a client receiving the message about joining the topology earlier than some server nodes in the topology,
+ * and then makes this client connect to such servers.
+ * To emulate this, the client is connected to the second node in the topology, and sending of the message about
+ * join finishing to the third node is paused.
+ */
+public class IgniteClientConnectTest extends GridCommonAbstractTest {
+    /** IP finder used by every node except the one named "client". */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Latch to stop message sending. */
+    private final CountDownLatch latch = new CountDownLatch(1);
+
+    /** Start client flag. */
+    private final AtomicBoolean clientJustStarted = new AtomicBoolean(false);
+
+    /** {@inheritDoc} The instance named "client" gets its own IP finder with only loopback port 47501 registered. */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TestTcpDiscoverySpi disco = new TestTcpDiscoverySpi();
+
+        if (igniteInstanceName.equals("client")) {
+            TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+
+            ipFinder.registerAddresses(Collections.singleton(new InetSocketAddress(InetAddress.getLoopbackAddress(), 47501)));
+
+            disco.setIpFinder(ipFinder);
+        }
+        else
+            disco.setIpFinder(ipFinder);
+
+        disco.setJoinTimeout(2 * 60_000);
+        disco.setSocketTimeout(1000);
+        disco.setNetworkTimeout(2000);
+
+        cfg.setDiscoverySpi(disco);
+
+        CacheConfiguration cacheConfiguration = new CacheConfiguration()
+                .setName(DEFAULT_CACHE_NAME)
+                .setCacheMode(CacheMode.PARTITIONED)
+                .setAffinity(new RendezvousAffinityFunction(false, 8))
+                .setBackups(0);
+
+        cfg.setCacheConfiguration(cacheConfiguration);
+
+        return cfg;
+    }
+
+    /**
+     * Starts three grids, puts 80 keys into the default cache, then starts a client and reads all keys through it.
+     * @throws Exception If failed.
+     */
+    public void testClientConnectToBigTopology() throws Exception {
+        Ignite ignite = startGrids(3);
+
+        IgniteCache<Object, Object> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 0; i < 80; i++) {
+            cache.put(i, i);
+
+            keys.add(i);
+        }
+
+        TcpDiscoveryImpl discovery = ((TestTcpDiscoverySpi) ignite.configuration().getDiscoverySpi()).discovery();
+
+        assertTrue(discovery instanceof ServerImpl);
+
+        IgniteConfiguration clientCfg = getConfiguration("client");
+
+        clientCfg.setClientMode(true);
+
+        clientJustStarted.set(true);
+
+        IgniteEx client = startGrid(clientCfg);
+
+        latch.countDown();
+
+        System.err.println("GET ALL");
+        client.cache(DEFAULT_CACHE_NAME).getAll(keys);
+    }
+
+    /** {@inheritDoc} Calls {@code stopAllGrids()}. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
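+     * Discovery SPI that can hold back {@link TcpDiscoveryNodeAddFinishedMessage} while the client is starting.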
+     */
+    class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+        /** {@inheritDoc} */
+        protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException,
+                IgniteCheckedException {
+            if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
+                if (msg.senderNodeId() != null && clientJustStarted.get())
+                    try {
+                        latch.await();
+
+                        Thread.sleep(3000);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+
+                super.writeToSocket(sock, out, msg, timeout);
+            }
+            else
+                super.writeToSocket(sock, out, msg, timeout);
+        }
+
+        /**
+         * @return The {@code impl} discovery implementation of this SPI.
+         */
+        TcpDiscoveryImpl discovery() {
+            return impl;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/15393153/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
index 1287149..c506ca7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiDiscoverySelfTestSuite.java
@@ -20,6 +20,8 @@ package org.apache.ignite.testsuites;
 import junit.framework.TestSuite;
 import org.apache.ignite.spi.GridTcpSpiForwardingSelfTest;
 import org.apache.ignite.spi.discovery.AuthenticationRestartTest;
+import org.apache.ignite.spi.discovery.tcp.IgniteClientConnectTest;
+import org.apache.ignite.spi.discovery.tcp.IgniteClientReconnectMassiveShutdownTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoveryMarshallerCheckSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiFailureTimeoutSelfTest;
 import org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpiMulticastTest;
@@ -90,6 +92,10 @@ public class IgniteSpiDiscoverySelfTestSuite extends TestSuite {
         suite.addTest(new TestSuite(TcpDiscoveryNodeAttributesUpdateOnReconnectTest.class));
         suite.addTest(new TestSuite(AuthenticationRestartTest.class));
 
+        // Client connect.
+        suite.addTest(new TestSuite(IgniteClientConnectTest.class));
+        suite.addTest(new TestSuite(IgniteClientReconnectMassiveShutdownTest.class));
+
         // SSL.
         suite.addTest(new TestSuite(TcpDiscoverySslSelfTest.class));
         suite.addTest(new TestSuite(TcpDiscoverySslSecuredUnsecuredTest.class));