You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/03 10:47:06 UTC
[3/8] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.
http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 8b1673f..2aa510d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -62,8 +62,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
/**
* DHT atomic cache near update future.
*/
-public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture
- {
+@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture {
/** Keys */
private Collection<?> keys;
@@ -79,8 +79,39 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheVersion> conflictRmvVals;
- /** State. */
- private final UpdateState state;
+ /** Current topology version. */
+ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+ /** Update version. */
+ private GridCacheVersion updVer;
+
+ /** Topology version when got mapping error. */
+ private AffinityTopologyVersion mapErrTopVer;
+
+ /** Mappings if operation is mapped to more than one node. */
+ @GridToStringInclude
+ private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+
+ /** Number of responses received for the current mappings. */
+ private int resCnt;
+
+ /** Error. */
+ private CachePartialUpdateCheckedException err;
+
+ /** Future ID. */
+ private GridCacheVersion futVer;
+
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
+
+ /** Keys to remap. */
+ private Collection<KeyCacheObject> remapKeys;
+
+ /** Not null if operation is mapped to a single node. */
+ private GridNearAtomicUpdateRequest singleReq;
+
+ /** Operation result. */
+ private GridCacheReturn opRes;
/**
* @param cctx Cache context.
@@ -130,55 +161,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
assert vals == null || vals.size() == keys.size();
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
- assert subjId != null;
this.keys = keys;
this.vals = vals;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
-
- state = new UpdateState();
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return state.futureVersion();
}
/** {@inheritDoc} */
- @Override public Collection<?> keys() {
- return keys;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- state.onNodeLeft(nodeId);
-
- return false;
- }
-
- /**
- * Performs future mapping.
- */
- public void map() {
- AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
-
- if (topVer == null)
- mapOnTopology();
- else {
- topLocked = true;
-
- // Cannot remap.
- remapCnt = 1;
-
- state.map(topVer, null);
- }
+ @Override public synchronized GridCacheVersion version() {
+ return futVer;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
if (waitForPartitionExchange()) {
- GridFutureAdapter<Void> fut = state.completeFuture(topVer);
+ GridFutureAdapter<Void> fut = completeFuture0(topVer);
if (fut != null && isDone()) {
fut.onDone();
@@ -193,6 +191,39 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ GridNearAtomicUpdateResponse res = null;
+
+ synchronized (this) {
+ GridNearAtomicUpdateRequest req;
+
+ if (singleReq != null)
+ req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+ else
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.response() == null) {
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ nodeId,
+ req.futureVersion(),
+ cctx.deploymentEnabled());
+
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+ "before response is received: " + nodeId);
+
+ e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+ res.addFailedKeys(req.keys(), e);
+ }
+ }
+
+ if (res != null)
+ onResult(nodeId, res, true);
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
assert res == null || res instanceof GridCacheReturn;
@@ -207,7 +238,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- GridCacheVersion futVer = state.onFutureDone();
+ GridCacheVersion futVer = onFutureDone();
if (futVer != null)
cctx.mvcc().removeAtomicFuture(futVer);
@@ -219,13 +250,31 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/**
+ * Performs future mapping.
+ */
+ public void map() {
+ AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
+
+ if (topVer == null)
+ mapOnTopology();
+ else {
+ topLocked = true;
+
+ // Cannot remap.
+ remapCnt = 1;
+
+ map(topVer, null);
+ }
+ }
+
+ /**
* Response callback.
*
* @param nodeId Node ID.
* @param res Update response.
*/
public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
- state.onResult(nodeId, res, false);
+ onResult(nodeId, res, false);
}
/**
@@ -281,7 +330,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
cache.topology().readUnlock();
}
- state.map(topVer, null);
+ map(topVer, null);
}
/**
@@ -310,7 +359,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
onDone(new GridCacheReturn(cctx, true, true, null, true));
}
catch (IgniteCheckedException e) {
- state.onSendError(req, e);
+ onSendError(req, e);
}
}
}
@@ -341,7 +390,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
- state.onSendError(req, e);
+ onSendError(req, e);
}
}
}
@@ -360,610 +409,422 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/**
- *
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @param nodeErr {@code True} if response was created on node failure.
*/
- private class UpdateState {
- /** Current topology version. */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
- /** */
- private GridCacheVersion updVer;
-
- /** Topology version when got mapping error. */
- private AffinityTopologyVersion mapErrTopVer;
-
- /** Mappings if operations is mapped to more than one node. */
- @GridToStringInclude
- private Map<UUID, GridNearAtomicUpdateRequest> mappings;
-
- /** */
- private int resCnt;
-
- /** Error. */
- private CachePartialUpdateCheckedException err;
+ @SuppressWarnings("unchecked")
+ void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicUpdateRequest req;
- /** Future ID. */
- private GridCacheVersion futVer;
+ AffinityTopologyVersion remapTopVer = null;
- /** Completion future for a particular topology version. */
- private GridFutureAdapter<Void> topCompleteFut;
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
- /** Keys to remap. */
- private Collection<KeyCacheObject> remapKeys;
+ boolean rcvAll;
- /** Not null is operation is mapped to single node. */
- private GridNearAtomicUpdateRequest singleReq;
+ GridFutureAdapter<?> fut0 = null;
- /** Operation result. */
- private GridCacheReturn opRes;
-
- /**
- * @return Future version.
- */
- @Nullable synchronized GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * @param nodeId Left node ID.
- */
- void onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
-
- synchronized (this) {
- GridNearAtomicUpdateRequest req;
-
- if (singleReq != null)
- req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
- else
- req = mappings != null ? mappings.get(nodeId) : null;
+ synchronized (this) {
+ if (!res.futureVersion().equals(futVer))
+ return;
- if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- nodeId,
- req.futureVersion(),
- cctx.deploymentEnabled());
+ if (singleReq != null) {
+ if (!singleReq.nodeId().equals(nodeId))
+ return;
- ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
- "before response is received: " + nodeId);
+ req = singleReq;
- e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+ singleReq = null;
- res.addFailedKeys(req.keys(), e);
- }
+ rcvAll = true;
}
+ else {
+ req = mappings != null ? mappings.get(nodeId) : null;
- if (res != null)
- onResult(nodeId, res, true);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- * @param nodeErr {@code True} if response was created on node failure.
- */
- @SuppressWarnings("unchecked")
- void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequest req;
-
- AffinityTopologyVersion remapTopVer = null;
-
- GridCacheReturn opRes0 = null;
- CachePartialUpdateCheckedException err0 = null;
-
- boolean rcvAll;
-
- GridFutureAdapter<?> fut0 = null;
+ if (req != null && req.onResponse(res)) {
+ resCnt++;
- synchronized (this) {
- if (!res.futureVersion().equals(futVer))
+ rcvAll = mappings.size() == resCnt;
+ }
+ else
return;
+ }
- if (singleReq != null) {
- if (!singleReq.nodeId().equals(nodeId))
- return;
+ assert req != null && req.topologyVersion().equals(topVer) : req;
- req = singleReq;
+ if (res.remapKeys() != null) {
+ assert !fastMap || cctx.kernalContext().clientNode();
- singleReq = null;
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(res.remapKeys().size());
- rcvAll = true;
- }
- else {
- req = mappings != null ? mappings.get(nodeId) : null;
+ remapKeys.addAll(res.remapKeys());
- if (req != null && req.onResponse(res)) {
- resCnt++;
+ if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+ mapErrTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null) {
+ if (res.failedKeys() != null)
+ addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+ }
+ else {
+ if (!req.fastMap() || req.hasPrimary()) {
+ GridCacheReturn ret = res.returnValue();
- rcvAll = mappings.size() == resCnt;
+ if (op == TRANSFORM) {
+ if (ret != null)
+ addInvokeResults(ret);
}
else
- return;
+ opRes = ret;
}
+ }
- assert req != null && req.topologyVersion().equals(topVer) : req;
-
- if (res.remapKeys() != null) {
- assert !fastMap || cctx.kernalContext().clientNode();
-
- if (remapKeys == null)
- remapKeys = U.newHashSet(res.remapKeys().size());
-
- remapKeys.addAll(res.remapKeys());
+ if (rcvAll) {
+ if (remapKeys != null) {
+ assert mapErrTopVer != null;
- if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
- mapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- if (res.failedKeys() != null)
- addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+ remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
}
else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
-
- if (op == TRANSFORM) {
- if (ret != null)
- addInvokeResults(ret);
- }
- else
- opRes = ret;
- }
- }
-
- if (rcvAll) {
- if (remapKeys != null) {
- assert mapErrTopVer != null;
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr =
+ X.cause(err, ClusterTopologyCheckedException.class);
- remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
- }
- else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- --remapCnt > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
+ assert cause != null && cause.topologyVersion() != null : err;
- assert cause != null && cause.topologyVersion() != null : err;
+ remapTopVer =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
- remapTopVer =
- new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+ err = null;
- err = null;
+ Collection<Object> failedKeys = cause.failedKeys();
- Collection<Object> failedKeys = cause.failedKeys();
+ remapKeys = new ArrayList<>(failedKeys.size());
- remapKeys = new ArrayList<>(failedKeys.size());
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
- for (Object key : failedKeys)
- remapKeys.add(cctx.toCacheKeyObject(key));
-
- updVer = null;
- }
+ updVer = null;
}
}
+ }
- if (remapTopVer == null) {
- err0 = err;
- opRes0 = opRes;
- }
- else {
- fut0 = topCompleteFut;
+ if (remapTopVer == null) {
+ err0 = err;
+ opRes0 = opRes;
+ }
+ else {
+ fut0 = topCompleteFut;
- topCompleteFut = null;
+ topCompleteFut = null;
- cctx.mvcc().removeAtomicFuture(futVer);
+ cctx.mvcc().removeAtomicFuture(futVer);
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
- }
+ futVer = null;
+ topVer = AffinityTopologyVersion.ZERO;
}
}
+ }
- if (res.error() != null && res.failedKeys() == null) {
- onDone(res.error());
+ if (res.error() != null && res.failedKeys() == null) {
+ onDone(res.error());
- return;
- }
+ return;
+ }
- if (rcvAll && nearEnabled) {
- if (mappings != null) {
- for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = req0.response();
+ if (rcvAll && nearEnabled) {
+ if (mappings != null) {
+ for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = req0.response();
- assert res0 != null : req0;
+ assert res0 != null : req0;
- updateNear(req0, res0);
- }
+ updateNear(req0, res0);
}
- else if (!nodeErr)
- updateNear(req, res);
}
+ else if (!nodeErr)
+ updateNear(req, res);
+ }
- if (remapTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
+ if (remapTopVer != null) {
+ if (fut0 != null)
+ fut0.onDone();
- if (!waitTopFut) {
- onDone(new GridCacheTryPutFailedException());
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
- return;
- }
+ return;
+ }
- if (topLocked) {
- assert !F.isEmpty(remapKeys) : remapKeys;
+ if (topLocked) {
+ assert !F.isEmpty(remapKeys) : remapKeys;
- CachePartialUpdateCheckedException e =
- new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
- "Failed to update keys, topology changed while execute atomic update inside transaction.");
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
- cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
- e.add(remapKeys, cause);
+ e.add(remapKeys, cause);
- onDone(e);
+ onDone(e);
- return;
- }
+ return;
+ }
- IgniteInternalFuture<AffinityTopologyVersion> fut =
- cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+ IgniteInternalFuture<AffinityTopologyVersion> fut =
+ cctx.shared().exchange().affinityReadyFuture(remapTopVer);
- if (fut == null)
- fut = new GridFinishedFuture<>(remapTopVer);
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- try {
- AffinityTopologyVersion topVer = fut.get();
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ AffinityTopologyVersion topVer = fut.get();
- map(topVer, remapKeys);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
+ map(topVer, remapKeys);
}
- });
- }
- });
-
- return;
- }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ });
- if (rcvAll)
- onDone(opRes0, err0);
+ return;
}
- /**
- * @param req Request.
- * @param e Error.
- */
- void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
- synchronized (this) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureVersion(),
- cctx.deploymentEnabled());
+ if (rcvAll)
+ onDone(opRes0, err0);
+ }
- res.addFailedKeys(req.keys(), e);
+ /**
+ * @param req Request.
+ * @param e Error.
+ */
+ void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+ synchronized (this) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureVersion(),
+ cctx.deploymentEnabled());
- onResult(req.nodeId(), res, true);
- }
+ res.addFailedKeys(req.keys(), e);
+
+ onResult(req.nodeId(), res, true);
}
+ }
- /**
- * @param topVer Topology version.
- * @param remapKeys Keys to remap.
- */
- void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+ /**
+ * @param topVer Topology version.
+ * @param remapKeys Keys to remap.
+ */
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
+ if (F.isEmpty(topNodes)) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid)."));
- return;
- }
+ return;
+ }
- Exception err = null;
- GridNearAtomicUpdateRequest singleReq0 = null;
- Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+ Exception err = null;
+ GridNearAtomicUpdateRequest singleReq0 = null;
+ Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
- int size = keys.size();
+ int size = keys.size();
- GridCacheVersion futVer = cctx.versions().next(topVer);
+ GridCacheVersion futVer = cctx.versions().next(topVer);
- GridCacheVersion updVer;
+ GridCacheVersion updVer;
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ updVer = this.updVer;
- if (updVer == null) {
- updVer = cctx.versions().next(topVer);
+ if (updVer == null) {
+ updVer = cctx.versions().next(topVer);
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
- }
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
}
- else
- updVer = null;
+ }
+ else
+ updVer = null;
- try {
- if (size == 1 && !fastMap) {
- assert remapKeys == null || remapKeys.size() == 1;
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
- }
+ singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ }
+ else {
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ topVer,
+ futVer,
+ updVer,
+ remapKeys);
+
+ if (pendingMappings.size() == 1)
+ singleReq0 = F.firstValue(pendingMappings);
else {
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
- topVer,
- futVer,
- updVer,
- remapKeys);
+ if (syncMode == PRIMARY_SYNC) {
+ mappings0 = U.newHashMap(pendingMappings.size());
- if (pendingMappings.size() == 1)
- singleReq0 = F.firstValue(pendingMappings);
- else {
- if (syncMode == PRIMARY_SYNC) {
- mappings0 = U.newHashMap(pendingMappings.size());
-
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings0.put(req.nodeId(), req);
- }
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings0.put(req.nodeId(), req);
}
- else
- mappings0 = pendingMappings;
-
- assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
}
+ else
+ mappings0 = pendingMappings;
+
+ assert !mappings0.isEmpty() || size == 0 : this;
}
+ }
- synchronized (this) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ synchronized (this) {
+ assert this.futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
- this.topVer = topVer;
- this.updVer = updVer;
- this.futVer = futVer;
+ this.topVer = topVer;
+ this.updVer = updVer;
+ this.futVer = futVer;
- resCnt = 0;
+ resCnt = 0;
- singleReq = singleReq0;
- mappings = mappings0;
+ singleReq = singleReq0;
+ mappings = mappings0;
- this.remapKeys = null;
- }
- }
- catch (Exception e) {
- err = e;
+ this.remapKeys = null;
}
+ }
+ catch (Exception e) {
+ err = e;
+ }
- if (err != null) {
- onDone(err);
+ if (err != null) {
+ onDone(err);
- return;
- }
+ return;
+ }
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
- assert isDone() : GridNearAtomicUpdateFuture.this;
+ if (storeFuture()) {
+ if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
+ assert isDone() : this;
- return;
- }
+ return;
}
+ }
- // Optimize mapping for single key.
- if (singleReq0 != null)
- mapSingle(singleReq0.nodeId(), singleReq0);
- else {
- assert mappings0 != null;
+ // Optimize mapping for single key.
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
+ else {
+ assert mappings0 != null;
- if (size == 0)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- else
- doUpdate(mappings0);
- }
+ if (size == 0)
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ else
+ doUpdate(mappings0);
}
+ }
- /**
- * @param topVer Topology version.
- * @return Future.
- */
- @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (this.topVer == AffinityTopologyVersion.ZERO)
- return null;
-
- if (this.topVer.compareTo(topVer) < 0) {
- if (topCompleteFut == null)
- topCompleteFut = new GridFutureAdapter<>();
+ /**
+ * @param topVer Topology version.
+ * @return Future.
+ */
+ @Nullable private synchronized GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) {
+ if (this.topVer == AffinityTopologyVersion.ZERO)
+ return null;
- return topCompleteFut;
- }
+ if (this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
- return null;
+ return topCompleteFut;
}
- /**
- * @return Future version.
- */
- GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
-
- GridFutureAdapter<Void> fut0;
+ return null;
+ }
- synchronized (this) {
- fut0 = topCompleteFut;
+ /**
+ * @return Future version.
+ */
+ private GridCacheVersion onFutureDone() {
+ GridCacheVersion ver0;
- topCompleteFut = null;
+ GridFutureAdapter<Void> fut0;
- ver0 = futVer;
+ synchronized (this) {
+ fut0 = topCompleteFut;
- futVer = null;
- }
+ topCompleteFut = null;
- if (fut0 != null)
- fut0.onDone();
+ ver0 = futVer;
- return ver0;
+ futVer = null;
}
- /**
- * @param topNodes Cache nodes.
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @param remapKeys Keys to remap.
- * @return Mapping.
- * @throws Exception If failed.
- */
- private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
- AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer,
- @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
- Iterator<?> it = null;
-
- if (vals != null)
- it = vals.iterator();
-
- Iterator<GridCacheDrInfo> conflictPutValsIt = null;
-
- if (conflictPutVals != null)
- conflictPutValsIt = conflictPutVals.iterator();
-
- Iterator<GridCacheVersion> conflictRmvValsIt = null;
-
- if (conflictRmvVals != null)
- conflictRmvValsIt = conflictRmvVals.iterator();
-
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
-
- // Create mappings first, then send messages.
- for (Object key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- val = it.next();
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
- if (val == null)
- throw new NullPointerException("Null value.");
- }
- else if (conflictPutVals != null) {
- GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
-
- val = conflictPutVal.valueEx();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- val = null;
- conflictVer = conflictRmvValsIt.next();
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
-
- if (val == null && op != GridCacheOperation.DELETE)
- continue;
-
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ if (fut0 != null)
+ fut0.onDone();
- if (remapKeys != null && !remapKeys.contains(cacheKey))
- continue;
+ return ver0;
+ }
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
+ /**
+ * @param topNodes Cache nodes.
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @param remapKeys Keys to remap.
+ * @return Mapping.
+ * @throws Exception If failed.
+ */
+ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+ Iterator<?> it = null;
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+ if (vals != null)
+ it = vals.iterator();
- if (affNodes.isEmpty())
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
+ Iterator<GridCacheDrInfo> conflictPutValsIt = null;
- int i = 0;
-
- for (ClusterNode affNode : affNodes) {
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
-
- UUID nodeId = affNode.id();
-
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
- if (mapped == null) {
- mapped = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- keys.size());
-
- pendingMappings.put(nodeId, mapped);
- }
+ if (conflictPutVals != null)
+ conflictPutValsIt = conflictPutVals.iterator();
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+ Iterator<GridCacheVersion> conflictRmvValsIt = null;
- i++;
- }
- }
+ if (conflictRmvVals != null)
+ conflictRmvValsIt = conflictRmvVals.iterator();
- return pendingMappings;
- }
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
- /**
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @return Request.
- * @throws Exception If failed.
- */
- private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
- Object key = F.first(keys);
+ // Create mappings first, then send messages.
+ for (Object key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
Object val;
GridCacheVersion conflictVer;
@@ -971,127 +832,231 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
long conflictExpireTime;
if (vals != null) {
- // Regular PUT.
- val = F.first(vals);
+ val = it.next();
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+ if (val == null)
+ throw new NullPointerException("Null value.");
}
else if (conflictPutVals != null) {
- // Conflict PUT.
- GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
+ conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
}
else if (conflictRmvVals != null) {
- // Conflict REMOVE.
val = null;
- conflictVer = F.first(conflictRmvVals);
+ conflictVer = conflictRmvValsIt.next();
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
else {
- // Regular REMOVE.
val = null;
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
- // We still can get here if user pass map with single element.
- if (key == null)
- throw new NullPointerException("Null key.");
-
if (val == null && op != GridCacheOperation.DELETE)
- throw new NullPointerException("Null value.");
+ continue;
KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
+
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
-
- if (primary == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid).");
-
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
-
- return req;
- }
+ Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
- /**
- * @param ret Result from single node.
- */
- @SuppressWarnings("unchecked")
- private void addInvokeResults(GridCacheReturn ret) {
- assert op == TRANSFORM : op;
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
- }
- }
+ if (affNodes.isEmpty())
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ int i = 0;
- /**
- * @param failedKeys Failed keys.
- * @param topVer Topology version for failed update.
- * @param err Error cause.
- */
- private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
- AffinityTopologyVersion topVer,
- Throwable err) {
- CachePartialUpdateCheckedException err0 = this.err;
+ for (ClusterNode affNode : affNodes) {
+ if (affNode == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
- if (err0 == null)
- err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ UUID nodeId = affNode.id();
- Collection<Object> keys = new ArrayList<>(failedKeys.size());
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+
+ if (mapped == null) {
+ mapped = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ pendingMappings.put(nodeId, mapped);
+ }
- for (KeyCacheObject key : failedKeys)
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+ mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
- err0.add(keys, err, topVer);
+ i++;
+ }
}
- /** {@inheritDoc} */
- @Override public synchronized String toString() {
- return S.toString(UpdateState.class, this);
+ return pendingMappings;
+ }
+
+ /**
+ * Builds the update request for a single-key operation. Takes the first element of {@code keys} and
+ * the first element of whichever of {@code vals}, {@code conflictPutVals} or {@code conflictRmvVals}
+ * is non-null (checked in that order), converts the key with {@code cctx.toCacheKeyObject} and, unless
+ * the operation is {@code TRANSFORM}, the value with {@code cctx.toCacheObject}. The request is
+ * addressed to the node returned by {@code cctx.affinity().primary(cacheKey, topVer)}.
+ * <p>
+ * NOTE(review): the request is created with a trailing constructor argument of {@code 1} where the
+ * multi-key mapping in this file passes {@code keys.size()}; presumably the expected entry count - confirm.
+ *
+ * @param topVer Topology version used to look up the primary node.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @return Newly created request with exactly one update entry added to it.
+ * @throws Exception If failed; the body throws {@code NullPointerException} for a null key, or for a
+ * null value when the operation is not {@code DELETE}, and
+ * {@code ClusterTopologyServerNotFoundException} when no primary node is found.
+ */
+ private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer) throws Exception {
+ Object key = F.first(keys);
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ // Regular PUT: plain value, no conflict version.
+ val = F.first(vals);
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else if (conflictPutVals != null) {
+ // Conflict PUT: value, version, TTL and expire time all come from the DR info.
+ GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ // Conflict REMOVE: no value, only the conflict version.
+ val = null;
+ conflictVer = F.first(conflictRmvVals);
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
 }
+ else {
+ // Regular REMOVE: neither value nor conflict version.
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ // We can still get here if the user passes a map with a single element.
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ if (val == null && op != GridCacheOperation.DELETE)
+ throw new NullPointerException("Null value.");
+
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
+
+ ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+
+ if (primary == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid).");
+
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
+
+ /**
+ * Accumulates an invoke result into {@code opRes}. A result whose value is {@code null} is ignored.
+ * Otherwise, if {@code opRes} is not set yet, {@code ret} becomes {@code opRes}; if it is already set,
+ * {@code ret} is merged into it via {@code mergeEntryProcessResults}.
+ * <p>
+ * Asserts that the operation is {@code TRANSFORM} and that the value, when present, is a {@code Map}.
+ *
+ * @param ret Result from single node.
+ */
+ @SuppressWarnings("unchecked")
+ private void addInvokeResults(GridCacheReturn ret) {
+ assert op == TRANSFORM : op;
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
+ }
+ }
+
+ /**
+ * Records failed keys in the {@code CachePartialUpdateCheckedException} held in {@code this.err},
+ * creating that exception and storing it in the field if none exists yet. Each key is passed through
+ * {@code unwrapBinaryIfNeeded(key, keepBinary, false)} before the collection is added to the exception
+ * together with the cause and topology version.
+ *
+ * @param failedKeys Failed keys.
+ * @param topVer Topology version for failed update.
+ * @param err Error cause (shadows the {@code err} field, hence the explicit {@code this.err} in the body).
+ */
+ private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+ AffinityTopologyVersion topVer,
+ Throwable err) {
+ CachePartialUpdateCheckedException err0 = this.err;
+
+ if (err0 == null)
+ err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+ Collection<Object> keys = new ArrayList<>(failedKeys.size());
+
+ for (KeyCacheObject key : failedKeys)
+ keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+
+ err0.add(keys, err, topVer);
 }
/** {@inheritDoc} */