You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/10/05 18:00:33 UTC
[46/50] ignite git commit: ignite-1534 Fixed races in dynamic cache start exchange ordering.
ignite-1534 Fixed races in dynamic cache start exchange ordering.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b54cbd7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b54cbd7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b54cbd7
Branch: refs/heads/ignite-1093-2
Commit: 7b54cbd7499cd498b04e821dfa3b572bd94debec
Parents: a411f94
Author: sboikov <sb...@gridgain.com>
Authored: Fri Oct 2 11:19:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Oct 2 11:19:06 2015 +0300
----------------------------------------------------------------------
.../discovery/GridDiscoveryManager.java | 2 +-
.../cache/DynamicCacheDescriptor.java | 17 ++
.../processors/cache/GridCacheContext.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 20 ++-
.../GridCachePartitionExchangeManager.java | 72 ++-------
.../processors/cache/GridCacheProcessor.java | 26 ++--
.../cache/distributed/dht/GridDhtGetFuture.java | 4 +-
.../dht/GridPartitionedGetFuture.java | 5 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 9 +-
.../distributed/near/GridNearGetFuture.java | 2 +
.../cache/IgniteCachePutAllRestartTest.java | 4 +-
.../CacheGetFutureHangsSelfTest.java | 156 +++++++++----------
.../distributed/IgniteCacheCreatePutTest.java | 125 +++++++++++++++
.../testsuites/IgniteCacheTestSuite4.java | 5 +
14 files changed, 284 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index b694523..a6f5f08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -550,7 +550,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
gridStartTime = getSpi().getGridStartTime();
updateTopologyVersionIfGreater(new AffinityTopologyVersion(locNode.order()),
- new DiscoCache(localNode(), getSpi().getRemoteNodes()));
+ new DiscoCache(localNode(), F.view(topSnapshot, F.remoteNodes(locNode.id()))));
startLatch.countDown();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 24df7e4..b100a31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -68,6 +68,9 @@ public class DynamicCacheDescriptor {
/** */
private AffinityTopologyVersion startTopVer;
+ /** */
+ private boolean rcvdOnDiscovery;
+
/**
* @param ctx Context.
* @param cacheCfg Cache configuration.
@@ -236,6 +239,20 @@ public class DynamicCacheDescriptor {
this.updatesAllowed = updatesAllowed;
}
+ /**
+ * @return {@code True} if received in discovery data.
+ */
+ public boolean receivedOnDiscovery() {
+ return rcvdOnDiscovery;
+ }
+
+ /**
+ * @param rcvdOnDiscovery {@code True} if received in discovery data.
+ */
+ public void receivedOnDiscovery(boolean rcvdOnDiscovery) {
+ this.rcvdOnDiscovery = rcvdOnDiscovery;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 5385dec..3a1cee6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1848,7 +1848,7 @@ public class GridCacheContext<K, V> implements Externalizable {
boolean deserializePortable,
boolean cpy) {
assert key != null;
- assert val != null;
+ assert val != null || skipVals;
if (!keepCacheObjects) {
Object key0 = key.value(cacheObjCtx, false);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index dd51da2..0960c9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -391,13 +391,14 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* @param futVer Future ID.
* @param fut Future.
+ * @return {@code False} if future was forcibly completed with error.
*/
- public void addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
+ public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
- onFutureAdded(fut);
+ return onFutureAdded(fut);
}
/**
@@ -529,12 +530,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* @param fut Future.
+ * @return {@code False} if future was forcibly completed with error.
*/
- private void onFutureAdded(IgniteInternalFuture<?> fut) {
- if (stopping)
+ private boolean onFutureAdded(IgniteInternalFuture<?> fut) {
+ if (stopping) {
((GridFutureAdapter)fut).onDone(stopError());
- else if (cctx.kernalContext().clientDisconnected())
+
+ return false;
+ }
+ else if (cctx.kernalContext().clientDisconnected()) {
((GridFutureAdapter)fut).onDone(disconnectedError(null));
+
+ return false;
+ }
+
+ return true;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e77e0d..adc2174 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -105,18 +105,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** Partition resend timeout after eviction. */
private final long partResendTimeout = getLong(IGNITE_PRELOAD_RESEND_TIMEOUT, DFLT_PRELOAD_RESEND_TIMEOUT);
- /** Latch which completes after local exchange future is created. */
- private GridFutureAdapter<?> locExchFut;
-
/** */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
/** Last partition refresh. */
private final AtomicLong lastRefresh = new AtomicLong(-1);
- /** Pending futures. */
- private final Queue<GridDhtPartitionsExchangeFuture> pendingExchangeFuts = new ConcurrentLinkedQueue<>();
-
/** */
@GridToStringInclude
private ExchangeWorker exchWorker;
@@ -229,31 +223,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (exchId != null) {
- // Start exchange process.
- pendingExchangeFuts.add(exchFut);
+ if (log.isDebugEnabled())
+ log.debug("Discovery event (will start exchange): " + exchId);
// Event callback - without this callback future will never complete.
exchFut.onEvent(exchId, e);
+ // Start exchange process.
+ addFuture(exchFut);
+ }
+ else {
if (log.isDebugEnabled())
- log.debug("Discovery event (will start exchange): " + exchId);
-
- locExchFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- if (!enterBusy())
- return;
-
- try {
- // Unwind in the order of discovery events.
- for (GridDhtPartitionsExchangeFuture f = pendingExchangeFuts.poll(); f != null;
- f = pendingExchangeFuts.poll())
- addFuture(f);
- }
- finally {
- leaveBusy();
- }
- }
- });
+ log.debug("Do not start exchange for discovery event: " + evt);
}
}
finally {
@@ -266,8 +247,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@Override protected void start0() throws IgniteCheckedException {
super.start0();
- locExchFut = new GridFutureAdapter<>();
-
exchWorker = new ExchangeWorker();
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
@@ -328,12 +307,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
- new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
-
- onDiscoveryEvent(cctx.localNodeId(), fut);
+ exchWorker.futQ.addFirst(fut);
- // Allow discovery events to get processed.
- locExchFut.onDone();
+ new IgniteThread(cctx.gridName(), "exchange-worker", exchWorker).start();
if (reconnect) {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@@ -382,8 +358,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- for (GridCacheContext cacheCtx : cctx.cacheContexts())
- cacheCtx.preloader().onInitialExchangeComplete(null);
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.startTopologyVersion() == null)
+ cacheCtx.preloader().onInitialExchangeComplete(null);
+ }
if (log.isDebugEnabled())
log.debug("Finished waiting for initial exchange: " + fut.exchangeId());
@@ -414,12 +392,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (AffinityReadyFuture f : readyFuts.values())
f.onDone(stopErr);
- for (GridDhtPartitionsExchangeFuture f : pendingExchangeFuts)
- f.onDone(stopErr);
-
- if (locExchFut != null)
- locExchFut.onDone(stopErr);
-
U.cancel(exchWorker);
if (log.isDebugEnabled())
@@ -583,22 +555,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param nodeId New node ID.
- * @param fut Exchange future.
- */
- void onDiscoveryEvent(UUID nodeId, GridDhtPartitionsExchangeFuture fut) {
- if (!enterBusy())
- return;
-
- try {
- addFuture(fut);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
* @param evt Discovery event.
* @return Affinity topology version.
*/
@@ -1033,7 +989,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
U.warn(log, "Pending exchange futures:");
- for (GridDhtPartitionsExchangeFuture fut : pendingExchangeFuts)
+ for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
U.warn(log, ">>> " + fut);
ExchangeFutureSet exchFuts = this.exchFuts;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 6c13399..daa4475 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -805,7 +805,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
boolean loc = desc.locallyConfigured();
- if (loc || CU.affinityNode(locNode, filter)) {
+ if (loc || (desc.receivedOnDiscovery() && CU.affinityNode(locNode, filter))) {
CacheObjectContext cacheObjCtx = ctx.cacheObjects().contextForCache(ccfg);
CachePluginManager pluginMgr = desc.pluginManager();
@@ -1958,7 +1958,11 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (req.initiatingNodeId() == null)
desc.staticallyConfigured(true);
- registeredCaches.put(maskNull(req.cacheName()), desc);
+ desc.receivedOnDiscovery(true);
+
+ DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);
+
+ assert old == null : old;
ctx.discovery().setCacheFilter(
req.cacheName(),
@@ -2474,10 +2478,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
else {
- if (req.clientStartOnly()) {
- assert req.initiatingNodeId() != null : req;
+ assert req.initiatingNodeId() != null : req;
+
+ // Cache already exists, exchange is needed only if client cache should be created.
+ ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
- needExchange = ctx.discovery().addClientNode(req.cacheName(),
+ boolean clientReq = node != null &&
+ !ctx.discovery().cacheAffinityNode(node, req.cacheName());
+
+ if (req.clientStartOnly()) {
+ needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
req.initiatingNodeId(),
req.nearCacheConfiguration() != null);
}
@@ -2488,12 +2498,6 @@ public class GridCacheProcessor extends GridProcessorAdapter {
"(a cache with the same name is already started): " + U.maskName(req.cacheName())));
}
else {
- // Cache already exists, exchange is needed only if client cache should be created.
- ClusterNode node = ctx.discovery().node(req.initiatingNodeId());
-
- boolean clientReq = node != null &&
- !ctx.discovery().cacheAffinityNode(node, req.cacheName());
-
needExchange = clientReq && ctx.discovery().addClientNode(req.cacheName(),
req.initiatingNodeId(),
req.nearCacheConfiguration() != null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 76aaf72..a67b1de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -447,8 +447,8 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
if (v == null)
it.remove();
- else if (!skipVals)
- info.value((CacheObject)v);
+ else
+ info.value(skipVals ? null : (CacheObject)v);
}
return infos;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 0202c53..abbe7b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -587,8 +587,11 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
if (keysSize != 0) {
Map<K, V> map = new GridLeanMap<>(keysSize);
- for (GridCacheEntryInfo info : infos)
+ for (GridCacheEntryInfo info : infos) {
+ assert skipVals == (info.value() == null);
+
cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false);
+ }
return map;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index fb2c5ad..41df53a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -825,8 +825,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
futVer = cctx.versions().next(topVer);
- if (storeFuture())
- cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
+ if (storeFuture()) {
+ if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
+ assert isDone() : GridNearAtomicUpdateFuture.this;
+
+ return;
+ }
+ }
// Assign version on near node in CLOCK ordering mode even if fastMap is false.
if (updVer == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index a7875f6..d9763f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -703,6 +703,8 @@ public final class GridNearGetFuture<K, V> extends GridCompoundIdentityFuture<Ma
CacheObject val = info.value();
KeyCacheObject key = info.key();
+ assert skipVals == (info.value() == null);
+
cctx.addResult(map, key, val, skipVals, false, deserializePortable, false);
}
catch (GridCacheEntryRemovedException ignore) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
index fc14085..ae99926 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCachePutAllRestartTest.java
@@ -163,12 +163,12 @@ public class IgniteCachePutAllRestartTest extends GridCommonAbstractTest {
info("Running iteration on the node [idx=" + node + ", nodeId=" + ignite.cluster().localNode().id() + ']');
+ final IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
+
IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
Thread.currentThread().setName("put-thread");
- IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
-
Random rnd = new Random();
long endTime = System.currentTimeMillis() + 60_000;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
index 8e8447e..e8622aa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java
@@ -18,23 +18,21 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
@@ -45,22 +43,14 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
/** Grid count. */
private static final int GRID_CNT = 8;
- /** Grids. */
- private static Ignite[] grids;
+ /** */
+ private AtomicReferenceArray<Ignite> nodes;
- /** Ids. */
- private static String[] ids;
-
- /** Flags. */
- private static AtomicBoolean[] flags;
-
- /** Futs. */
- private static Collection<IgniteInternalFuture> futs;
-
- /** Alive grids. */
- private static Set<Integer> aliveGrids;
+ /** */
+ private volatile boolean done;
/** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -81,17 +71,27 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
return cfg;
}
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 5 * 60_000;
+ }
+
/**
* @throws Exception If failed.
*/
- public void testFailover() throws Exception {
- int cnt = 10;
+ public void testContainsKeyFailover() throws Exception {
+ int cnt = 3;
for (int i = 0; i < cnt; i++) {
try {
- U.debug("*** Iteration " + (i + 1) + '/' + cnt);
-
- init();
+ log.info("Iteration: " + (i + 1) + '/' + cnt);
doTestFailover();
}
@@ -102,54 +102,34 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
}
/**
- * Initializes test.
- */
- private void init() {
- grids = new Ignite[GRID_CNT + 1];
-
- ids = new String[GRID_CNT + 1];
-
- aliveGrids = new HashSet<>();
-
- flags = new AtomicBoolean[GRID_CNT + 1];
-
- futs = new ArrayList<>();
- }
-
- /**
* Executes one test iteration.
+ * @throws Exception If failed.
*/
private void doTestFailover() throws Exception {
try {
- for (int i = 0; i < GRID_CNT + 1; i++) {
- final IgniteEx grid = startGrid(i);
+ done = false;
- grids[i] = grid;
+ nodes = new AtomicReferenceArray<>(GRID_CNT);
- ids[i] = grid.localNode().id().toString();
+ startGridsMultiThreaded(GRID_CNT, false);
- aliveGrids.add(i);
+ for (int i = 0; i < GRID_CNT ; i++)
+ assertTrue(nodes.compareAndSet(i, null, ignite(i)));
- flags[i] = new AtomicBoolean();
- }
+ List<IgniteInternalFuture> futs = new ArrayList<>();
for (int i = 0; i < GRID_CNT + 1; i++) {
- final int gridIdx = i;
-
futs.add(multithreadedAsync(new Runnable() {
@Override public void run() {
- IgniteCache cache = grids[gridIdx].cache(null);
+ T2<Ignite, Integer> ignite;
- while (!flags[gridIdx].get()) {
- int idx = ThreadLocalRandom.current().nextInt(GRID_CNT + 1);
+ while ((ignite = randomNode()) != null) {
+ IgniteCache<Object, Object> cache = ignite.get1().cache(null);
- String id = ids[idx];
+ for (int i = 0; i < 100; i++)
+ cache.containsKey(ThreadLocalRandom.current().nextInt(100_000));
- if (id != null /*&& grids[gridIdx] != null*/) {
- //U.debug("!!! Grid containsKey start " + gridIdx);
- cache.containsKey(id);
- //U.debug("!!! Grid containsKey finished " + gridIdx);
- }
+ assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1()));
try {
Thread.sleep(ThreadLocalRandom.current().nextLong(50));
@@ -163,18 +143,15 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
futs.add(multithreadedAsync(new Runnable() {
@Override public void run() {
- IgniteCache cache = grids[gridIdx].cache(null);
+ T2<Ignite, Integer> ignite;
- while (!flags[gridIdx].get()) {
- int idx = ThreadLocalRandom.current().nextInt(GRID_CNT + 1);
+ while ((ignite = randomNode()) != null) {
+ IgniteCache<Object, Object> cache = ignite.get1().cache(null);
- String id = ids[idx];
+ for (int i = 0; i < 100; i++)
+ cache.put(ThreadLocalRandom.current().nextInt(100_000), UUID.randomUUID());
- if (id != null /*&& grids[gridIdx] != null*/) {
- //U.debug("!!! Grid put start " + gridIdx);
- cache.put(id, UUID.randomUUID());
- //U.debug("!!! Grid put finished " + gridIdx);
- }
+ assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1()));
try {
Thread.sleep(ThreadLocalRandom.current().nextLong(50));
@@ -187,35 +164,50 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest {
}, 1, "put-thread-" + i));
}
- while (aliveGrids.size() > 1) {
- final int gridToKill = ThreadLocalRandom.current().nextInt(GRID_CNT) + 1;
+ try {
+ int aliveGrids = GRID_CNT;
- if (gridToKill > 0 && grids[gridToKill] != null) {
- U.debug("!!! Trying to kill grid " + gridToKill);
+ while (aliveGrids > 0) {
+ T2<Ignite, Integer> ignite = randomNode();
- //synchronized (mons[gridToKill]) {
- U.debug("!!! Grid stop start " + gridToKill);
+ assert ignite != null;
- grids[gridToKill].close();
+ Ignite ignite0 = ignite.get1();
- aliveGrids.remove(gridToKill);
+ log.info("Stop node: " + ignite0.name());
- grids[gridToKill] = null;
+ ignite0.close();
- flags[gridToKill].set(true);
+ log.info("Node stop finished: " + ignite0.name());
- U.debug("!!! Grid stop finished " + gridToKill);
- //}
+ aliveGrids--;
}
}
+ finally {
+ done = true;
+ }
- Thread.sleep(ThreadLocalRandom.current().nextLong(100));
+ for (IgniteInternalFuture fut : futs)
+ fut.get();
}
finally {
- flags[0].set(true);
+ done = true;
+ }
+ }
- for (IgniteInternalFuture fut : futs)
- fut.get();
+ /**
+ * @return Random node and its index.
+ */
+ @Nullable private T2<Ignite, Integer> randomNode() {
+ while (!done) {
+ int idx = ThreadLocalRandom.current().nextInt(GRID_CNT);
+
+ Ignite ignite = nodes.get(idx);
+
+ if (ignite != null && nodes.compareAndSet(idx, ignite, null))
+ return new T2<>(ignite, idx);
}
+
+ return null;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
new file mode 100644
index 0000000..8b3d9d3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheCreatePutTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Starts nodes concurrently; each node calls {@code getOrCreateCache} and puts data right after start.
+ */
+public class IgniteCacheCreatePutTest extends GridCommonAbstractTest {
+ /** Grid count. */
+ private static final int GRID_CNT = 3;
+
+ /** IP finder set on the discovery SPI of every node configured by this test. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+ discoSpi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ OptimizedMarshaller marsh = new OptimizedMarshaller();
+ marsh.setRequireSerializable(false);
+
+ cfg.setMarshaller(marsh);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setName("cache*");
+ ccfg.setCacheMode(CacheMode.PARTITIONED);
+ ccfg.setBackups(1);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 3 * 60 * 1000L;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStartNodes() throws Exception {
+ long stopTime = System.currentTimeMillis() + 2 * 60_000;
+
+ try {
+ int iter = 0;
+
+ while (System.currentTimeMillis() < stopTime) {
+ log.info("Iteration: " + iter++);
+
+ try {
+ final AtomicInteger idx = new AtomicInteger();
+
+ GridTestUtils.runMultiThreaded(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ int node = idx.getAndIncrement();
+
+ Ignite ignite = startGrid(node);
+
+ IgniteCache<Object, Object> cache = ignite.getOrCreateCache("cache1");
+
+ assertNotNull(cache);
+
+ for (int i = 0; i < 100; i++)
+ cache.put(i, i);
+
+ return null;
+ }
+ }, GRID_CNT, "start");
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b54cbd7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index f8c9d26..b89bffd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -77,8 +77,10 @@ import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransaction
import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHangsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest;
@@ -205,6 +207,7 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(IgniteDynamicCacheStartNoExchangeTimeoutTest.class);
suite.addTestSuite(CacheAffinityEarlyTest.class);
suite.addTestSuite(IgniteCacheCreatePutMultiNodeSelfTest.class);
+ suite.addTestSuite(IgniteCacheCreatePutTest.class);
suite.addTestSuite(GridCacheTxLoadFromStoreOnLockSelfTest.class);
@@ -278,6 +281,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CrossCacheLockTest.class);
suite.addTestSuite(IgniteCrossCacheTxSelfTest.class);
+ suite.addTestSuite(CacheGetFutureHangsSelfTest.class);
+
return suite;
}
}
\ No newline at end of file