You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/07/03 05:27:01 UTC
[02/32] incubator-ignite git commit: IGNITE-621 - Added automatic
retries for atomics.
IGNITE-621 - Added automatic retries for atomics.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5505b4d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5505b4d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5505b4d3
Branch: refs/heads/ignite-gg-10416
Commit: 5505b4d3d50caf41246d1194c52298a6df47a239
Parents: 3787a9d
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Sun Jun 21 13:09:24 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Sun Jun 21 13:09:24 2015 -0700
----------------------------------------------------------------------
.../processors/cache/GridCacheUtils.java | 42 ++++++++++++++++++++
.../datastructures/GridCacheAtomicLongImpl.java | 25 ++++++------
.../GridCacheAtomicSequenceImpl.java | 11 ++---
.../GridCacheAtomicStampedImpl.java | 21 +++++-----
.../GridCacheCountDownLatchImpl.java | 16 +++-----
.../IgniteCachePutRetryAbstractSelfTest.java | 13 ++++++
...gniteCachePutRetryTransactionalSelfTest.java | 39 ++++++++++++++++++
.../IgniteCacheFailoverTestSuite.java | 3 ++
8 files changed, 131 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 8c26046..f88e288 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.distributed.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
+import org.apache.ignite.internal.transactions.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.T2;
@@ -1699,4 +1700,45 @@ public class GridCacheUtils {
ctx.resource().cleanupGeneric(lsnr);
}
}
+
+    /**
+     * Wraps a closure so that a failed invocation is repeated when the failure is retryable.
+     * A failure is retried when {@code X.hasCause} finds {@link ClusterTopologyCheckedException},
+     * {@link IgniteTxRollbackCheckedException} or {@link CachePartialUpdateCheckedException} in it.
+     * Any other failure, and the failure of the last allowed attempt, is rethrown unchanged.
+     * Only {@link IgniteCheckedException}s are inspected; other exception types propagate at once.
+     * <p>
+     * The wrapped closure is always invoked at least once, and at most
+     * {@code GridCacheAdapter.MAX_RETRIES} times when that limit is positive.
+     *
+     * @param c Closure to retry.
+     * @param <S> Closure return type.
+     * @return Wrapped closure.
+     */
+    public static <S> Callable<S> retryTopologySafe(final Callable<S> c) {
+        return new Callable<S>() {
+            @Override public S call() throws Exception {
+                int retries = GridCacheAdapter.MAX_RETRIES;
+
+                // Open-ended loop header: the body always leaves through a return or a throw.
+                for (int i = 0; ; i++) {
+                    try {
+                        return c.call();
+                    }
+                    catch (IgniteCheckedException e) {
+                        boolean retryable = X.hasCause(e, ClusterTopologyCheckedException.class) ||
+                            X.hasCause(e, IgniteTxRollbackCheckedException.class) ||
+                            X.hasCause(e, CachePartialUpdateCheckedException.class);
+
+                        // Not retryable, or this was the last allowed attempt: propagate as is.
+                        if (!retryable || i >= retries - 1)
+                            throw e;
+
+                        // Brief pause before the next attempt.
+                        U.sleep(1);
+                    }
+                }
+            }
+        };
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index b18d35a..5e9245d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;
+import static org.apache.ignite.internal.util.typedef.internal.CU.*;
/**
* Cache atomic long implementation.
@@ -78,7 +79,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
};
/** Callable for {@link #incrementAndGet()}. */
- private final Callable<Long> incAndGetCall = new Callable<Long>() {
+ private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
@@ -102,10 +103,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
throw e;
}
}
- };
+ });
/** Callable for {@link #getAndIncrement()}. */
- private final Callable<Long> getAndIncCall = new Callable<Long>() {
+ private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
@@ -129,10 +130,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
throw e;
}
}
- };
+ });
/** Callable for {@link #decrementAndGet()}. */
- private final Callable<Long> decAndGetCall = new Callable<Long>() {
+ private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
@@ -156,10 +157,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
throw e;
}
}
- };
+ });
/** Callable for {@link #getAndDecrement()}. */
- private final Callable<Long> getAndDecCall = new Callable<Long>() {
+ private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
@@ -183,7 +184,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
throw e;
}
}
- };
+ });
/**
* Empty constructor required by {@link Externalizable}.
@@ -378,7 +379,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
* @return Callable for execution in async and sync mode.
*/
private Callable<Long> internalAddAndGet(final long l) {
- return new Callable<Long>() {
+ return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
@@ -402,7 +403,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
throw e;
}
}
- };
+ });
}
/**
@@ -412,7 +413,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
* @return Callable for execution in async and sync mode.
*/
private Callable<Long> internalGetAndAdd(final long l) {
- return new Callable<Long>() {
+ return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicLongValue val = atomicView.get(key);
@@ -436,7 +437,7 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
throw e;
}
}
- };
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index e66c11e..2400a7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;
+import static org.apache.ignite.internal.util.typedef.internal.CU.*;
/**
* Cache sequence implementation.
@@ -435,11 +436,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
*/
@SuppressWarnings("TooBroadScope")
private Callable<Long> internalUpdate(final long l, final boolean updated) {
- return new Callable<Long>() {
+ return retryTopologySafe(new Callable<Long>() {
@Override public Long call() throws Exception {
- IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ);
-
- try {
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seq = seqView.get(key);
checkRemoved();
@@ -506,11 +505,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
U.error(log, "Failed to get and add: " + this, e);
throw e;
- } finally {
- tx.close();
}
}
- };
+ });
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index a898e58..76ea7ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;
+import static org.apache.ignite.internal.util.typedef.internal.CU.*;
/**
* Cache atomic stamped implementation.
@@ -68,7 +69,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
private GridCacheContext ctx;
/** Callable for {@link #get()} operation */
- private final Callable<IgniteBiTuple<T, S>> getCall = new Callable<IgniteBiTuple<T, S>>() {
+ private final Callable<IgniteBiTuple<T, S>> getCall = retryTopologySafe(new Callable<IgniteBiTuple<T, S>>() {
@Override public IgniteBiTuple<T, S> call() throws Exception {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
@@ -77,10 +78,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
return stmp.get();
}
- };
+ });
/** Callable for {@link #value()} operation */
- private final Callable<T> valCall = new Callable<T>() {
+ private final Callable<T> valCall = retryTopologySafe(new Callable<T>() {
@Override public T call() throws Exception {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
@@ -89,10 +90,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
return stmp.value();
}
- };
+ });
/** Callable for {@link #stamp()} operation */
- private final Callable<S> stampCall = new Callable<S>() {
+ private final Callable<S> stampCall = retryTopologySafe(new Callable<S>() {
@Override public S call() throws Exception {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
@@ -101,7 +102,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
return stmp.stamp();
}
- };
+ });
/**
* Empty constructor required by {@link Externalizable}.
@@ -254,7 +255,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
* @return Callable for execution in async and sync mode.
*/
private Callable<Boolean> internalSet(final T val, final S stamp) {
- return new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
@@ -276,7 +277,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
throw e;
}
}
- };
+ });
}
/**
@@ -292,7 +293,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
final IgniteClosure<T, T> newValClos, final IgnitePredicate<S> expStampPred,
final IgniteClosure<S, S> newStampClos) {
- return new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
@@ -323,7 +324,7 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
throw e;
}
}
- };
+ });
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 33547d9..ea7924f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.*;
import static org.apache.ignite.transactions.TransactionConcurrency.*;
import static org.apache.ignite.transactions.TransactionIsolation.*;
+import static org.apache.ignite.internal.util.typedef.internal.CU.*;
/**
* Cache count down latch implementation.
@@ -179,12 +180,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public int countDown() {
- try {
- return CU.outTx(new CountDownCallable(1), ctx);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
+ return countDown(1);
}
/** {@inheritDoc} */
@@ -192,7 +188,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
A.ensure(val > 0, "val should be positive");
try {
- return CU.outTx(new CountDownCallable(val), ctx);
+ return CU.outTx(retryTopologySafe(new CountDownCallable(val)), ctx);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -202,7 +198,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc}*/
@Override public void countDownAll() {
try {
- CU.outTx(new CountDownCallable(0), ctx);
+ CU.outTx(retryTopologySafe(new CountDownCallable(0)), ctx);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -248,7 +244,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
if (initGuard.compareAndSet(false, true)) {
try {
internalLatch = CU.outTx(
- new Callable<CountDownLatch>() {
+ retryTopologySafe(new Callable<CountDownLatch>() {
@Override public CountDownLatch call() throws Exception {
try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheCountDownLatchValue val = latchView.get(key);
@@ -267,7 +263,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
return new CountDownLatch(val.get());
}
}
- },
+ }),
ctx
);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 89d1040..bfddbe7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -52,6 +52,19 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstr
return cfg;
}
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        // Atomic configuration with a single backup.
+        AtomicConfiguration atomicCfg = new AtomicConfiguration();
+        atomicCfg.setBackups(1);
+
+        igniteCfg.setAtomicConfiguration(atomicCfg);
+
+        return igniteCfg;
+    }
+
/**
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index e65459a..91c454a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -17,7 +17,14 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
/**
*
@@ -32,4 +39,36 @@ public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetr
@Override protected int keysCount() {
return 20_000;
}
+
+    /**
+     * Increments an atomic long {@code keysCount()} times while node 3 is restarted concurrently.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicLongRetries() throws Exception {
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        IgniteAtomicLong atomic = ignite(0).atomicLong("TestAtomic", 0, true);
+
+        IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                while (!finished.get()) {
+                    stopGrid(3);
+                    U.sleep(300);
+                    startGrid(3);
+                }
+                return null;
+            }
+        });
+
+        try {
+            for (int i = 0, keysCnt = keysCount(); i < keysCnt; i++)
+                atomic.incrementAndGet();
+        }
+        finally {
+            // Always stop and join the restarter, even when an increment fails.
+            finished.set(true);
+            fut.get();
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5505b4d3/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index dda86c1..80bfbf2 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -72,6 +72,9 @@ public class IgniteCacheFailoverTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheTxNearDisabledPutGetRestartTest.class);
suite.addTestSuite(IgniteCacheTxNearDisabledFairAffinityPutGetRestartTest.class);
+ suite.addTestSuite(IgniteCachePutRetryAtomicSelfTest.class);
+ suite.addTestSuite(IgniteCachePutRetryTransactionalSelfTest.class);
+
return suite;
}
}