You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/12/17 09:10:19 UTC
[3/6] ignite git commit: ignite-2146 Avoid hang in 'cache.get' if topology locked.
ignite-2146 Avoid hang in 'cache.get' if topology locked.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/96dc238f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/96dc238f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/96dc238f
Branch: refs/heads/ignite-2100
Commit: 96dc238fd3e159c68ba54eef2fc6f3589aa6b2ca
Parents: d8c8214
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 17 08:45:48 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 17 08:45:48 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 8 +-
.../cache/GridCacheSharedContext.java | 20 +
.../dht/CacheDistributedGetFutureAdapter.java | 2 +-
.../dht/GridPartitionedGetFuture.java | 15 +-
.../dht/GridPartitionedSingleGetFuture.java | 8 +
.../dht/atomic/GridNearAtomicUpdateFuture.java | 10 +-
.../colocated/GridDhtColocatedLockFuture.java | 2 +-
.../distributed/near/GridNearGetFuture.java | 17 +-
.../distributed/near/GridNearLockFuture.java | 30 +-
...ridNearOptimisticTxPrepareFutureAdapter.java | 6 +-
.../cache/transactions/IgniteTxManager.java | 6 +-
.../cache/IgniteCacheNearLockValueSelfTest.java | 11 +-
.../IgniteCacheStoreValueAbstractTest.java | 2 +-
.../IgniteStartCacheInTransactionSelfTest.java | 39 +-
.../CacheGetInsideLockChangingTopologyTest.java | 475 +++++++++++++++++++
.../IgniteCacheFailoverTestSuite3.java | 2 +
16 files changed, 593 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index f0bed99..380c163 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -3343,16 +3343,14 @@ public class GridCacheProcessor extends GridProcessorAdapter {
* @throws IgniteException If transaction exist.
*/
private void checkEmptyTransactions() throws IgniteException {
- if (transactions().tx() != null)
- throw new IgniteException("Cannot start/stop cache within transaction.");
-
- if (sharedCtx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()) != null)
- throw new IgniteException("Cannot start/stop cache within lock.");
+ if (transactions().tx() != null || sharedCtx.lockedTopologyVersion(null) != null)
+ throw new IgniteException("Cannot start/stop cache within lock or transaction.");
}
/**
* @param val Object to check.
* @throws IgniteCheckedException If validation failed.
+ * @return Configuration copy.
*/
private CacheConfiguration cloneCheckSerializable(CacheConfiguration val) throws IgniteCheckedException {
if (val == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 608829a..5ed1df9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -572,6 +572,26 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param ignore Transaction to ignore.
+ * @return Not null topology version if current thread holds lock preventing topology change.
+ */
+ @Nullable public AffinityTopologyVersion lockedTopologyVersion(IgniteInternalTx ignore) {
+ long threadId = Thread.currentThread().getId();
+
+ IgniteInternalTx tx = txMgr.anyActiveThreadTx(threadId, ignore);
+
+ AffinityTopologyVersion topVer = null;
+
+ if (tx != null && tx.topologyVersionSnapshot() != null)
+ topVer = tx.topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = mvccMgr.lastExplicitLockTopologyVersion(threadId);
+
+ return topVer;
+ }
+
+ /**
* Nulling references to potentially leak-prone objects.
*/
public void cleanup() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 320c3c2..cfbc21b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -87,7 +87,7 @@ public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoun
protected IgniteCacheExpiryPolicy expiryPlc;
/** Flag indicating that get should be done on a locked topology version. */
- protected final boolean canRemap;
+ protected boolean canRemap;
/** */
protected final boolean needVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6867e21..e8aaca0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -133,10 +133,19 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda
* Initializes future.
*/
public void init() {
- AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
- canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+ AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
- map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+ if (lockedTopVer != null) {
+ canRemap = false;
+
+ map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), lockedTopVer);
+ }
+ else {
+ AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+ canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
+
+ map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+ }
markInitialized();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 5d0814f..29971fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -156,6 +156,14 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im
) {
assert key != null;
+ AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
+
+ if (lockedTopVer != null) {
+ topVer = lockedTopVer;
+
+ canRemap = false;
+ }
+
this.cctx = cctx;
this.key = key;
this.readThrough = readThrough;
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index b384bab..eefdc73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -278,15 +278,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* Performs future mapping.
*/
public void map() {
- AffinityTopologyVersion topVer = null;
-
- IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(null);
-
- if (tx != null && tx.topologyVersionSnapshot() != null)
- topVer = tx.topologyVersionSnapshot();
-
- if (topVer == null)
- topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
if (topVer == null)
mapOnTopology();
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index d3028ca..22b329c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -597,7 +597,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
// If there is another system transaction in progress, use it's topology version to prevent deadlock.
if (topVer == null && tx != null && tx.system()) {
- IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(Thread.currentThread().getId(), tx);
if (tx0 != null)
topVer = tx0.topologyVersionSnapshot();
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index cb866e3..a121af9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -142,11 +142,20 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap
* Initializes future.
*/
public void init() {
- AffinityTopologyVersion topVer = tx == null ?
- (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
- tx.topologyVersion();
+ AffinityTopologyVersion lockedTopVer = cctx.shared().lockedTopologyVersion(null);
- map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+ if (lockedTopVer != null) {
+ canRemap = false;
+
+ map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), lockedTopVer);
+ }
+ else {
+ AffinityTopologyVersion topVer = tx == null ?
+ (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
+
+ map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
+ }
markInitialized();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 4cb7248..23e0f6b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -56,8 +56,10 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -163,6 +165,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* @param accessTtl TTL for read operation.
* @param filter Filter.
* @param skipStore skipStore
+ * @param keepBinary Keep binary flag.
*/
public GridNearLockFuture(
GridCacheContext<?, ?> cctx,
@@ -678,7 +681,22 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearLockFuture.class, this, "inTx", inTx(), "super", super.toString());
+ Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
+ @Override public String apply(IgniteInternalFuture<?> f) {
+ if (isMini(f)) {
+ MiniFuture m = (MiniFuture)f;
+
+ return "[node=" + m.node().id() + ", loc=" + m.node().isLocal() + ", done=" + f.isDone() + "]";
+ }
+ else
+ return "[loc=true, done=" + f.isDone() + "]";
+ }
+ });
+
+ return S.toString(GridNearLockFuture.class, this,
+ "innerFuts", futs,
+ "inTx", inTx(),
+ "super", super.toString());
}
/**
@@ -700,11 +718,13 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
*/
void map() {
// Obtain the topology version to use.
- AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ long threadId = Thread.currentThread().getId();
+
+ AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
// If there is another system transaction in progress, use it's topology version to prevent deadlock.
if (topVer == null && tx != null && tx.system()) {
- IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
if (tx0 != null)
topVer = tx0.topologyVersionSnapshot();
@@ -1273,8 +1293,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
cctx.io().send(node, req, cctx.ioPolicy());
}
catch (ClusterTopologyCheckedException ex) {
- assert fut != null;
-
fut.onResult(ex);
}
}
@@ -1288,8 +1306,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
cctx.io().send(node, req, cctx.ioPolicy());
}
catch (ClusterTopologyCheckedException ex) {
- assert fut != null;
-
fut.onResult(ex);
}
catch (IgniteCheckedException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 5c7553f..b3eab34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -50,11 +50,13 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
/** {@inheritDoc} */
@Override public final void prepare() {
// Obtain the topology version to use.
- AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ long threadId = Thread.currentThread().getId();
+
+ AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
// If there is another system transaction in progress, use it's topology version to prevent deadlock.
if (topVer == null && tx != null && tx.system()) {
- IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx);
+ IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(threadId, tx);
if (tx0 != null)
topVer = tx0.topologyVersionSnapshot();
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 243c4cb..d2b803a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -605,11 +605,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param threadId Thread ID.
+ * @param ignore Transaction to ignore.
* @return Any transaction associated with the current thread.
*/
- public IgniteInternalTx anyActiveThreadTx(IgniteInternalTx ignore) {
- long threadId = Thread.currentThread().getId();
-
+ public IgniteInternalTx anyActiveThreadTx(long threadId, IgniteInternalTx ignore) {
IgniteInternalTx tx = threadMap.get(threadId);
if (tx != null && tx.topologyVersionSnapshot() != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
index d6f0257..100acfe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNearLockValueSelfTest.java
@@ -35,6 +35,8 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
@@ -45,6 +47,9 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
*
*/
public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGridsMultiThreaded(2);
@@ -61,7 +66,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- cfg.setDiscoverySpi(new TcpDiscoverySpi().setForceServerMode(true));
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setForceServerMode(true).setIpFinder(IP_FINDER));
if (getTestGridName(0).equals(gridName))
cfg.setClientMode(true);
@@ -126,7 +131,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
private Collection<GridNearLockRequest> reqs = new ConcurrentLinkedDeque<>();
/** {@inheritDoc} */
- @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure)
+ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
if (msg instanceof GridIoMessage) {
GridIoMessage ioMsg = (GridIoMessage)msg;
@@ -135,7 +140,7 @@ public class IgniteCacheNearLockValueSelfTest extends GridCommonAbstractTest {
reqs.add((GridNearLockRequest)ioMsg.message());
}
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java
index 70c5dc3..c0fb11d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStoreValueAbstractTest.java
@@ -107,7 +107,7 @@ public abstract class IgniteCacheStoreValueAbstractTest extends IgniteCacheAbstr
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
- return 2 * 60_000;
+ return 3 * 60_000;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java
index 1e10a03..6212c4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteStartCacheInTransactionSelfTest.java
@@ -31,10 +31,10 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
-import org.apache.ignite.transactions.TransactionConcurrency;
-import org.apache.ignite.transactions.TransactionIsolation;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Check starting cache in transaction.
@@ -43,6 +43,9 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+ /** */
+ private static final String EXPECTED_MSG = "Cannot start/stop cache within lock or transaction.";
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -97,8 +100,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
final String key = "key";
final String val = "val";
- try (Transaction tx = ignite.transactions().txStart(
- TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)){
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){
ignite.cache(null).put(key, val);
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -107,7 +109,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
return null;
}
- }, IgniteException.class, "Cannot start/stop cache within transaction.");
+ }, IgniteException.class, EXPECTED_MSG);
tx.commit();
}
@@ -122,8 +124,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
final String key = "key";
final String val = "val";
- try (Transaction tx = ignite.transactions().txStart(
- TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)){
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){
ignite.cache(null).put(key, val);
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -132,7 +133,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
return null;
}
- }, IgniteException.class, "Cannot start/stop cache within transaction.");
+ }, IgniteException.class, EXPECTED_MSG);
tx.commit();
}
@@ -147,8 +148,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
final String key = "key";
final String val = "val";
- try (Transaction tx = ignite.transactions().txStart(
- TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)){
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){
ignite.cache(null).put(key, val);
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -157,7 +157,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
return null;
}
- }, IgniteException.class, "Cannot start/stop cache within transaction.");
+ }, IgniteException.class, EXPECTED_MSG);
tx.commit();
}
@@ -172,8 +172,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
final String key = "key";
final String val = "val";
- try (Transaction tx = ignite.transactions().txStart(
- TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)){
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){
ignite.cache(null).put(key, val);
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -182,7 +181,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
return null;
}
- }, IgniteException.class, "Cannot start/stop cache within transaction.");
+ }, IgniteException.class, EXPECTED_MSG);
tx.commit();
}
@@ -197,8 +196,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
final String key = "key";
final String val = "val";
- try (Transaction tx = ignite.transactions().txStart(
- TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)){
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){
ignite.cache(null).put(key, val);
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -207,7 +205,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
return null;
}
- }, IgniteException.class, "Cannot start/stop cache within transaction.");
+ }, IgniteException.class, EXPECTED_MSG);
tx.commit();
}
@@ -222,8 +220,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
final String key = "key";
final String val = "val";
- try (Transaction tx = ignite.transactions().txStart(
- TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)){
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)){
ignite.cache(null).put(key, val);
GridTestUtils.assertThrows(log, new Callable<Object>() {
@@ -232,7 +229,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
return null;
}
- }, IgniteException.class, "Cannot start/stop cache within transaction.");
+ }, IgniteException.class, EXPECTED_MSG);
tx.commit();
}
@@ -259,7 +256,7 @@ public class IgniteStartCacheInTransactionSelfTest extends GridCommonAbstractTes
return null;
}
- }, IgniteException.class, "Cannot start/stop cache within lock.");
+ }, IgniteException.class, EXPECTED_MSG);
lock.unlock();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
new file mode 100644
index 0000000..7073a94
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetInsideLockChangingTopologyTest.java
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheAlwaysEvictionPolicy;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
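+/**
+ * Tests that cache {@code get} operations performed while a lock or a transaction is held
+ * do not hang when the topology changes (a server node is stopped or restarted concurrently).
+ */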
+public class CacheGetInsideLockChangingTopologyTest extends GridCommonAbstractTest {
+ /** IP finder set on the discovery SPI of every node configured by this test. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Per-thread one-shot flag: when {@code true}, the next configuration built on this thread is a client one. */
+ private static ThreadLocal<Boolean> client = new ThreadLocal<>();
+
+ /** Number of server nodes started before the tests run. */
+ private static final int SRVS = 3;
+
+ /** Number of client nodes started before the tests run (indexes {@code SRVS} and {@code SRVS + 1}). */
+ private static final int CLIENTS = 2;
+
+ /** Name of the first transactional cache. */
+ private static final String TX_CACHE1 = "tx1";
+
+ /** Name of the second transactional cache. */
+ private static final String TX_CACHE2 = "tx2";
+
+ /** Name of the atomic cache. */
+ private static final String ATOMIC_CACHE = "atomic";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ Boolean clientMode = client.get();
+
+ client.set(null);
+
+ if (clientMode != null && clientMode)
+ cfg.setClientMode(true);
+ else {
+ cfg.setCacheConfiguration(cacheConfiguration(TX_CACHE1, TRANSACTIONAL),
+ cacheConfiguration(TX_CACHE2, TRANSACTIONAL),
+ cacheConfiguration(ATOMIC_CACHE, ATOMIC));
+ }
+
+ return cfg;
+ }
+
+    /**
+     * Builds a cache configuration with one backup and {@code FULL_SYNC} write synchronization.
+     *
+     * @param name Cache name.
+     * @param atomicityMode Atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) {
+        CacheConfiguration cacheCfg = new CacheConfiguration();
+
+        cacheCfg.setBackups(1);
+        cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+        cacheCfg.setAtomicityMode(atomicityMode);
+        cacheCfg.setName(name);
+
+        return cacheCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(SRVS);
+
+        // First client node: started without near caches.
+        client.set(true);
+
+        Ignite plainClient = startGrid(SRVS);
+
+        assertTrue(plainClient.configuration().isClientMode());
+
+        // Second client node: gets a near cache for each of the test caches.
+        client.set(true);
+
+        Ignite nearClient = startGrid(SRVS + 1);
+
+        assertTrue(nearClient.configuration().isClientMode());
+
+        for (String cacheName : new String[] {TX_CACHE1, TX_CACHE2, ATOMIC_CACHE}) {
+            nearClient.createNearCache(cacheName,
+                new NearCacheConfiguration<>().setNearEvictionPolicy(new GridCacheAlwaysEvictionPolicy<>()));
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ // Stop all grids first, then let the parent class run its own cleanup.
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+    /**
+     * Runs the get-inside-lock scenario for both transactional caches, from each of the two client nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTxGetInsideLockStopPrimary() throws Exception {
+        for (String cacheName : new String[] {TX_CACHE1, TX_CACHE2}) {
+            for (int nodeIdx = SRVS; nodeIdx <= SRVS + 1; nodeIdx++)
+                getInsideLockStopPrimary(ignite(nodeIdx), cacheName);
+        }
+    }
+
+    /**
+     * Runs the get-inside-lock scenario for the atomic cache, from each of the two client nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicGetInsideLockStopPrimary() throws Exception {
+        for (int nodeIdx = SRVS; nodeIdx <= SRVS + 1; nodeIdx++)
+            getInsideLockStopPrimary(ignite(nodeIdx), ATOMIC_CACHE);
+    }
+
+    /**
+     * Runs the get-inside-transaction scenario for the atomic cache, from each of the two client nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicGetInsideTxStopPrimary() throws Exception {
+        for (int nodeIdx = SRVS; nodeIdx <= SRVS + 1; nodeIdx++)
+            getInsideTxStopPrimary(ignite(nodeIdx), ATOMIC_CACHE);
+    }
+
+    /**
+     * Runs the {@code READ_COMMITTED} scenario with {@code PESSIMISTIC} concurrency for both transactional
+     * caches, from each of the two client nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReadCommittedPessimisticStopPrimary() throws Exception {
+        for (String cacheName : new String[] {TX_CACHE1, TX_CACHE2}) {
+            for (int nodeIdx = SRVS; nodeIdx <= SRVS + 1; nodeIdx++)
+                getReadCommittedStopPrimary(ignite(nodeIdx), cacheName, PESSIMISTIC);
+        }
+    }
+
+    /**
+     * Runs the {@code READ_COMMITTED} scenario with {@code OPTIMISTIC} concurrency for both transactional
+     * caches, from each of the two client nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReadCommittedOptimisticStopPrimary() throws Exception {
+        for (String cacheName : new String[] {TX_CACHE1, TX_CACHE2}) {
+            for (int nodeIdx = SRVS; nodeIdx <= SRVS + 1; nodeIdx++)
+                getReadCommittedStopPrimary(ignite(nodeIdx), cacheName, OPTIMISTIC);
+        }
+    }
+
+    /**
+     * Starts an extra server node and, inside a {@code READ_COMMITTED} transaction that has written to
+     * {@link #TX_CACHE1}, keeps reading a key obtained via {@code primaryKey(...)} for the new node while
+     * that node is being stopped concurrently.
+     *
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     * @param concurrency Transaction concurrency.
+     * @throws Exception If failed.
+     */
+    private void getReadCommittedStopPrimary(Ignite ignite,
+        String cacheName,
+        TransactionConcurrency concurrency) throws Exception {
+        IgniteCache<Integer, Integer> txCache = ignite.cache(TX_CACHE1);
+
+        IgniteCache<Integer, Integer> getCache = ignite.cache(cacheName);
+
+        final int NEW_NODE = SRVS + CLIENTS;
+
+        Ignite srv = startGrid(NEW_NODE);
+
+        try {
+            Integer key = primaryKey(srv.cache(cacheName));
+
+            Integer txKey = nearKey(srv.cache(cacheName));
+
+            srv.cache(cacheName).put(key, 1);
+
+            IgniteInternalFuture<?> stopFut = stopGridAsyncAfterDelay(NEW_NODE);
+
+            try (Transaction tx = ignite.transactions().txStart(concurrency, READ_COMMITTED)) {
+                txCache.put(txKey, 1);
+
+                // Keep reading until the node stop has completed; a hang here is the failure under test.
+                while (!stopFut.isDone())
+                    assertEquals(1, (Object)getCache.get(key));
+
+                tx.commit();
+            }
+
+            // The future is already done here; get() only rethrows a failure of the stop thread, if any.
+            stopFut.get();
+        }
+        finally {
+            stopGrid(NEW_NODE);
+        }
+    }
+
+    /**
+     * Starts an extra server node and, while holding an explicit lock obtained from {@link #TX_CACHE1},
+     * keeps reading a key obtained via {@code primaryKey(...)} for the new node while that node is being
+     * stopped concurrently.
+     *
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    private void getInsideLockStopPrimary(Ignite ignite, String cacheName) throws Exception {
+        IgniteCache<Integer, Integer> lockCache = ignite.cache(TX_CACHE1);
+
+        IgniteCache<Integer, Integer> getCache = ignite.cache(cacheName);
+
+        final int NEW_NODE = SRVS + CLIENTS;
+
+        Ignite srv = startGrid(NEW_NODE);
+
+        try {
+            Integer key = primaryKey(srv.cache(cacheName));
+
+            getCache.put(key, 1);
+
+            IgniteInternalFuture<?> stopFut = stopGridAsyncAfterDelay(NEW_NODE);
+
+            // The locked key differs from the key that is read.
+            Lock lock = lockCache.lock(key + 1);
+
+            lock.lock();
+
+            try {
+                // Keep reading until the node stop has completed; a hang here is the failure under test.
+                while (!stopFut.isDone())
+                    assertEquals(1, (Object)getCache.get(key));
+            }
+            finally {
+                lock.unlock();
+            }
+
+            // The future is already done here; get() only rethrows a failure of the stop thread, if any.
+            stopFut.get();
+        }
+        finally {
+            stopGrid(NEW_NODE);
+        }
+    }
+
+    /**
+     * Starts an extra server node and, inside a {@code PESSIMISTIC}/{@code REPEATABLE_READ} transaction
+     * that has read from {@link #TX_CACHE1}, keeps reading a key obtained via {@code primaryKey(...)} for
+     * the new node while that node is being stopped concurrently.
+     *
+     * @param ignite Node.
+     * @param cacheName Cache name.
+     * @throws Exception If failed.
+     */
+    private void getInsideTxStopPrimary(Ignite ignite, String cacheName) throws Exception {
+        IgniteCache<Integer, Integer> txCache = ignite.cache(TX_CACHE1);
+
+        IgniteCache<Integer, Integer> getCache = ignite.cache(cacheName);
+
+        final int NEW_NODE = SRVS + CLIENTS;
+
+        Ignite srv = startGrid(NEW_NODE);
+
+        try {
+            Integer key = primaryKey(srv.cache(cacheName));
+
+            getCache.put(key, 1);
+
+            IgniteInternalFuture<?> stopFut = stopGridAsyncAfterDelay(NEW_NODE);
+
+            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                // The key read through the transactional cache differs from the key read in the loop.
+                txCache.get(key + 1);
+
+                // Keep reading until the node stop has completed; a hang here is the failure under test.
+                while (!stopFut.isDone())
+                    assertEquals(1, (Object)getCache.get(key));
+
+                tx.commit();
+            }
+
+            // The future is already done here; get() only rethrows a failure of the stop thread, if any.
+            stopFut.get();
+        }
+        finally {
+            stopGrid(NEW_NODE);
+        }
+    }
+
+    /**
+     * Asynchronously stops the node with the given index after a 500 ms delay.
+     *
+     * @param idx Index of the node to stop.
+     * @return Future that completes once the node has been stopped (or the stop attempt has failed).
+     * @throws Exception If the asynchronous task could not be started.
+     */
+    private IgniteInternalFuture<?> stopGridAsyncAfterDelay(final int idx) throws Exception {
+        return GridTestUtils.runAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                // Give the caller time to acquire its lock / start its transaction and begin reading.
+                U.sleep(500);
+
+                log.info("Stop node.");
+
+                stopGrid(idx);
+
+                log.info("Node stopped.");
+
+                return null;
+            }
+        }, "stop-thread");
+    }
+
+    /**
+     * Runs 10 threads for 60 seconds, spread over the permanent nodes. Each thread repeatedly performs
+     * {@code get}/{@code getAll} calls on all three caches, first under an explicit lock and then inside a
+     * {@code PESSIMISTIC}/{@code READ_COMMITTED} transaction, while two background threads keep
+     * restarting two additional nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testMultithreaded() throws Exception {
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        final int NEW_NODE = SRVS + CLIENTS;
+
+        final AtomicInteger stopIdx = new AtomicInteger();
+
+        // Each restart thread owns one extra node index and restarts it until told to finish.
+        IgniteInternalFuture<?> restartFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                int idx = stopIdx.getAndIncrement();
+
+                int node = NEW_NODE + idx;
+
+                while (!finished.get()) {
+                    log.info("Start node: " + node);
+
+                    startGrid(node);
+
+                    U.sleep(300);
+
+                    log.info("Stop node: " + node);
+
+                    stopGrid(node);
+                }
+
+                return null;
+            }
+        }, 2, "stop-thread");
+
+        try {
+            final long stopTime = System.currentTimeMillis() + 60_000;
+
+            final AtomicInteger idx = new AtomicInteger();
+
+            final int KEYS = 100_000;
+
+            GridTestUtils.runMultiThreaded(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int node = idx.getAndIncrement() % (SRVS + CLIENTS);
+
+                    Ignite ignite = ignite(node);
+
+                    IgniteCache<Integer, Integer> txCache1 = ignite.cache(TX_CACHE1);
+                    IgniteCache<Integer, Integer> txCache2 = ignite.cache(TX_CACHE2);
+                    IgniteCache<Integer, Integer> atomicCache = ignite.cache(ATOMIC_CACHE);
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    // Use the same clock that was used to compute the deadline.
+                    while (System.currentTimeMillis() < stopTime) {
+                        // Locked/written keys come from [KEYS, KEYS + 1000); reads use [0, KEYS).
+                        Integer lockKey = rnd.nextInt(KEYS, KEYS + 1000);
+
+                        Lock lock = txCache1.lock(lockKey);
+
+                        try {
+                            lock.lock();
+
+                            try {
+                                executeGet(txCache1);
+
+                                executeGet(txCache2);
+
+                                executeGet(atomicCache);
+                            }
+                            finally {
+                                lock.unlock();
+                            }
+
+                            try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
+                                txCache1.put(lockKey, lockKey);
+
+                                executeGet(txCache1);
+
+                                executeGet(txCache2);
+
+                                executeGet(atomicCache);
+
+                                tx.commit();
+                            }
+                        }
+                        catch (IgniteException | CacheException e) {
+                            // These exceptions are logged and the loop simply moves on to the next iteration.
+                            log.info("Error: " + e);
+                        }
+                    }
+
+                    return null;
+                }
+
+                /**
+                 * Performs 100 single-key gets followed by batched {@code getAll} calls on random keys.
+                 *
+                 * @param cache Cache to read from.
+                 */
+                private void executeGet(IgniteCache<Integer, Integer> cache) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    for (int i = 0; i < 100; i++)
+                        cache.get(rnd.nextInt(KEYS));
+
+                    Set<Integer> keys = new HashSet<>();
+
+                    for (int i = 0; i < 100; i++) {
+                        keys.add(rnd.nextInt(KEYS));
+
+                        // Flush every 20 distinct keys.
+                        if (keys.size() == 20) {
+                            cache.getAll(keys);
+
+                            keys.clear();
+                        }
+                    }
+
+                    // Remaining keys (possibly none).
+                    cache.getAll(keys);
+                }
+            }, 10, "test-thread");
+        }
+        finally {
+            // Always release the restart threads and wait for them, even when the workers failed,
+            // so that this method does not return while nodes are still being started or stopped.
+            finished.set(true);
+
+            restartFut.get();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/96dc238f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
index 4b04c05..85b6e93 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite3.java
@@ -18,6 +18,7 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CacheGetInsideLockChangingTopologyTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryAtomicSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCachePutRetryTransactionalSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCachePutRetryAtomicPrimaryWriteOrderSelfTest;
@@ -36,6 +37,7 @@ public class IgniteCacheFailoverTestSuite3 extends TestSuite {
suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class);
suite.addTestSuite(IgniteCachePutRetryAtomicPrimaryWriteOrderSelfTest.class);
suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class);
+ suite.addTestSuite(CacheGetInsideLockChangingTopologyTest.class);
return suite;
}