You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/21 10:13:20 UTC
[14/34] incubator-ignite git commit: IGNITE-1265 - Added ready future
to ClusterTopologyException,
added test for correct explicit transaction retries.
IGNITE-1265 - Added ready future to ClusterTopologyException, added test for correct explicit transaction retries.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8f1c1c0d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8f1c1c0d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8f1c1c0d
Branch: refs/heads/master
Commit: 8f1c1c0d7406f9b43681a6da69560c8ba6bbb737
Parents: 8ced207
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Wed Aug 19 22:55:48 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Wed Aug 19 22:55:48 2015 -0700
----------------------------------------------------------------------
.../cluster/ClusterTopologyException.java | 18 ++
.../ClusterTopologyCheckedException.java | 18 ++
.../cache/GridCacheSharedContext.java | 15 ++
.../colocated/GridDhtColocatedLockFuture.java | 8 +-
.../distributed/near/GridNearLockFuture.java | 8 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 7 +-
.../GridNearPessimisticTxPrepareFuture.java | 7 +-
.../ignite/internal/util/IgniteUtils.java | 9 +-
.../IgniteCachePutRetryAbstractSelfTest.java | 1 +
...gniteCachePutRetryTransactionalSelfTest.java | 179 +++++++++++++++++++
10 files changed, 263 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
index d28c409..61bc367 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterTopologyException.java
@@ -18,6 +18,7 @@
package org.apache.ignite.cluster;
import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
/**
@@ -27,6 +28,9 @@ public class ClusterTopologyException extends IgniteException {
/** */
private static final long serialVersionUID = 0L;
+ /** Retry ready future. */
+ private transient IgniteFuture<?> readyFut;
+
/**
* Creates new topology exception with given error message.
*
@@ -46,4 +50,18 @@ public class ClusterTopologyException extends IgniteException {
public ClusterTopologyException(String msg, @Nullable Throwable cause) {
super(msg, cause);
}
+
+ /**
+ * @return Retry ready future.
+ */
+ public IgniteFuture<?> retryReadyFuture() {
+ return readyFut;
+ }
+
+ /**
+ * @param readyFut Retry ready future.
+ */
+ public void retryReadyFuture(IgniteFuture<?> readyFut) {
+ this.readyFut = readyFut;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
index 8f985b4..2d7b0de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterTopologyCheckedException.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.cluster;
import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
import org.jetbrains.annotations.*;
/**
@@ -27,6 +28,9 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException {
/** */
private static final long serialVersionUID = 0L;
+ /** Next topology version to wait. */
+ private transient IgniteInternalFuture<?> readyFut;
+
/**
* Creates new topology exception with given error message.
*
@@ -46,4 +50,18 @@ public class ClusterTopologyCheckedException extends IgniteCheckedException {
public ClusterTopologyCheckedException(String msg, @Nullable Throwable cause) {
super(msg, cause);
}
+
+ /**
+ * @return Retry ready future.
+ */
+ public IgniteInternalFuture<?> retryReadyFuture() {
+ return readyFut;
+ }
+
+ /**
+ * @param readyFut Retry ready future.
+ */
+ public void retryReadyFuture(IgniteInternalFuture<?> readyFut) {
+ this.readyFut = readyFut;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 7f4daff..f7763ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -444,6 +444,21 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * Gets ready future for the next affinity topology version (used in cases when a node leaves grid).
+ *
+ * @param curVer Current topology version (before a node left grid).
+ * @return Ready future.
+ */
+ public IgniteInternalFuture<?> nextAffinityReadyFuture(AffinityTopologyVersion curVer) {
+ if (curVer == null)
+ return null;
+
+ AffinityTopologyVersion nextVer = new AffinityTopologyVersion(curVer.topologyVersion() + 1);
+
+ return exchMgr.affinityReadyFuture(nextVer);
+ }
+
+ /**
* @param tx Transaction to check.
* @param activeCacheIds Active cache IDs.
* @param cacheCtx Cache context.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index c784948..90ca8df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -1125,8 +1125,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
* @return Topology exception with user-friendly message.
*/
private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
- return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " +
- "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+ ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
+ "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+
+ topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+
+ return topEx;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index b7e0d73..2815194 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -1232,8 +1232,12 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
* @return Topology exception with user-friendly message.
*/
private ClusterTopologyCheckedException newTopologyException(@Nullable Throwable nested, UUID nodeId) {
- return new ClusterTopologyCheckedException("Failed to acquire lock for keys (primary node left grid, " +
- "retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+ ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to acquire lock for keys " +
+ "(primary node left grid, retry transaction if possible) [keys=" + keys + ", node=" + nodeId + ']', nested);
+
+ topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer.get()));
+
+ return topEx;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 2b86672..4bb4c67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -100,7 +100,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd
MiniFuture f = (MiniFuture) fut;
if (f.node().id().equals(nodeId)) {
- f.onResult(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+ nodeId);
+
+ e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+ f.onResult(e);
found = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 7006114..3d43797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -72,7 +72,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
MiniFuture f = (MiniFuture)fut;
if (f.node().id().equals(nodeId)) {
- f.onNodeLeft(new ClusterTopologyCheckedException("Remote node left grid: " + nodeId));
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " +
+ nodeId);
+
+ e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion()));
+
+ f.onNodeLeft(e);
found = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 3366256..e259084 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.mxbean.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.transactions.*;
+import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.io.*;
import org.apache.ignite.internal.util.ipc.shmem.*;
import org.apache.ignite.internal.util.lang.*;
@@ -580,7 +581,13 @@ public abstract class IgniteUtils {
m.put(ClusterTopologyCheckedException.class, new C1<IgniteCheckedException, IgniteException>() {
@Override public IgniteException apply(IgniteCheckedException e) {
- return new ClusterTopologyException(e.getMessage(), e);
+ ClusterTopologyException topEx = new ClusterTopologyException(e.getMessage(), e);
+
+ ClusterTopologyCheckedException checked = (ClusterTopologyCheckedException)e;
+
+ topEx.retryReadyFuture(new IgniteFutureImpl<>(checked.retryReadyFuture()));
+
+ return topEx;
}
});
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 9abc5c8..6624f8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -54,6 +54,7 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
cfg.setAtomicWriteOrderMode(writeOrderMode());
cfg.setBackups(1);
+ cfg.setRebalanceMode(CacheRebalanceMode.SYNC);
return cfg;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8f1c1c0d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 91c454a..9a6bb31 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -19,17 +19,29 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
/**
*
*/
public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+ /** */
+ private static final int FACTOR = 1000;
+
/** {@inheritDoc} */
@Override protected CacheAtomicityMode atomicityMode() {
return CacheAtomicityMode.TRANSACTIONAL;
@@ -71,4 +83,171 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
finished.set(true);
fut.get();
}
+
+ /** {@inheritDoc} */
+ public void testExplicitTransactionRetries() throws Exception {
+ final AtomicInteger idx = new AtomicInteger();
+ int threads = 8;
+
+ final AtomicReferenceArray<Exception> err = new AtomicReferenceArray<>(threads);
+
+ IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ int th = idx.getAndIncrement();
+ int base = th * FACTOR;
+
+ Ignite ignite = ignite(0);
+ final IgniteCache<Object, Object> cache = ignite.cache(null);
+
+ try {
+ for (int i = 0; i < FACTOR; i++) {
+ doInTransaction(ignite, new ProcessCallable(cache, base, i));
+
+ if (i > 0 && i % 500 == 0)
+ info("Done: " + i);
+ }
+ }
+ catch (Exception e) {
+ err.set(th, e);
+ }
+
+ return null;
+ }
+ }, threads, "tx-runner");
+
+ while (!fut.isDone()) {
+ int stopIdx = ThreadLocalRandom.current().nextInt(2, 4); // Random in [2, 3].
+
+ stopGrid(stopIdx);
+
+ startGrid(stopIdx);
+ }
+
+ for (int i = 0; i < threads; i++) {
+ Exception error = err.get(i);
+
+ if (error != null)
+ throw error;
+ }
+
+ // Verify contents of the cache.
+ for (int g = 0; g < gridCount(); g++) {
+ IgniteCache<Object, Object> cache = ignite(g).cache(null);
+
+ for (int th = 0; th < threads; th++) {
+ int base = th * FACTOR;
+
+ String key = "key-" + base;
+
+ Set<String> set = (Set<String>)cache.get(key);
+
+ assertNotNull("Missing set for key: " + key, set);
+ assertEquals(FACTOR, set.size());
+
+ for (int i = 0; i < FACTOR; i++) {
+ assertEquals("value-" + i, cache.get("key-" + base + "-" + i));
+ assertTrue(set.contains("value-" + i));
+ }
+ }
+ }
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param clo Closure.
+ * @return Result of closure execution.
+ * @throws Exception
+ */
+ private <T> T doInTransaction(Ignite ignite, Callable<T> clo) throws Exception {
+ while (true) {
+ try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ T res = clo.call();
+
+ tx.commit();
+
+ return res;
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof ClusterTopologyException) {
+ ClusterTopologyException topEx = (ClusterTopologyException)e.getCause();
+
+ topEx.retryReadyFuture().get();
+ }
+ else
+ throw e;
+ }
+ catch (ClusterTopologyException e) {
+ IgniteFuture<?> fut = e.retryReadyFuture();
+
+ fut.get();
+ }
+ catch (TransactionRollbackException ignore) {
+ // Safe to retry right away.
+ }
+ }
+ }
+
+ /**
+ * Callable to process inside transaction.
+ */
+ private static class ProcessCallable implements Callable<Void> {
+ /** */
+ private IgniteCache cache;
+
+ /** */
+ private int base;
+
+ /** */
+ private int i;
+
+ /**
+ * @param cache Cache.
+ * @param base Base index.
+ * @param i Iteration index.
+ */
+ private ProcessCallable(IgniteCache<Object, Object> cache, int base, int i) {
+ this.cache = cache;
+ this.base = base;
+ this.i = i;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void call() throws Exception {
+ ((IgniteCache<String, String>)cache).put("key-" + base + "-" + i, "value-" + i);
+
+ ((IgniteCache<String, Set<String>>)cache).invoke("key-" + base, new AddEntryProcessor("value-" + i));
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class AddEntryProcessor implements CacheEntryProcessor<String, Set<String>, Void> {
+ /** */
+ private String addVal;
+
+ /**
+ * @param addVal Value to add.
+ */
+ private AddEntryProcessor(String addVal) {
+ this.addVal = addVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<String, Set<String>> entry, Object... arguments) throws EntryProcessorException {
+ Set<String> set = entry.getValue();
+
+ if (set == null)
+ set = new HashSet<>();
+
+ set.add(addVal);
+
+ entry.setValue(set);
+
+ return null;
+ }
+ }
}