You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/26 12:34:47 UTC
[14/16] ignite git commit: # ignite-1124
# ignite-1124
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4109bf4c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4109bf4c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4109bf4c
Branch: refs/heads/ignite-1124
Commit: 4109bf4c47a3c727666ec5b49bd90d63b8e38846
Parents: ca03efd
Author: sboikov <sb...@gridgain.com>
Authored: Wed Aug 26 12:25:37 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Aug 26 12:25:37 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMvccManager.java | 62 +-
.../dht/atomic/GridDhtAtomicCache.java | 6 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 1550 +++++++++---------
.../util/future/GridCompoundFuture.java | 3 +-
.../apache/ignite/internal/util/typedef/X.java | 1 +
.../IgniteCachePutRetryAbstractSelfTest.java | 271 ++-
.../dht/IgniteCachePutRetryAtomicSelfTest.java | 77 +-
...gniteCachePutRetryTransactionalSelfTest.java | 14 +-
.../tcp/IgniteCacheSslStartStopSelfTest.java | 9 +-
9 files changed, 1095 insertions(+), 898 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 6a8c6fe..bbac42b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -196,8 +196,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
cacheFut.onNodeLeft(discoEvt.eventNode().id());
- if (cacheFut.isCancelled() || cacheFut.isDone())
- atomicFuts.remove(cacheFut.futureId(), fut);
+ if (cacheFut.isCancelled() || cacheFut.isDone()) {
+ GridCacheVersion futVer = cacheFut.version();
+
+ if (futVer != null)
+ atomicFuts.remove(futVer, fut);
+ }
}
}
}
@@ -347,16 +351,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param fut Future to check.
- * @return {@code True} if future is registered.
- */
- public boolean hasFuture(GridCacheFuture<?> fut) {
- assert fut != null;
-
- return future(fut.version(), fut.futureId()) != null;
- }
-
- /**
* @param futVer Future ID.
* @param fut Future.
*/
@@ -565,6 +559,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
* @param ver Version.
* @return All futures for given lock version.
*/
+ @SuppressWarnings("unchecked")
public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) {
Collection c = futs.get(ver);
@@ -572,6 +567,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cacheCtx Cache context.
* @param ver Lock version to check.
* @return {@code True} if lock had been removed.
*/
@@ -580,6 +576,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param cacheCtx Cache context.
* @param ver Obsolete entry version.
* @return {@code True} if added.
*/
@@ -688,27 +685,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param keys Keys.
- * @param base Base version.
- * @return Versions that are less than {@code base} whose keys are in the {@code keys} collection.
- */
- public Collection<GridCacheVersion> localDhtPendingVersions(Collection<KeyCacheObject> keys, GridCacheVersion base) {
- Collection<GridCacheVersion> lessPending = new GridLeanSet<>(5);
-
- for (GridCacheMvccCandidate cand : dhtLocCands) {
- if (cand.version().isLess(base)) {
- if (keys.contains(cand.key()))
- lessPending.add(cand.version());
- }
- else
- break;
- }
-
- return lessPending;
- }
-
- /**
- *
+ * @param cacheCtx Cache context.
* @param cand Cache lock candidate to add.
* @return {@code True} if added as a result of this operation,
* {@code false} if was previously added.
@@ -924,24 +901,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
X.println(">>> finishFutsSize: " + finishFuts.size());
}
-
- /**
- * @param nodeId Node ID.
- * @return Filter.
- */
- private IgnitePredicate<GridCacheMvccCandidate> nodeIdFilter(final UUID nodeId) {
- if (nodeId == null)
- return F.alwaysTrue();
-
- return new P1<GridCacheMvccCandidate>() {
- @Override public boolean apply(GridCacheMvccCandidate c) {
- UUID otherId = c.otherNodeId();
-
- return c.nodeId().equals(nodeId) || (otherId != null && otherId.equals(nodeId));
- }
- };
- }
-
/**
* @param topVer Topology version.
* @return Future that signals when all locks for given partitions are released.
@@ -994,6 +953,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
*
* @return Finish update future.
*/
+ @SuppressWarnings("unchecked")
public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) {
GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c44b028..073737d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -135,6 +135,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ assert req.writeSynchronizationMode() != FULL_ASYNC : req;
+
// Always send reply in CLOCK ordering mode.
sendNearUpdateReply(res.nodeId(), res);
@@ -2243,6 +2245,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param req Request to remap.
*/
private void remapToNewPrimary(GridNearAtomicUpdateRequest req) {
+ assert req.writeSynchronizationMode() == FULL_ASYNC : req;
+
if (log.isDebugEnabled())
log.debug("Remapping near update request locally: " + req);
@@ -2275,7 +2279,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
drRmvVals = null;
}
else {
- assert req.operation() == DELETE;
+ assert req.operation() == DELETE : req;
drRmvVals = req.conflictVersions();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 8a2f073..d0c8766 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -35,11 +35,9 @@ import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
-import org.jsr166.*;
import javax.cache.expiry.*;
import java.util.*;
-import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
@@ -124,1011 +122,1011 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
private final boolean waitTopFut;
/** Remap count. */
- private AtomicInteger remapCnt;
+ private int remapCnt;
/** State. */
private final UpdateState state;
/**
- *
+ * @param cctx Cache context.
+ * @param cache Cache instance.
+ * @param syncMode Write synchronization mode.
+ * @param op Update operation.
+ * @param keys Keys to update.
+ * @param vals Values or transform closure.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param conflictPutVals Conflict put values (optional).
+ * @param conflictRmvVals Conflict remove values (optional).
+ * @param retval Return value required flag.
+ * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+ * @param expiryPlc Expiry policy explicitly specified for cache operation.
+ * @param filter Entry filter.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param skipStore Skip store flag.
+ * @param remapCnt Maximum number of retries.
+ * @param waitTopFut If {@code false} does not wait for affinity change future.
*/
- private class UpdateState {
- /** */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+ public GridNearAtomicUpdateFuture(
+ GridCacheContext cctx,
+ GridDhtAtomicCache cache,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ Collection<?> keys,
+ @Nullable Collection<?> vals,
+ @Nullable Object[] invokeArgs,
+ @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+ @Nullable Collection<GridCacheVersion> conflictRmvVals,
+ final boolean retval,
+ final boolean rawRetval,
+ @Nullable ExpiryPolicy expiryPlc,
+ final CacheEntryPredicate[] filter,
+ UUID subjId,
+ int taskNameHash,
+ boolean skipStore,
+ int remapCnt,
+ boolean waitTopFut
+ ) {
+ this.rawRetval = rawRetval;
- /** */
- private GridCacheVersion updVer;
+ assert vals == null || vals.size() == keys.size();
+ assert conflictPutVals == null || conflictPutVals.size() == keys.size();
+ assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+ assert subjId != null;
- /** */
- private AffinityTopologyVersion remapErrTopVer;
+ this.cctx = cctx;
+ this.cache = cache;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.keys = keys;
+ this.vals = vals;
+ this.invokeArgs = invokeArgs;
+ this.conflictPutVals = conflictPutVals;
+ this.conflictRmvVals = conflictRmvVals;
+ this.retval = retval;
+ this.expiryPlc = expiryPlc;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.skipStore = skipStore;
+ this.waitTopFut = waitTopFut;
- /** Mappings. */
- @GridToStringInclude
- private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
- /** Error. */
- private CachePartialUpdateCheckedException err;
+ fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+ cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+ !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
- /** Future ID. */
- private GridCacheVersion futVer;
+ nearEnabled = CU.isNearEnabled(cctx);
- /** Completion future for a particular topology version. */
- private GridFutureAdapter<Void> topCompleteFut;
+ if (!waitTopFut)
+ remapCnt = 1;
- /** */
- private Collection<KeyCacheObject> remapKeys;
+ this.remapCnt = remapCnt;
- /** */
- private GridNearAtomicUpdateRequest singleReq;
+ state = new UpdateState();
+ }
- /**
- * @param failedKeys Failed keys.
- * @param topVer Topology version for failed update.
- * @param err Error cause.
- */
- private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
- AffinityTopologyVersion topVer,
- Throwable err) {
- CachePartialUpdateCheckedException err0 = this.err;
+ /** {@inheritDoc} */
+ @Override public IgniteUuid futureId() {
+ throw new UnsupportedOperationException();
+ }
- if (err0 == null)
- err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion version() {
+ return state.futureVersion();
+ }
- Collection<Object> keys = new ArrayList<>(failedKeys.size());
+ /** {@inheritDoc} */
+ @Override public Collection<? extends ClusterNode> nodes() {
+ throw new UnsupportedOperationException();
+ }
- for (KeyCacheObject key : failedKeys)
- keys.add(key.value(cctx.cacheObjectContext(), false));
+ /**
+ * @return {@code True} if this future should block partition map exchange.
+ */
+ private boolean waitForPartitionExchange() {
+ // Wait for fast-map near atomic update futures in CLOCK mode.
+ return fastMap;
+ }
- err0.add(keys, err, topVer);
- }
+ /** {@inheritDoc} */
+ @Override public Collection<?> keys() {
+ return keys;
+ }
- synchronized IgniteUuid futureId() {
- return futVer.asGridUuid();
- }
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ state.onNodeLeft(nodeId);
- synchronized GridCacheVersion futureVersion() {
- return futVer;
- }
+ return false;
+ }
- /**
- * @param nodeId Left node ID.
- */
- void onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return true;
+ }
- synchronized (this) {
- GridNearAtomicUpdateRequest req;
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
- if (singleReq != null)
- req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
- else
- req = mappings != null ? mappings.get(nodeId) : null;
+ /**
+ * Performs future mapping.
+ */
+ public void map() {
+ AffinityTopologyVersion topVer = null;
- if (req != null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion());
+ IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
- res.addFailedKeys(req.keys(),
- new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId));
- }
- }
+ if (tx != null && tx.topologyVersionSnapshot() != null)
+ topVer = tx.topologyVersionSnapshot();
- if (res != null)
- onResult(nodeId, res, true);
- }
+ if (topVer == null)
+ topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
- /**
- * @param ret Result from single node.
- */
- @SuppressWarnings("unchecked")
- private void addInvokeResults(GridCacheReturn ret) {
- assert op == TRANSFORM : op;
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
+ if (topVer == null)
+ mapOnTopology();
+ else {
+ topLocked = true;
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
- }
- }
+ // Cannot remap.
+ remapCnt = 1;
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequest req;
+ state.map(topVer);
+ }
+ }
- AffinityTopologyVersion errTopVer = null;
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (waitForPartitionExchange()) {
+ GridFutureAdapter<Void> fut = state.completeFuture(topVer);
- GridCacheReturn opRes0 = null;
- CachePartialUpdateCheckedException err0 = null;
+ if (fut != null && isDone()) {
+ fut.onDone();
- boolean rcvAll;
+ return null;
+ }
- GridFutureAdapter<?> fut0 = null;
+ return fut;
+ }
- synchronized (this) {
- if (!res.futureVersion().equals(futVer))
- return;
+ return null;
+ }
- if (singleReq != null) {
- if (!singleReq.nodeId().equals(nodeId))
- return;
+ /** {@inheritDoc} */
+ @SuppressWarnings("ConstantConditions")
+ @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+ assert res == null || res instanceof GridCacheReturn;
- req = singleReq;
+ GridCacheReturn ret = (GridCacheReturn)res;
- singleReq = null;
+ Object retval =
+ res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success();
- rcvAll = true;
- }
- else {
- req = mappings != null ? mappings.remove(nodeId) : null;
+ if (op == TRANSFORM && retval == null)
+ retval = Collections.emptyMap();
- if (req != null)
- rcvAll = mappings.isEmpty();
- else
- return;
- }
+ if (super.onDone(retval, err)) {
+ GridCacheVersion futVer = state.onFutureDone();
- assert req != null && req.topologyVersion().equals(topVer) : req;
+ if (futVer != null)
+ cctx.mvcc().removeAtomicFuture(futVer);
- if (res.remapKeys() != null) {
- assert !fastMap || cctx.kernalContext().clientNode();
+ return true;
+ }
- if (remapKeys == null)
- remapKeys = U.newHashSet(res.remapKeys().size());
+ return false;
+ }
- remapKeys.addAll(res.remapKeys());
-
- if (remapErrTopVer == null || remapErrTopVer.compareTo(req.topologyVersion()) < 0)
- remapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
- }
- else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
-
- if (op == TRANSFORM) {
- if (ret != null)
- addInvokeResults(ret);
- }
- else
- opRes = ret;
- }
- }
-
- if (rcvAll) {
- if (remapKeys != null) {
- assert remapErrTopVer != null;
-
- errTopVer = remapErrTopVer;
- }
- else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- remapCnt.decrementAndGet() > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
-
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
-
- assert cause != null && cause.topologyVersion() != null : err;
-
- errTopVer = cause.topologyVersion();
-
- err = null;
-
- Collection<Object> failedKeys = cause.failedKeys();
+ /**
+ * Response callback.
+ *
+ * @param nodeId Node ID.
+ * @param res Update response.
+ */
+ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ state.onResult(nodeId, res, false);
+ }
- remapKeys = new ArrayList<>(failedKeys.size());
+ /**
+ * Updates near cache.
+ *
+ * @param req Update request.
+ * @param res Update response.
+ */
+ private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ if (!nearEnabled || !req.hasPrimary())
+ return;
- for (Object key : failedKeys)
- remapKeys.add(cctx.toCacheKeyObject(key));
+ GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
- updVer = null;
- }
- }
- }
+ near.processNearAtomicUpdateResponse(req, res);
+ }
- if (errTopVer == null) {
- err0 = err;
- opRes0 = opRes;
- }
- else {
- fut0 = topCompleteFut;
+ /**
+ * Maps future on ready topology.
+ */
+ private void mapOnTopology() {
+ cache.topology().readLock();
- topCompleteFut = null;
+ AffinityTopologyVersion topVer = null;
- cctx.mvcc().removeAtomicFuture(futVer);
+ try {
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
- }
- }
+ return;
}
- if (!nodeErr && res.remapKeys() == null)
- updateNear(req, res);
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- if (errTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
+ if (fut.isDone()) {
+ if (!fut.isCacheTopologyValid(cctx)) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+ cctx.name()));
- if (errTopVer == AffinityTopologyVersion.NONE)
- mapOnTopology();
- else {
- IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(errTopVer.topologyVersion() + 1);
+ return;
+ }
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(final IgniteInternalFuture<?> fut) {
+ topVer = fut.topologyVersion();
+ }
+ else {
+ if (waitTopFut) {
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
- try {
- fut.get();
-
- mapOnTopology();
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ mapOnTopology();
}
});
}
});
}
+ else
+ onDone(new GridCacheTryPutFailedException());
return;
}
-
- if (rcvAll)
- onDone(opRes0, err0);
}
-
- /**
- * @param req Request.
- * @param e Error.
- */
- void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
- synchronized (this) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureVersion());
-
- res.addFailedKeys(req.keys(), e);
-
- onResult(req.nodeId(), res, true);
- }
+ finally {
+ cache.topology().readUnlock();
}
- /**
- * @param topNodes Cache nodes.
- * @return Mapping.
- * @throws Exception If failed.
- */
- Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception {
- Iterator<?> it = null;
+ state.map(topVer);
+ }
- if (vals != null)
- it = vals.iterator();
+ /**
+ * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+ */
+ private boolean storeFuture() {
+ return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+ }
- Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+ /**
+ * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+ * node and send updates in parallel to all participating nodes.
+ *
+ * @param key Key to map.
+ * @param topVer Topology version to map.
+ * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+ * @return Collection of nodes to which key is mapped.
+ */
+ private Collection<ClusterNode> mapKey(
+ KeyCacheObject key,
+ AffinityTopologyVersion topVer,
+ boolean fastMap
+ ) {
+ GridCacheAffinityManager affMgr = cctx.affinity();
- if (conflictPutVals != null)
- conflictPutValsIt = conflictPutVals.iterator();
+ // If we can send updates in parallel - do it.
+ return fastMap ?
+ cctx.topology().nodes(affMgr.partition(key), topVer) :
+ Collections.singletonList(affMgr.primary(key, topVer));
+ }
- Iterator<GridCacheVersion> conflictRmvValsIt = null;
+ /**
+ * Maps future to single node.
+ *
+ * @param nodeId Node ID.
+ * @param req Request.
+ */
+ private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ if (cctx.localNodeId().equals(nodeId)) {
+ cache.updateAllAsyncInternal(nodeId, req,
+ new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ onResult(res.nodeId(), res);
+ }
+ });
+ }
+ else {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- if (conflictRmvVals != null)
- conflictRmvValsIt = conflictRmvVals.iterator();
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+ if (syncMode == FULL_ASYNC)
+ onDone(new GridCacheReturn(cctx, true, null, true));
+ }
+ catch (IgniteCheckedException e) {
+ state.onSendError(req, e);
+ }
+ }
+ }
- // Create mappings first, then send messages.
- for (Object key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
+ /**
+ * Sends messages to remote nodes and updates local cache.
+ *
+ * @param mappings Mappings to send.
+ */
+ private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+ UUID locNodeId = cctx.localNodeId();
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
+ GridNearAtomicUpdateRequest locUpdate = null;
- if (vals != null) {
- val = it.next();
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ // Send messages to remote nodes first, then run local update.
+ for (GridNearAtomicUpdateRequest req : mappings.values()) {
+ if (locNodeId.equals(req.nodeId())) {
+ assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+ ", req=" + req + ']';
- if (val == null)
- throw new NullPointerException("Null value.");
- }
- else if (conflictPutVals != null) {
- GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
+ locUpdate = req;
+ }
+ else {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- val = conflictPutVal.value();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
- else if (conflictRmvVals != null) {
- val = null;
- conflictVer = conflictRmvValsIt.next();
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ catch (IgniteCheckedException e) {
+ state.onSendError(req, e);
}
+ }
+ }
- if (val == null && op != GridCacheOperation.DELETE)
- continue;
+ if (locUpdate != null) {
+ cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+ new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ onResult(res.nodeId(), res);
+ }
+ });
+ }
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ if (syncMode == FULL_ASYNC)
+ onDone(new GridCacheReturn(cctx, true, null, true));
+ }
- if (remapKeys != null && !remapKeys.contains(cacheKey))
- continue;
+ /**
+ *
+ */
+ private class UpdateState {
+ /** */
+ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
+ /** */
+ private GridCacheVersion updVer;
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+ /** */
+ private AffinityTopologyVersion mapErrTopVer;
- if (affNodes.isEmpty())
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
+ /** Mappings. */
+ @GridToStringInclude
+ private Map<UUID, GridNearAtomicUpdateRequest> mappings;
- int i = 0;
+ /** Error. */
+ private CachePartialUpdateCheckedException err;
- for (ClusterNode affNode : affNodes) {
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
+ /** Future ID. */
+ private GridCacheVersion futVer;
- UUID nodeId = affNode.id();
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+ /** */
+ private Collection<KeyCacheObject> remapKeys;
- if (mapped == null) {
- mapped = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- cctx.kernalContext().clientNode());
+ /** */
+ private GridNearAtomicUpdateRequest singleReq;
- pendingMappings.put(nodeId, mapped);
- }
+ /**
+ * @param failedKeys Failed keys.
+ * @param topVer Topology version for failed update.
+ * @param err Error cause.
+ */
+ private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+ AffinityTopologyVersion topVer,
+ Throwable err) {
+ CachePartialUpdateCheckedException err0 = this.err;
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+ if (err0 == null)
+ err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- i++;
- }
- }
+ Collection<Object> keys = new ArrayList<>(failedKeys.size());
- return pendingMappings;
+ for (KeyCacheObject key : failedKeys)
+ keys.add(key.value(cctx.cacheObjectContext(), false));
+
+ err0.add(keys, err, topVer);
}
/**
- * @return Request.
- * @throws Exception If failed.
+ * @return Future version.
*/
- GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
- Object key = F.first(keys);
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- // Regular PUT.
- val = F.first(vals);
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else if (conflictPutVals != null) {
- // Conflict PUT.
- GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
-
- val = conflictPutVal.value();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- // Conflict REMOVE.
- val = null;
- conflictVer = F.first(conflictRmvVals);
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- // Regular REMOVE.
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
-
- // We still can get here if user pass map with single element.
- if (key == null)
- throw new NullPointerException("Null key.");
+ @Nullable synchronized GridCacheVersion futureVersion() {
+ return futVer;
+ }
- if (val == null && op != GridCacheOperation.DELETE)
- throw new NullPointerException("Null value.");
+ /**
+ * @param nodeId Left node ID.
+ */
+ void onNodeLeft(UUID nodeId) {
+ GridNearAtomicUpdateResponse res = null;
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ synchronized (this) {
+ GridNearAtomicUpdateRequest req;
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
+ if (singleReq != null)
+ req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+ else
+ req = mappings != null ? mappings.get(nodeId) : null;
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+ if (req != null) {
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion());
- if (primary == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid).");
+ res.addFailedKeys(req.keys(),
+ new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId));
+ }
+ }
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- cctx.kernalContext().clientNode());
+ if (res != null)
+ onResult(nodeId, res, true);
+ }
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
+ /**
+ * @param ret Result from single node.
+ */
+ @SuppressWarnings("unchecked")
+ private void addInvokeResults(GridCacheReturn ret) {
+ assert op == TRANSFORM : op;
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
- return req;
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
+ }
}
/**
- * @param topVer Topology version.
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @param nodeErr {@code True} if response was created on node failure.
*/
- void map(AffinityTopologyVersion topVer) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+ void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicUpdateRequest req;
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
+ AffinityTopologyVersion remapTopVer = null;
- return;
- }
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
- Exception err = null;
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
+ boolean rcvAll;
- int size = keys.size();
+ GridFutureAdapter<?> fut0 = null;
synchronized (this) {
- assert futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
- this.topVer = topVer;
+ if (!res.futureVersion().equals(futVer))
+ return;
- futVer = cctx.versions().next(topVer);
+ if (singleReq != null) {
+ if (!singleReq.nodeId().equals(nodeId))
+ return;
- if (storeFuture())
- cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
+ req = singleReq;
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (updVer == null)
- updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
+ singleReq = null;
- if (updVer != null && log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
+ rcvAll = true;
+ }
+ else {
+ req = mappings != null ? mappings.remove(nodeId) : null;
- try {
- if (size == 1 && !fastMap) {
- assert remapKeys == null || remapKeys.size() == 1;
+ if (req != null)
+ rcvAll = mappings.isEmpty();
+ else
+ return;
+ }
- singleReq = mapSingleUpdate();
+ assert req != null && req.topologyVersion().equals(topVer) : req;
+
+ if (res.remapKeys() != null) {
+ assert !fastMap || cctx.kernalContext().clientNode();
+
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(res.remapKeys().size());
+
+ remapKeys.addAll(res.remapKeys());
+
+ if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+ mapErrTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null)
+ addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+ else {
+ if (!req.fastMap() || req.hasPrimary()) {
+ GridCacheReturn ret = res.returnValue();
+
+ if (op == TRANSFORM) {
+ if (ret != null)
+ addInvokeResults(ret);
+ }
+ else
+ opRes = ret;
+ }
+ }
+
+ if (rcvAll) {
+ if (remapKeys != null) {
+ assert mapErrTopVer != null;
+
+ remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
}
else {
- pendingMappings = mapUpdate(topNodes);
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr =
+ X.cause(err, ClusterTopologyCheckedException.class);
- if (pendingMappings.size() == 1)
- singleReq = F.firstValue(pendingMappings);
- else {
- if (syncMode == PRIMARY_SYNC) {
- mappings = U.newHashMap(pendingMappings.size());
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings.put(req.nodeId(), req);
- }
- }
- else
- mappings = new HashMap<>(pendingMappings);
+ assert cause != null && cause.topologyVersion() != null : err;
- assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
+ remapTopVer =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+ err = null;
+
+ Collection<Object> failedKeys = cause.failedKeys();
+
+ remapKeys = new ArrayList<>(failedKeys.size());
+
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
+
+ updVer = null;
+ }
}
}
- remapKeys = null;
- }
- catch (Exception e) {
- err = e;
+ if (remapTopVer == null) {
+ err0 = err;
+ opRes0 = opRes;
+ }
+ else {
+ fut0 = topCompleteFut;
+
+ topCompleteFut = null;
+
+ cctx.mvcc().removeAtomicFuture(futVer);
+
+ futVer = null;
+ topVer = AffinityTopologyVersion.ZERO;
+ }
}
}
- if (err != null) {
- onDone(err);
+ if (!nodeErr && res.remapKeys() == null)
+ updateNear(req, res);
+
+ if (remapTopVer != null) {
+ if (fut0 != null)
+ fut0.onDone();
+
+ IgniteInternalFuture<?> fut = cctx.affinity().affinityReadyFuture(remapTopVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<?>>() {
+ @Override public void apply(final IgniteInternalFuture<?> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ fut.get();
+
+ mapOnTopology();
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ });
return;
}
- // Optimize mapping for single key.
- if (singleReq != null)
- mapSingle(singleReq.nodeId(), singleReq);
- else {
- assert pendingMappings != null;
+ assert fut0 == null;
- if (size == 0)
- onDone(new GridCacheReturn(cctx, true, null, true));
- else
- doUpdate(pendingMappings);
- }
+ if (rcvAll)
+ onDone(opRes0, err0);
}
/**
- * @param topVer Topology version.
- * @return Future.
+ * @param req Request.
+ * @param e Error.
*/
- @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (this.topVer == AffinityTopologyVersion.ZERO)
- return null;
+ void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+ synchronized (this) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureVersion());
- if (this.topVer.compareTo(topVer) < 0) {
- if (topCompleteFut == null)
- topCompleteFut = new GridFutureAdapter<>();
+ res.addFailedKeys(req.keys(), e);
- return topCompleteFut;
+ onResult(req.nodeId(), res, true);
}
-
- return null;
}
/**
- * @return Future version.
+ * @param topNodes Cache nodes.
+ * @return Mapping.
+ * @throws Exception If failed.
*/
- GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
-
- GridFutureAdapter<Void> fut0;
+ Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception {
+ Iterator<?> it = null;
- synchronized (this) {
- fut0 = topCompleteFut;
+ if (vals != null)
+ it = vals.iterator();
- topCompleteFut = null;
+ Iterator<GridCacheDrInfo> conflictPutValsIt = null;
- ver0 = futVer;
+ if (conflictPutVals != null)
+ conflictPutValsIt = conflictPutVals.iterator();
- futVer = null;
- }
+ Iterator<GridCacheVersion> conflictRmvValsIt = null;
- if (fut0 != null)
- fut0.onDone();
+ if (conflictRmvVals != null)
+ conflictRmvValsIt = conflictRmvVals.iterator();
- return ver0;
- }
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
- /** {@inheritDoc} */
- @Override public synchronized String toString() {
- return S.toString(UpdateState.class, this);
- }
- }
+ // Create mappings first, then send messages.
+ for (Object key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
- /**
- * @param cctx Cache context.
- * @param cache Cache instance.
- * @param syncMode Write synchronization mode.
- * @param op Update operation.
- * @param keys Keys to update.
- * @param vals Values or transform closure.
- * @param invokeArgs Optional arguments for entry processor.
- * @param conflictPutVals Conflict put values (optional).
- * @param conflictRmvVals Conflict remove values (optional).
- * @param retval Return value require flag.
- * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
- * @param expiryPlc Expiry policy explicitly specified for cache operation.
- * @param filter Entry filter.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param skipStore Skip store flag.
- */
- public GridNearAtomicUpdateFuture(
- GridCacheContext cctx,
- GridDhtAtomicCache cache,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- Collection<?> keys,
- @Nullable Collection<?> vals,
- @Nullable Object[] invokeArgs,
- @Nullable Collection<GridCacheDrInfo> conflictPutVals,
- @Nullable Collection<GridCacheVersion> conflictRmvVals,
- final boolean retval,
- final boolean rawRetval,
- @Nullable ExpiryPolicy expiryPlc,
- final CacheEntryPredicate[] filter,
- UUID subjId,
- int taskNameHash,
- boolean skipStore,
- int remapCnt,
- boolean waitTopFut
- ) {
- this.rawRetval = rawRetval;
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
- assert vals == null || vals.size() == keys.size();
- assert conflictPutVals == null || conflictPutVals.size() == keys.size();
- assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
- assert subjId != null;
+ if (vals != null) {
+ val = it.next();
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- this.cctx = cctx;
- this.cache = cache;
- this.syncMode = syncMode;
- this.op = op;
- this.keys = keys;
- this.vals = vals;
- this.invokeArgs = invokeArgs;
- this.conflictPutVals = conflictPutVals;
- this.conflictRmvVals = conflictRmvVals;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.waitTopFut = waitTopFut;
+ if (val == null)
+ throw new NullPointerException("Null value.");
+ }
+ else if (conflictPutVals != null) {
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+ val = conflictPutVal.value();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ val = null;
+ conflictVer = conflictRmvValsIt.next();
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
- fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
- cctx.config().getAtomicWriteOrderMode() == CLOCK &&
- !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+ if (val == null && op != GridCacheOperation.DELETE)
+ continue;
- nearEnabled = CU.isNearEnabled(cctx);
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
- if (!waitTopFut)
- remapCnt = 1;
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
- this.remapCnt = new AtomicInteger(remapCnt);
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
- state = new UpdateState();
- }
+ Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return state.futureId();
- }
+ if (affNodes.isEmpty())
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return state.futureVersion();
- }
+ int i = 0;
- /** {@inheritDoc} */
- @Override public Collection<? extends ClusterNode> nodes() {
- throw new UnsupportedOperationException();
- }
+ for (ClusterNode affNode : affNodes) {
+ if (affNode == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
- /**
- * @return {@code True} if this future should block partition map exchange.
- */
- private boolean waitForPartitionExchange() {
- // Wait fast-map near atomic update futures in CLOCK mode.
- return fastMap;
- }
+ UUID nodeId = affNode.id();
- /** {@inheritDoc} */
- @Override public Collection<?> keys() {
- return keys;
- }
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- state.onNodeLeft(nodeId);
+ if (mapped == null) {
+ mapped = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ cctx.kernalContext().clientNode());
- return false;
- }
+ pendingMappings.put(nodeId, mapped);
+ }
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
+ mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
- }
+ i++;
+ }
+ }
- /**
- * Performs future mapping.
- */
- public void map() {
- AffinityTopologyVersion topVer = null;
+ return pendingMappings;
+ }
- IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
+ /**
+ * @return Request.
+ * @throws Exception If failed.
+ */
+ GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception {
+ Object key = F.first(keys);
- if (tx != null && tx.topologyVersionSnapshot() != null)
- topVer = tx.topologyVersionSnapshot();
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
- if (topVer == null)
- topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+ if (vals != null) {
+ // Regular PUT.
+ val = F.first(vals);
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else if (conflictPutVals != null) {
+ // Conflict PUT.
+ GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
- if (topVer == null)
- mapOnTopology();
- else {
- topLocked = true;
+ val = conflictPutVal.value();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ // Conflict REMOVE.
+ val = null;
+ conflictVer = F.first(conflictRmvVals);
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ // Regular REMOVE.
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
- // Cannot remap.
- remapCnt.set(1);
+ // We still can get here if user pass map with single element.
+ if (key == null)
+ throw new NullPointerException("Null key.");
- state.map(topVer);
- }
- }
+ if (val == null && op != GridCacheOperation.DELETE)
+ throw new NullPointerException("Null value.");
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (waitForPartitionExchange()) {
- GridFutureAdapter<Void> fut = state.completeFuture(topVer);
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
- if (fut != null && isDone()) {
- fut.onDone();
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
- return null;
- }
+ ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
- return fut;
- }
+ if (primary == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid).");
- return null;
- }
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ cctx.kernalContext().clientNode());
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
- assert res == null || res instanceof GridCacheReturn;
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
- GridCacheReturn ret = (GridCacheReturn)res;
+ return req;
+ }
- Object retval =
- res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success();
+ /**
+ * @param topVer Topology version.
+ */
+ void map(AffinityTopologyVersion topVer) {
+ Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
- if (op == TRANSFORM && retval == null)
- retval = Collections.emptyMap();
+ if (F.isEmpty(topNodes)) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid)."));
- if (super.onDone(retval, err)) {
- GridCacheVersion futVer = state.onFutureDone();
+ return;
+ }
- if (futVer != null)
- cctx.mvcc().removeAtomicFuture(futVer);
+ Exception err = null;
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null;
- return true;
- }
+ int size = keys.size();
- return false;
- }
+ synchronized (this) {
+ assert futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
- /**
- * Response callback.
- *
- * @param nodeId Node ID.
- * @param res Update response.
- */
- public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
- state.onResult(nodeId, res, false);
- }
+ this.topVer = topVer;
- /**
- * Updates near cache.
- *
- * @param req Update request.
- * @param res Update response.
- */
- private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- if (!nearEnabled || !req.hasPrimary())
- return;
+ futVer = cctx.versions().next(topVer);
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+ if (storeFuture())
+ cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this);
- near.processNearAtomicUpdateResponse(req, res);
- }
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (updVer == null)
+ updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
- /**
- * Maps future on ready topology.
- */
- private void mapOnTopology() {
- cache.topology().readLock();
+ if (updVer != null && log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
- AffinityTopologyVersion topVer = null;
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
+ singleReq = mapSingleUpdate();
+ }
+ else {
+ pendingMappings = mapUpdate(topNodes);
- return;
- }
+ if (pendingMappings.size() == 1)
+ singleReq = F.firstValue(pendingMappings);
+ else {
+ if (syncMode == PRIMARY_SYNC) {
+ mappings = U.newHashMap(pendingMappings.size());
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings.put(req.nodeId(), req);
+ }
+ }
+ else
+ mappings = new HashMap<>(pendingMappings);
- if (fut.isDone()) {
- if (!fut.isCacheTopologyValid(cctx)) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
- cctx.name()));
+ assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
+ }
+ }
- return;
+ remapKeys = null;
}
+ catch (Exception e) {
+ err = e;
+ }
+ }
- topVer = fut.topologyVersion();
+ if (err != null) {
+ onDone(err);
+
+ return;
}
+
+ // Optimize mapping for single key.
+ if (singleReq != null)
+ mapSingle(singleReq.nodeId(), singleReq);
else {
- if (waitTopFut) {
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ assert pendingMappings != null;
- return;
+ if (size == 0)
+ onDone(new GridCacheReturn(cctx, true, null, true));
+ else
+ doUpdate(pendingMappings);
}
}
- finally {
- cache.topology().readUnlock();
- }
-
- state.map(topVer);
- }
- /**
- * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
- */
- private boolean storeFuture() {
- return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
- }
+ /**
+ * @param topVer Topology version.
+ * @return Future.
+ */
+ @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (this.topVer == AffinityTopologyVersion.ZERO)
+ return null;
- /**
- * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
- * node and send updates in parallel to all participating nodes.
- *
- * @param key Key to map.
- * @param topVer Topology version to map.
- * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
- * @return Collection of nodes to which key is mapped.
- */
- private Collection<ClusterNode> mapKey(
- KeyCacheObject key,
- AffinityTopologyVersion topVer,
- boolean fastMap
- ) {
- GridCacheAffinityManager affMgr = cctx.affinity();
+ if (this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
- // If we can send updates in parallel - do it.
- return fastMap ?
- cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primary(key, topVer));
- }
+ return topCompleteFut;
+ }
- /**
- * Maps future to single node.
- *
- * @param nodeId Node ID.
- * @param req Request.
- */
- private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
- if (cctx.localNodeId().equals(nodeId)) {
- cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- onResult(res.nodeId(), res);
- }
- });
+ return null;
}
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ /**
+ * @return Future version.
+ */
+ GridCacheVersion onFutureDone() {
+ GridCacheVersion ver0;
- if (syncMode == FULL_ASYNC)
- onDone(new GridCacheReturn(cctx, true, null, true));
- }
- catch (IgniteCheckedException e) {
- state.onSendError(req, e);
- }
- }
- }
+ GridFutureAdapter<Void> fut0;
- /**
- * Sends messages to remote nodes and updates local cache.
- *
- * @param mappings Mappings to send.
- */
- private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
- UUID locNodeId = cctx.localNodeId();
+ synchronized (this) {
+ fut0 = topCompleteFut;
- GridNearAtomicUpdateRequest locUpdate = null;
+ topCompleteFut = null;
- // Send messages to remote nodes first, then run local update.
- for (GridNearAtomicUpdateRequest req : mappings.values()) {
- if (locNodeId.equals(req.nodeId())) {
- assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
- ", req=" + req + ']';
+ ver0 = futVer;
- locUpdate = req;
+ futVer = null;
}
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- state.onSendError(req, e);
- }
- }
- }
+ if (fut0 != null)
+ fut0.onDone();
- if (locUpdate != null) {
- cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- onResult(res.nodeId(), res);
- }
- });
+ return ver0;
}
- if (syncMode == FULL_ASYNC)
- onDone(new GridCacheReturn(cctx, true, null, true));
+ /** {@inheritDoc} */
+ @Override public synchronized String toString() {
+ return S.toString(UpdateState.class, this);
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 2064338..d56ed7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -128,7 +128,8 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
/**
* @param ignoreChildFailures Flag indicating whether compound future should ignore child futures failures.
*/
- public void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures) {
+ @SafeVarargs
+ public final void ignoreChildFailures(Class<? extends Throwable>... ignoreChildFailures) {
this.ignoreChildFailures = ignoreChildFailures;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4109bf4c/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
index d5c5314..fc9dad0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/typedef/X.java
@@ -421,6 +421,7 @@ public final class X {
* @return {@code True} if one of the causing exception is an instance of passed in classes,
* {@code false} otherwise.
*/
+ @SafeVarargs
public static boolean hasCause(@Nullable Throwable t, @Nullable Class<? extends Throwable>... cls) {
if (t == null || F.isEmpty(cls))
return false;