You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/17 14:07:08 UTC
ignite git commit: ignite-5763 Race in concurrent client cache start
Repository: ignite
Updated Branches:
refs/heads/master 624b451b2 -> 644c9f39c
ignite-5763 Race in concurrent client cache start
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/644c9f39
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/644c9f39
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/644c9f39
Branch: refs/heads/master
Commit: 644c9f39cd980e8087438519b34a79986c82d8b4
Parents: 624b451
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jul 17 17:06:55 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jul 17 17:06:55 2017 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 38 ++++-
.../cache/IgniteDynamicCacheMultinodeTest.java | 168 +++++++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 2 +
3 files changed, 202 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/644c9f39/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 9f5bd3f..347f6fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -192,6 +192,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
};
+ /** */
+ private final Object discoEvtMux = new Object();
+
/** Discovery event worker. */
private final DiscoveryWorker discoWrk = new DiscoveryWorker();
@@ -551,6 +554,26 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
final ClusterNode node,
final Collection<ClusterNode> topSnapshot,
final Map<Long, Collection<ClusterNode>> snapshots,
+ @Nullable DiscoverySpiCustomMessage spiCustomMsg) {
+ synchronized (discoEvtMux) {
+ onDiscovery0(type, topVer, node, topSnapshot, snapshots, spiCustomMsg);
+ }
+ }
+
+ /**
+ * @param type Event type.
+ * @param topVer Event topology version.
+ * @param node Event node.
+ * @param topSnapshot Topology snapsjot.
+ * @param snapshots Topology snapshots history.
+ * @param spiCustomMsg Custom event.
+ */
+ private void onDiscovery0(
+ final int type,
+ final long topVer,
+ final ClusterNode node,
+ final Collection<ClusterNode> topSnapshot,
+ final Map<Long, Collection<ClusterNode>> snapshots,
@Nullable DiscoverySpiCustomMessage spiCustomMsg
) {
DiscoveryCustomMessage customMsg = spiCustomMsg == null ? null
@@ -2062,12 +2085,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
public void clientCacheStartEvent(UUID reqId,
@Nullable Map<String, DynamicCacheChangeRequest> startReqs,
@Nullable Set<String> cachesToClose) {
- discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT,
- AffinityTopologyVersion.NONE,
- localNode(),
- null,
- Collections.<ClusterNode>emptyList(),
- new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose));
+ // Prevent race when discovery message was processed, but was passed to discoWrk.
+ synchronized (discoEvtMux) {
+ discoWrk.addEvent(EVT_DISCOVERY_CUSTOM_EVT,
+ AffinityTopologyVersion.NONE,
+ localNode(),
+ null,
+ Collections.<ClusterNode>emptyList(),
+ new ClientCacheChangeDummyDiscoveryMessage(reqId, startReqs, cachesToClose));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/644c9f39/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java
new file mode 100644
index 0000000..d362189
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteDynamicCacheMultinodeTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ *
+ */
+public class IgniteDynamicCacheMultinodeTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 6;
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES - 2);
+
+ client = true;
+
+ startGridsMultiThreaded(NODES - 2, 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetOrCreateCache() throws Exception {
+ createCacheMultinode(TestOp.GET_OR_CREATE_CACHE);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetOrCreateCaches() throws Exception {
+ createCacheMultinode(TestOp.GET_OR_CREATE_CACHES);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void createCacheMultinode(final TestOp op) throws Exception {
+ final int THREADS = NODES * 3;
+
+ for (int i = 0; i < 10; i++) {
+ log.info("Iteration: " + i);
+
+ final CyclicBarrier b = new CyclicBarrier(THREADS);
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ final int iter = i;
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ Ignite node = ignite(idx.incrementAndGet() % NODES);
+
+ b.await();
+
+ boolean sleep = iter % 2 == 0;
+
+ if (sleep)
+ Thread.sleep(ThreadLocalRandom.current().nextLong(100) + 1);
+
+ switch (op) {
+ case GET_OR_CREATE_CACHE:
+ node.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ break;
+
+ case GET_OR_CREATE_CACHES:
+ node.getOrCreateCaches(cacheConfigurations());
+
+ break;
+ }
+
+ return null;
+ }
+ }, THREADS, "start-cache");
+
+ for (String cache : ignite(0).cacheNames())
+ ignite(0).destroyCache(cache);
+ }
+ }
+
+ /**
+ * @return Cache configurations.
+ */
+ private List<CacheConfiguration> cacheConfigurations() {
+ List<CacheConfiguration> ccfgs = new ArrayList<>();
+
+ for (int i = 0; i < 10; i++) {
+ CacheConfiguration ccfg = new CacheConfiguration("cache-" + i);
+
+ ccfg.setAtomicityMode(i % 2 == 0 ? ATOMIC : TRANSACTIONAL);
+
+ ccfgs.add(ccfg);
+ }
+
+ return ccfgs;
+ }
+
+ /**
+ *
+ */
+ enum TestOp {
+ /** */
+ GET_OR_CREATE_CACHE,
+
+ /** */
+ GET_OR_CREATE_CACHES
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/644c9f39/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index e7f38be..d931ea9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheTxPreloadNoWriteTe
import org.apache.ignite.internal.processors.cache.IgniteCacheTxReplicatedPeekModesTest;
import org.apache.ignite.internal.processors.cache.IgniteClientCacheInitializationFailTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheFilterTest;
+import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheMultinodeTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartNoExchangeTimeoutTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheStartStopConcurrentTest;
@@ -219,6 +220,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteCacheTxPreloadNoWriteTest.class);
suite.addTestSuite(IgniteDynamicCacheStartSelfTest.class);
+ suite.addTestSuite(IgniteDynamicCacheMultinodeTest.class);
suite.addTestSuite(IgniteDynamicCacheWithConfigStartSelfTest.class);
suite.addTestSuite(IgniteCacheDynamicStopSelfTest.class);
suite.addTestSuite(IgniteDynamicCacheStartStopConcurrentTest.class);