You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/07/06 04:30:27 UTC
[05/50] incubator-ignite git commit: IGNITE-621 - Added automatic retries.
IGNITE-621 - Added automatic retries.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3787a9d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3787a9d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3787a9d3
Branch: refs/heads/ignite-959-x
Commit: 3787a9d3353c0c146141a79e3e87e1bbc5128031
Parents: 415264e
Author: Alexey Goncharuk <ag...@gridgain.com>
Authored: Fri Jun 19 17:15:02 2015 -0700
Committer: Alexey Goncharuk <ag...@gridgain.com>
Committed: Fri Jun 19 17:15:02 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 5 +
.../processors/cache/CacheOperationContext.java | 44 +++++-
.../processors/cache/GridCacheAdapter.java | 91 +++++++------
.../processors/cache/GridCacheProxyImpl.java | 10 +-
.../processors/cache/IgniteCacheProxy.java | 36 ++++-
.../dht/atomic/GridDhtAtomicCache.java | 18 ++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 87 ++++++++++--
.../IgniteCachePutRetryAbstractSelfTest.java | 134 +++++++++++++++++++
.../dht/IgniteCachePutRetryAtomicSelfTest.java | 34 +++++
...gniteCachePutRetryTransactionalSelfTest.java | 35 +++++
10 files changed, 422 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index 2b97e55..c8d6d7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -106,6 +106,11 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS
public IgniteCache<K, V> withSkipStore();
/**
+ * @return Cache with no-retries behavior enabled.
+ */
+ public IgniteCache<K, V> withNoRetries();
+
+ /**
* Executes {@link #localLoadCache(IgniteBiPredicate, Object...)} on all cache nodes.
*
* @param p Optional predicate (may be {@code null}). If provided, will be used to
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
index 34d2bf4..343a2f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationContext.java
@@ -36,6 +36,10 @@ public class CacheOperationContext implements Serializable {
@GridToStringInclude
private final boolean skipStore;
+ /** No retries flag. */
+ @GridToStringInclude
+ private final boolean noRetries;
+
/** Client ID which operates over this projection. */
private final UUID subjId;
@@ -56,6 +60,8 @@ public class CacheOperationContext implements Serializable {
keepPortable = false;
expiryPlc = null;
+
+ noRetries = false;
}
/**
@@ -68,7 +74,8 @@ public class CacheOperationContext implements Serializable {
boolean skipStore,
@Nullable UUID subjId,
boolean keepPortable,
- @Nullable ExpiryPolicy expiryPlc) {
+ @Nullable ExpiryPolicy expiryPlc,
+ boolean noRetries) {
this.skipStore = skipStore;
this.subjId = subjId;
@@ -76,6 +83,8 @@ public class CacheOperationContext implements Serializable {
this.keepPortable = keepPortable;
this.expiryPlc = expiryPlc;
+
+ this.noRetries = noRetries;
}
/**
@@ -95,7 +104,8 @@ public class CacheOperationContext implements Serializable {
skipStore,
subjId,
true,
- expiryPlc);
+ expiryPlc,
+ noRetries);
}
/**
@@ -118,7 +128,8 @@ public class CacheOperationContext implements Serializable {
skipStore,
subjId,
keepPortable,
- expiryPlc);
+ expiryPlc,
+ noRetries);
}
/**
@@ -139,7 +150,8 @@ public class CacheOperationContext implements Serializable {
skipStore,
subjId,
keepPortable,
- expiryPlc);
+ expiryPlc,
+ noRetries);
}
/**
@@ -160,7 +172,29 @@ public class CacheOperationContext implements Serializable {
skipStore,
subjId,
true,
- plc);
+ plc,
+ noRetries);
+ }
+
+ /**
+ * @param noRetries No retries flag.
+ * @return Operation context.
+ */
+ public CacheOperationContext setNoRetries(boolean noRetries) {
+ return new CacheOperationContext(
+ skipStore,
+ subjId,
+ keepPortable,
+ expiryPlc,
+ noRetries
+ );
+ }
+
+ /**
+ * @return No retries flag.
+ */
+ public boolean noRetries() {
+ return noRetries;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7335d72..f993527 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -78,6 +78,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** clearLocally() split threshold. */
public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000;
+ /** Maximum number of retries when topology changes. */
+ public static final int MAX_RETRIES = 100;
+
/** Deserialization stash. */
private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String,
String>>() {
@@ -363,7 +366,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
- CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null);
+ CacheOperationContext opCtx = new CacheOperationContext(false, subjId, false, null, false);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
@@ -375,14 +378,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) {
- CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null);
+ CacheOperationContext opCtx = new CacheOperationContext(true, null, false, null, false);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
/** {@inheritDoc} */
@Override public <K1, V1> GridCacheProxyImpl<K1, V1> keepPortable() {
- CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null);
+ CacheOperationContext opCtx = new CacheOperationContext(false, null, true, null, false);
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx);
}
@@ -399,7 +402,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
assert !CU.isAtomicsCache(ctx.name());
assert !CU.isMarshallerCache(ctx.name());
- CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc);
+ CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc, false);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
}
@@ -2301,7 +2304,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Putx operation future.
*/
public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
- @Nullable final CacheEntryPredicate... filter) {
+ @Nullable final CacheEntryPredicate... filter) {
A.notNull(key, "key", val, "val");
if (keyCheck)
@@ -3930,51 +3933,63 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (tx == null || tx.implicit()) {
TransactionConfiguration tCfg = ctx.gridConfig().getTransactionConfiguration();
- tx = ctx.tm().newTx(
- true,
- op.single(),
- ctx.systemTx() ? ctx : null,
- OPTIMISTIC,
- READ_COMMITTED,
- tCfg.getDefaultTxTimeout(),
- !ctx.skipStore(),
- 0
- );
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
- assert tx != null;
+ int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES;
- try {
- T t = op.op(tx);
+ for (int i = 0; i < retries; i++) {
+ tx = ctx.tm().newTx(
+ true,
+ op.single(),
+ ctx.systemTx() ? ctx : null,
+ OPTIMISTIC,
+ READ_COMMITTED,
+ tCfg.getDefaultTxTimeout(),
+ !ctx.skipStore(),
+ 0
+ );
- assert tx.done() : "Transaction is not done: " + tx;
+ assert tx != null;
- return t;
- }
- catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
- IgniteTxRollbackCheckedException e) {
- throw e;
- }
- catch (IgniteCheckedException e) {
try {
- tx.rollback();
+ T t = op.op(tx);
- e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
- tx.xid(), e);
+ assert tx.done() : "Transaction is not done: " + tx;
+
+ return t;
+ }
+ catch (IgniteInterruptedCheckedException | IgniteTxHeuristicCheckedException |
+ IgniteTxRollbackCheckedException e) {
+ throw e;
}
- catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
- U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+ catch (IgniteCheckedException e) {
+ try {
+ tx.rollback();
+
+ e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " +
+ tx.xid(), e);
+ }
+ catch (IgniteCheckedException | AssertionError | RuntimeException e1) {
+ U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + tx, e1);
+
+ U.addLastCause(e, e1, log);
+ }
+
+ if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1)
+ continue;
- U.addLastCause(e, e1, log);
+ throw e;
}
+ finally {
+ ctx.tm().resetContext();
- throw e;
+ if (ctx.isNear())
+ ctx.near().dht().context().tm().resetContext();
+ }
}
- finally {
- ctx.tm().resetContext();
- if (ctx.isNear())
- ctx.near().dht().context().tm().resetContext();
- }
+ // Should not happen.
+ throw new IgniteCheckedException("Failed to perform cache operation (maximum number of retries exceeded).");
}
else
return op.op(tx);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 63ba242..cec8c53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -118,7 +118,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
CacheOperationContext prev = gate.enter(opCtx);
try {
- return opCtx != null ? opCtx.skipStore() : false;
+ return opCtx != null && opCtx.skipStore();
}
finally {
gate.leave(prev);
@@ -198,7 +198,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> forSubjectId(UUID subjId) {
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null));
+ opCtx != null ? opCtx.forSubjectId(subjId) : new CacheOperationContext(false, subjId, false, null, false));
}
/** {@inheritDoc} */
@@ -210,7 +210,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
return this;
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null));
+ opCtx != null ? opCtx.setSkipStore(skipStore) : new CacheOperationContext(true, null, false, null, false));
}
finally {
gate.leave(prev);
@@ -224,7 +224,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx,
(GridCacheAdapter<K1, V1>)delegate,
- opCtx != null ? opCtx.keepPortable() : new CacheOperationContext(false, null, true, null));
+ opCtx != null ? opCtx.keepPortable() : new CacheOperationContext(false, null, true, null, false));
}
/** {@inheritDoc} */
@@ -1515,7 +1515,7 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte
try {
return new GridCacheProxyImpl<>(ctx, delegate,
- opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc));
+ opCtx != null ? opCtx.withExpiryPolicy(plc) : new CacheOperationContext(false, null, false, plc, false));
}
finally {
gate.leave(prev);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 48fd259..0ad2a9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -246,7 +246,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
try {
CacheOperationContext prj0 = opCtx != null ? opCtx.withExpiryPolicy(plc) :
- new CacheOperationContext(false, null, false, plc);
+ new CacheOperationContext(false, null, false, plc, false);
return new IgniteCacheProxy<>(ctx, delegate, prj0, isAsync(), lock);
}
@@ -261,6 +261,30 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public IgniteCache<K, V> withNoRetries() {
+ CacheOperationContext prev = onEnter(opCtx);
+
+ try {
+ boolean noRetries = opCtx != null && opCtx.noRetries();
+
+ if (noRetries)
+ return this;
+
+ CacheOperationContext opCtx0 = opCtx != null ? opCtx.setNoRetries(true) :
+ new CacheOperationContext(false, null, false, null, true);
+
+ return new IgniteCacheProxy<>(ctx,
+ delegate,
+ opCtx0,
+ isAsync(),
+ lock);
+ }
+ finally {
+ onLeave(prev);
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
try {
CacheOperationContext prev = onEnter(opCtx);
@@ -1498,10 +1522,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
try {
CacheOperationContext opCtx0 =
new CacheOperationContext(
- opCtx != null ? opCtx.skipStore() : false,
+ opCtx != null && opCtx.skipStore(),
opCtx != null ? opCtx.subjectId() : null,
true,
- opCtx != null ? opCtx.expiry() : null);
+ opCtx != null ? opCtx.expiry() : null,
+ opCtx != null && opCtx.noRetries());
return new IgniteCacheProxy<>((GridCacheContext<K1, V1>)ctx,
(GridCacheAdapter<K1, V1>)delegate,
@@ -1529,8 +1554,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
CacheOperationContext opCtx0 =
new CacheOperationContext(true,
opCtx != null ? opCtx.subjectId() : null,
- opCtx != null ? opCtx.isKeepPortable() : false,
- opCtx != null ? opCtx.expiry() : null);
+ opCtx != null && opCtx.isKeepPortable(),
+ opCtx != null ? opCtx.expiry() : null,
+ opCtx != null && opCtx.noRetries());
return new IgniteCacheProxy<>(ctx,
delegate,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8630421..2863ae8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -767,11 +767,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
filter,
subjId,
taskNameHash,
- opCtx != null && opCtx.skipStore());
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@Override public IgniteInternalFuture<Object> apply() {
- updateFut.map(waitTopFut);
+ updateFut.map();
return updateFut;
}
@@ -830,14 +832,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
filter,
subjId,
taskNameHash,
- opCtx != null && opCtx.skipStore());
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ true);
if (statsEnabled)
updateFut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@Override public IgniteInternalFuture<Object> apply() {
- updateFut.map(true);
+ updateFut.map();
return updateFut;
}
@@ -2273,9 +2277,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.filter(),
req.subjectId(),
req.taskNameHash(),
- req.skipStore());
+ req.skipStore(),
+ MAX_RETRIES,
+ true);
- updateFut.map(true);
+ updateFut.map();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 07f5ecf..53150cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Mappings. */
@GridToStringInclude
- private final ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
+ private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
/** Error. */
private volatile CachePartialUpdateCheckedException err;
@@ -123,7 +123,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
private GridNearAtomicUpdateRequest singleReq;
/** Raw return value flag. */
- private boolean rawRetval;
+ private final boolean rawRetval;
/** Fast map flag. */
private final boolean fastMap;
@@ -149,6 +149,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Skip store flag. */
private final boolean skipStore;
+ /** Wait for topology future flag. */
+ private final boolean waitTopFut;
+
+ /** Remap count. */
+ private AtomicInteger remapCnt;
+
/**
* @param cctx Cache context.
* @param cache Cache instance.
@@ -183,7 +189,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
final CacheEntryPredicate[] filter,
UUID subjId,
int taskNameHash,
- boolean skipStore
+ boolean skipStore,
+ int remapCnt,
+ boolean waitTopFut
) {
this.rawRetval = rawRetval;
@@ -207,6 +215,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
this.subjId = subjId;
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
+ this.waitTopFut = waitTopFut;
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
@@ -218,6 +227,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
!(cctx.writeThrough() && cctx.config().getInterceptor() != null);
nearEnabled = CU.isNearEnabled(cctx);
+
+ this.remapCnt = new AtomicInteger(remapCnt);
}
/** {@inheritDoc} */
@@ -295,10 +306,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/**
* Performs future mapping.
- *
- * @param waitTopFut Whether to wait for topology future.
*/
- public void map(boolean waitTopFut) {
+ public void map() {
AffinityTopologyVersion topVer = null;
IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
@@ -310,14 +319,62 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
if (topVer == null)
- mapOnTopology(null, false, null, waitTopFut);
+ mapOnTopology(null, false, null);
else {
topLocked = true;
+ // Cannot remap.
+ remapCnt.set(1);
+
map0(topVer, null, false, null);
}
}
+ /**
+ * @param failed Keys to remap.
+ */
+ private void remap(Collection<?> failed) {
+ if (futVer != null)
+ cctx.mvcc().removeAtomicFuture(version());
+
+ Collection<Object> remapKeys = new ArrayList<>(failed.size());
+ Collection<Object> remapVals = new ArrayList<>(failed.size());
+
+ Iterator<?> keyIt = keys.iterator();
+ Iterator<?> valsIt = vals.iterator();
+
+ for (Object key : failed) {
+ while (keyIt.hasNext()) {
+ Object nextKey = keyIt.next();
+ Object nextVal = valsIt.next();
+
+ if (F.eq(key, nextKey)) {
+ remapKeys.add(nextKey);
+ remapVals.add(nextVal);
+
+ break;
+ }
+ }
+ }
+
+ keys = remapKeys;
+ vals = remapVals;
+
+ mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
+ single = null;
+ futVer = null;
+ err = null;
+ opRes = null;
+ topVer = AffinityTopologyVersion.ZERO;
+ singleNodeId = null;
+ singleReq = null;
+ fastMapRemap = false;
+ updVer = null;
+ topLocked = false;
+
+ map();
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
@@ -331,6 +388,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (op == TRANSFORM && retval == null)
retval = Collections.emptyMap();
+ if (err != null && X.hasCause(err, CachePartialUpdateCheckedException.class) && remapCnt.decrementAndGet() > 0) {
+ remap(X.cause(err, CachePartialUpdateCheckedException.class).failedKeys());
+
+ return false;
+ }
+
if (super.onDone(retval, err)) {
if (futVer != null)
cctx.mvcc().removeAtomicFuture(version());
@@ -353,7 +416,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
- mapOnTopology(remapKeys, true, nodeId, true);
+ mapOnTopology(remapKeys, true, nodeId);
return;
}
@@ -431,10 +494,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param keys Keys to map.
* @param remap Boolean flag indicating if this is partial future remap.
* @param oldNodeId Old node ID if remap.
- * @param waitTopFut Whether to wait for topology future.
*/
- private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId,
- final boolean waitTopFut) {
+ private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) {
cache.topology().readLock();
AffinityTopologyVersion topVer = null;
@@ -465,7 +526,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- mapOnTopology(keys, remap, oldNodeId, waitTopFut);
+ mapOnTopology(keys, remap, oldNodeId);
}
});
}
@@ -509,7 +570,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
if (remap)
- mapOnTopology(null, true, null, true);
+ mapOnTopology(null, true, null);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
new file mode 100644
index 0000000..89d1040
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ *
+ */
+public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /**
+ * @return Keys count for the test.
+ */
+ protected abstract int keysCount();
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+ CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+ cfg.setBackups(1);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPut() throws Exception {
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (!finished.get()) {
+ stopGrid(3);
+
+ U.sleep(300);
+
+ startGrid(3);
+ }
+
+ return null;
+ }
+ });
+
+ int keysCnt = keysCount();
+
+ for (int i = 0; i < keysCnt; i++)
+ ignite(0).cache(null).put(i, i);
+
+ finished.set(true);
+ fut.get();
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(i, ignite(0).cache(null).get(i));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFailWithNoRetries() throws Exception {
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ while (!finished.get()) {
+ stopGrid(3);
+
+ U.sleep(300);
+
+ startGrid(3);
+ }
+
+ return null;
+ }
+ });
+
+ int keysCnt = keysCount();
+
+ boolean exceptionThrown = false;
+
+ for (int i = 0; i < keysCnt; i++) {
+ try {
+ ignite(0).cache(null).withNoRetries().put(i, i);
+ }
+ catch (Exception e) {
+ assertTrue("Invalid exception: " + e, X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, CachePartialUpdateException.class));
+
+ exceptionThrown = true;
+
+ break;
+ }
+ }
+
+ assertTrue(exceptionThrown);
+
+ finished.set(true);
+ fut.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 3 * 60 * 1000;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
new file mode 100644
index 0000000..e76663a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAtomicSelfTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+
+/**
+ *
+ */
+public class IgniteCachePutRetryAtomicSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int keysCount() {
+ return 60_000;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3787a9d3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
new file mode 100644
index 0000000..e65459a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+
+/**
+ *
+ */
+public class IgniteCachePutRetryTransactionalSelfTest extends IgniteCachePutRetryAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int keysCount() {
+ return 20_000;
+ }
+}