You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/19 05:22:57 UTC

[3/9] ignite git commit: ignite-5763 Race in concurrent client cache start

ignite-5763 Race in concurrent client cache start


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/644c9f39
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/644c9f39
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/644c9f39

Branch: refs/heads/ignite-5578-locJoin
Commit: 644c9f39cd980e8087438519b34a79986c82d8b4
Parents: 624b451
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 17 17:06:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 17 17:06:55 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  38 ++++-
 .../cache/IgniteDynamicCacheMultinodeTest.java  | 168 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 3 files changed, 202 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/644c9f39/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9f5bd3f..347f6fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -192,6 +192,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         }
     };
 
+    /** */
+    private final Object discoEvtMux = new Object();
+
     /** Discovery event worker. */
     private final DiscoveryWorker discoWrk = new DiscoveryWorker();
 
@@ -551,6 +554,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                 final ClusterNode node,
                 final Collection<ClusterNode> topSnapshot,
                 final Map<Long, Collection<ClusterNode>> snapshots,
+                @Nullable DiscoverySpiCustomMessage spiCustomMsg) {
+                synchronized (discoEvtMux) {
+                    onDiscovery0(type, topVer, node, topSnapshot, snapshots, spiCustomMsg);
+                }
+            }
+
+            /**
+             * @param type Event type.
+             * @param topVer Event topology version.
+             * @param node Event node.
+             * @param topSnapshot Topology snapshot.
+             * @param snapshots Topology snapshots history.
+             * @param spiCustomMsg Custom event.
+             */
+            private void onDiscovery0(
+                final int type,
+                final long topVer,
+                final ClusterNode node,
+                final Collection<ClusterNode> topSnapshot,
+                final Map<Long, Collection<ClusterNode>> snapshots,
                 @Nullable DiscoverySpiCustomMessage spiCustomMsg
             ) {
                 DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
@@ -2062,12 +2085,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     public void clientCacheStartEvent(UUID reqId,
         @Nullable Map<String, DynamicCacheChangeRequest> startReqs,
         @Nullable Set<String> cachesToClose) {
-        discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT,
-            AffinityTopologyVersion.NONE,
-            localNode(),
-            null,
-            Collections.<ClusterNode>emptyList(),
-            new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose));
+        // Prevent race when discovery message was processed, but was not yet passed to discoWrk.
+        synchronized (discoEvtMux) {
+            discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT,
+                AffinityTopologyVersion.NONE,
+                localNode(),
+                null,
+                Collections.<ClusterNode>emptyList(),
+                new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose));
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/644c9f39/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java
new file mode 100644
index 0000000..d362189
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
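+ * Tests concurrent getOrCreateCache/getOrCreateCaches calls issued from multiple threads on server and client nodes.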
+ */
+public class IgniteDynamicCacheMultinodeTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static final int NODES = 6;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 2);
+
+        client = true;
+
+        startGridsMultiThreaded(NODES - 2, 2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetOrCreateCache() throws Exception {
+        createCacheMultinode(TestOp.GET_OR_CREATE_CACHE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testGetOrCreateCaches() throws Exception {
+        createCacheMultinode(TestOp.GET_OR_CREATE_CACHES);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void createCacheMultinode(final TestOp op) throws Exception {
+        final int THREADS = NODES * 3;
+
+        for (int i = 0; i < 10; i++) {
+            log.info("Iteration: " + i);
+
+            final CyclicBarrier b = new CyclicBarrier(THREADS);
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            final int iter = i;
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    Ignite node = ignite(idx.incrementAndGet() % NODES);
+
+                    b.await();
+
+                    boolean sleep = iter % 2 == 0;
+
+                    if (sleep)
+                        Thread.sleep(ThreadLocalRandom.current().nextLong(100) + 1);
+
+                    switch (op) {
+                        case GET_OR_CREATE_CACHE:
+                            node.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+                            break;
+
+                        case GET_OR_CREATE_CACHES:
+                            node.getOrCreateCaches(cacheConfigurations());
+
+                            break;
+                    }
+
+                    return null;
+                }
+            }, THREADS, "start-cache");
+
+            for (String cache : ignite(0).cacheNames())
+                ignite(0).destroyCache(cache);
+        }
+    }
+
+    /**
+     * @return Cache configurations.
+     */
+    private List<CacheConfiguration> cacheConfigurations() {
+        List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            CacheConfiguration ccfg = new CacheConfiguration("cache-" + i);
+
+            ccfg.setAtomicityMode(i % 2 == 0 ? ATOMIC : TRANSACTIONAL);
+
+            ccfgs.add(ccfg);
+        }
+
+        return ccfgs;
+    }
+
+    /**
+     *
+     */
+    enum TestOp {
+        /** */
+        GET_OR_CREATE_CACHE,
+
+        /** */
+        GET_OR_CREATE_CACHES
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/644c9f39/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index e7f38be..d931ea9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheTxPreloadNoWriteTe
 import org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekModesTest;
 import org.apache.ignite.internal.processors.cache.IgniteClientCacheInitializationFailTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheFilterTest;
+import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheMultinodeTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartNoExchangeTimeoutTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartSelfTest;
 import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartStopConcurrentTest;
@@ -219,6 +220,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
         suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class);
 
         suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class);
+        suite.addTestSuite(IgniteDynamicCacheMultinodeTest.class);
         suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class);
         suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);
         suite.addTestSuite(IgniteDynamicCacheStartStopConcurrentTest.class);