You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/06 08:59:32 UTC

ignite git commit: client join race

Repository: ignite
Updated Branches:
  refs/heads/ignite-client-join-race [created] cead45f05


client join race


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cead45f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cead45f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cead45f0

Branch: refs/heads/ignite-client-join-race
Commit: cead45f055bb01be71f1aac515f68ea5227c1bd5
Parents: ba21c46
Author: sboikov <sb...@gridgain.com>
Authored: Sat May 6 11:49:09 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Sat May 6 11:57:30 2017 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../cache/DynamicCacheChangeBatch.java          |  17 ++
 .../processors/cache/GridCacheProcessor.java    |  23 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  21 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   2 +
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   4 +
 .../cache/distributed/CacheStartOnJoinTest.java | 226 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 8 files changed, 295 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 96930f8..7e92cf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -586,6 +586,9 @@ public final class IgniteSystemProperties {
     /** Cache start size for on-heap maps. Defaults to 4096. */
     public static final String IGNITE_CACHE_START_SIZE = "IGNITE_CACHE_START_SIZE";
 
+    /** */
+    public static final String IGNITE_START_CACHES_ON_JOIN = "IGNITE_START_CACHES_ON_JOIN";
+
     /** Returns true for system properties only avoiding sending sensitive information. */
     private static final IgnitePredicate<Map.Entry<String, String>> PROPS_FILTER = new IgnitePredicate<Map.Entry<String, String>>() {
         @Override public boolean apply(final Map.Entry<String, String> entry) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index a250063..66e780f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -47,6 +47,9 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     /** */
     private boolean clientReconnect;
 
+    /** */
+    private boolean startCaches;
+
     /**
      * @param reqs Requests.
      */
@@ -114,6 +117,20 @@ public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
     }
 
     /**
+     * @return {@code True} if required to start all caches on client node.
+     */
+    public boolean startCaches() {
+        return startCaches;
+    }
+
+    /**
+     * @param startCaches {@code True} if required to start all caches on client node.
+     */
+    public void startCaches(boolean startCaches) {
+        this.startCaches = startCaches;
+    }
+
+    /**
      * @return {@code True} if request should trigger partition exchange.
      */
     public boolean exchangeNeeded() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index d6225c0..315ead9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -43,6 +43,7 @@ import javax.management.JMException;
 import javax.management.MBeanServer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
@@ -164,6 +165,10 @@ import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearE
  */
 @SuppressWarnings({"unchecked", "TypeMayBeWeakened", "deprecation"})
 public class GridCacheProcessor extends GridProcessorAdapter {
+    /** */
+    private static final boolean START_CLIENT_CACHES =
+        IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, false);
+
     /** Shared cache context. */
     private GridCacheSharedContext<?, ?> sharedCtx;
 
@@ -873,7 +878,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
                 boolean loc = desc.locallyConfigured();
 
-                if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
+                if (loc || (desc.receivedOnDiscovery() &&
+                    (startAllCachesOnClientStart() || CU.affinityNode(locNode, filter)))) {
                     boolean started = desc.onStart();
 
                     assert started : "Failed to change started flag for locally configured cache: " + desc;
@@ -2166,6 +2172,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
 
         batch.clientReconnect(reconnect);
 
+        if (ctx.localNodeId().equals(joiningNodeId))
+            batch.startCaches(startAllCachesOnClientStart());
+
         // Reset random batch ID so that serialized batches with the same descriptors will be exactly the same.
         batch.id(null);
 
@@ -2244,6 +2253,13 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         }
     }
 
+    /**
+     * @return {@code True} if need locally start all existing caches on client node start.
+     */
+    private boolean startAllCachesOnClientStart() {
+        return START_CLIENT_CACHES && ctx.clientNode();
+    }
+
     /** {@inheritDoc} */
     @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
         if (data.hasJoiningNodeData()) {
@@ -2382,6 +2398,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                         ctx.discovery().addClientNode(cacheName, tup.getKey(), tup.getValue());
                 }
             }
+
+            if (batch.startCaches()) {
+                for (Map.Entry<String, DynamicCacheDescriptor> entry : registeredCaches.entrySet())
+                    ctx.discovery().addClientNode(entry.getKey(), joiningNodeId, false);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index b5b4c77..4c7199c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -136,6 +136,9 @@ class ClientImpl extends TcpDiscoveryImpl {
     /** Remote nodes. */
     private final ConcurrentMap<UUID, TcpDiscoveryNode> rmtNodes = new ConcurrentHashMap8<>();
 
+    /** */
+    private final List<DiscoveryDataPacket> delayDiscoData = new ArrayList<>();
+
     /** Topology history. */
     private final NavigableMap<Long, Collection<ClusterNode>> topHist = new TreeMap<>();
 
@@ -1751,6 +1754,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
             nodeAdded = false;
 
+            delayDiscoData.clear();
+
             IgniteClientDisconnectedCheckedException err =
                 new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
                     "client node disconnected.");
@@ -1774,6 +1779,7 @@ class ClientImpl extends TcpDiscoveryImpl {
             joinCnt++;
 
             T2<SocketStream, Boolean> joinRes;
+
             try {
                 joinRes = joinTopology(false, spi.joinTimeout);
             }
@@ -1919,8 +1925,12 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
 
-                        if (dataPacket != null && dataPacket.hasJoiningNodeData())
-                            spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
+                        if (dataPacket != null && dataPacket.hasJoiningNodeData()) {
+                            if (joining())
+                                delayDiscoData.add(dataPacket);
+                            else
+                                spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
+                        }
                     }
                 }
                 else {
@@ -1944,6 +1954,13 @@ class ClientImpl extends TcpDiscoveryImpl {
                     if (dataContainer != null)
                         spi.onExchange(dataContainer, U.resolveClassLoader(spi.ignite().configuration()));
 
+                    if (!delayDiscoData.isEmpty()) {
+                        for (DiscoveryDataPacket data : delayDiscoData)
+                            spi.onExchange(data, U.resolveClassLoader(spi.ignite().configuration()));
+
+                        delayDiscoData.clear();
+                    }
+
                     locNode.setAttributes(msg.clientNodeAttributes());
                     locNode.visible(true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 6a10ec2..663040d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2505,6 +2505,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @param msg Message to process.
          */
         @Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+            spi.startMessageProcess(msg);
+
             sendMetricsUpdateMessage();
 
             DebugLogger log = messageLogger(msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 99a7dac..370a020 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1464,6 +1464,10 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi {
         writeToSocket(sock, socketStream(sock), msg, timeout);
     }
 
+    protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+        // No-op.
+    }
+
     /**
      * Writes message to the socket.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
new file mode 100644
index 0000000..5203cf0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheStartOnJoinTest.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+public class CacheStartOnJoinTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** Iteration. */
+    private static final int ITERATIONS = 5;
+
+    /** */
+    private boolean client;
+
+    static void doSleep(long millis) {
+        try {
+            U.sleep(1000);
+        }
+        catch (Exception e) {
+            throw new IgniteException();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        TcpDiscoverySpi testSpi = new TcpDiscoverySpi() {
+            @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException {
+                super.writeToSocket(sock, out, msg, timeout);
+            }
+
+            private boolean delay = true;
+
+            @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+                if (getTestIgniteInstanceName(0).equals(ignite.name())) {
+                    if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                        TcpDiscoveryJoinRequestMessage msg0 = (TcpDiscoveryJoinRequestMessage)msg;
+
+                        if (msg0.client() && delay) {
+                            log.info("Delay join processing: " + msg0);
+
+                            delay = false;
+
+                            doSleep(5000);
+                        }
+                    }
+                }
+
+                super.startMessageProcess(msg);
+            }
+        };
+
+        testSpi.setIpFinder(ipFinder);
+        testSpi.setJoinTimeout(60_000);
+
+        cfg.setDiscoverySpi(testSpi);
+
+        MemoryConfiguration memCfg = new MemoryConfiguration();
+        memCfg.setPageSize(1024);
+        memCfg.setDefaultMemoryPolicySize(50 * 1024 * 1024);
+
+        cfg.setMemoryConfiguration(memCfg);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60 * 1000L;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        System.setProperty(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN, "true");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        System.clearProperty(IgniteSystemProperties.IGNITE_START_CACHES_ON_JOIN);
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartNodes() throws Exception {
+        for (int i = 0; i < ITERATIONS; i++) {
+            try {
+                log.info("Iteration: " + (i + 1) + '/' + ITERATIONS);
+
+                doTest();
+            }
+            finally {
+                stopAllGrids(true);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void doTest() throws Exception {
+        client = false;
+
+        final int CLIENTS = 5;
+        final int SRVS = 4;
+
+        Ignite srv = startGrids(SRVS);
+
+        srv.getOrCreateCaches(cacheConfigurations());
+
+        final CyclicBarrier b = new CyclicBarrier(CLIENTS);
+
+        client = true;
+
+        GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() {
+            @Override public void apply(Integer idx) {
+                try {
+                    b.await();
+
+                    startGrid(idx + SRVS);
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, CLIENTS, "start-client");
+
+        final int NODES = CLIENTS + SRVS;
+
+        for (int i = 0; i < CLIENTS + 1; i++) {
+            Ignite node = ignite(i);
+
+            log.info("Check node: " + node.name());
+
+            assertEquals((Boolean)(i >= SRVS), node.configuration().isClientMode());
+
+            for (int c = 0; c < 5; c++) {
+                Collection<ClusterNode> nodes = node.cluster().forCacheNodes("cache-" + c).nodes();
+
+                assertEquals(NODES, nodes.size());
+            }
+
+            for (int c = 0; c < 5; c++) {
+                for (IgniteCache cache : node.getOrCreateCaches(cacheConfigurations())) {
+                    cache.put(i, i);
+
+                    cache.get(i);
+                }
+            }
+        }
+    }
+
+    private Collection<CacheConfiguration> cacheConfigurations() {
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        for (int i = 0; i < 5; i++)
+            ccfgs.add(cacheConfiguration("cache-" + i));
+
+        return ccfgs;
+    }
+
+    private CacheConfiguration cacheConfiguration(String cacheName) {
+        CacheConfiguration ccfg = new CacheConfiguration(cacheName);
+
+        ccfg.setName(cacheName);
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+        return ccfg;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/cead45f0/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 8340cd7..1023140 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -85,6 +85,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarl
 import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicPrimarySyncBackPressureTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheStartOnJoinTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteCachePrimarySyncTest;
@@ -220,6 +221,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(CacheAffinityEarlyTest.class);
         suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class);
         suite.addTestSuite(IgniteCacheCreatePutTest.class);
+        suite.addTestSuite(CacheStartOnJoinTest.class);
 
         suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);