You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 15:09:38 UTC
[4/8] ignite git commit: ignite-4705 Atomic cache protocol change:
notify client node from backups
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 0a816a7..930c4af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -17,9 +17,8 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
@@ -41,36 +40,29 @@ import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedExceptio
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
* DHT atomic cache near update future.
*/
public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
- /** */
- private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
-
/** Keys */
private Object key;
/** Values. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Object val;
- /** Not null is operation is mapped to single node. */
- private GridNearAtomicAbstractUpdateRequest req;
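+ /** State of the request sent to the primary node; {@code null} before mapping and after a remap is scheduled. */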
+ private PrimaryRequestState reqState;
/**
* @param cctx Cache context.
@@ -110,8 +102,21 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
int remapCnt,
boolean waitTopFut
) {
- super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
- skipStore, keepBinary, remapCnt, waitTopFut);
+ super(cctx,
+ cache,
+ syncMode,
+ op,
+ invokeArgs,
+ retval,
+ rawRetval,
+ expiryPlc,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ remapCnt,
+ waitTopFut);
assert subjId != null;
@@ -120,52 +125,63 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
+ @Override public Long id() {
synchronized (mux) {
- return futVer;
+ return futId;
}
}
/** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
+ AffinityTopologyVersion remapTopVer0 = null;
- GridNearAtomicAbstractUpdateRequest req;
+ GridNearAtomicCheckUpdateRequest checkReq = null;
+
+ boolean rcvAll = false;
synchronized (mux) {
- req = this.req != null && this.req.nodeId().equals(nodeId) ? this.req : null;
+ if (reqState == null)
+ return false;
- if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- nodeId,
- req.futureVersion(),
- cctx.deploymentEnabled());
+ if (reqState.req.nodeId.equals(nodeId)) {
+ GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
+
+ if (req != null) {
+ GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
- ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
- "before response is received: " + nodeId);
+ rcvAll = true;
- e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+ reqState.onPrimaryResponse(res, cctx);
- res.addFailedKeys(req.keys(), e);
+ onPrimaryError(req, res);
+ }
}
- }
+ else {
+ DhtLeftResult res = reqState.onDhtNodeLeft(nodeId);
- if (res != null) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update single fut, node left [futId=" + req.futureVersion() +
- ", writeVer=" + req.updateVersion() +
- ", node=" + nodeId + ']');
+ if (res == DhtLeftResult.DONE)
+ rcvAll = true;
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+ checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
+ else
+ return false;
}
- onResult(nodeId, res, true);
+ if (rcvAll) {
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
+ }
}
- return false;
- }
+ if (checkReq != null)
+ sendCheckUpdateRequest(checkReq);
+ else if (rcvAll)
+ finishUpdateFuture(opRes0, err0, remapTopVer0);
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- return null;
+ return false;
}
/** {@inheritDoc} */
@@ -175,15 +191,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridCacheReturn ret = (GridCacheReturn)res;
- Object retval =
- res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
- cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
+ Object retval = res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
+ cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
if (op == TRANSFORM && retval == null)
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- GridCacheVersion futVer = onFutureDone();
+ Long futVer = onFutureDone();
if (futVer != null)
cctx.mvcc().removeAtomicFuture(futVer);
@@ -195,112 +210,103 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
+ @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+ GridCacheReturn opRes0;
+ CachePartialUpdateCheckedException err0;
+ AffinityTopologyVersion remapTopVer0;
+
+ synchronized (mux) {
+ if (futId == null || futId != res.futureId())
+ return;
+
+ assert reqState != null;
+ assert reqState.req.nodeId().equals(res.primaryId());
+
+ if (opRes == null && res.hasResult())
+ opRes = res.result();
+
+ if (reqState.onDhtResponse(nodeId, res)) {
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
+ }
+ else
+ return;
+ }
+
+ UpdateErrors errors = res.errors();
+
+ if (errors != null) {
+ assert errors.error() != null;
+
+ onDone(errors.error());
+
+ return;
+ }
+
+ finishUpdateFuture(opRes0, err0, remapTopVer0);
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
GridNearAtomicAbstractUpdateRequest req;
- AffinityTopologyVersion remapTopVer = null;
+ AffinityTopologyVersion remapTopVer0;
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
- GridFutureAdapter<?> fut0 = null;
-
synchronized (mux) {
- if (!res.futureVersion().equals(futVer))
+ if (futId == null || futId != res.futureId())
return;
- if (!this.req.nodeId().equals(nodeId))
- return;
+ req = reqState.processPrimaryResponse(nodeId, res);
- req = this.req;
-
- this.req = null;
+ if (req == null)
+ return;
- boolean remapKey = !F.isEmpty(res.remapKeys());
+ boolean remapKey = res.remapTopologyVersion() != null;
if (remapKey) {
- if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
- mapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- if (res.failedKeys() != null) {
- if (err == null)
- err = new CachePartialUpdateCheckedException(
- "Failed to update keys (retry update if possible).");
-
- Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
+ assert !req.topologyVersion().equals(res.remapTopologyVersion());
- for (KeyCacheObject key : res.failedKeys())
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+ assert remapTopVer == null : remapTopVer;
- err.add(keys, res.error(), req.topologyVersion());
- }
+ remapTopVer = res.remapTopologyVersion();
}
+ else if (res.error() != null)
+ onPrimaryError(req, res);
else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
-
- if (op == TRANSFORM) {
- if (ret != null) {
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
- }
+ GridCacheReturn ret = res.returnValue();
+
+ if (op == TRANSFORM) {
+ if (ret != null) {
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
}
}
- else
- opRes = ret;
}
- }
+ else
+ opRes = ret;
- if (remapKey) {
- assert mapErrTopVer != null;
+ assert reqState != null;
- remapTopVer = cctx.shared().exchange().topologyVersion();
+ if (!reqState.onPrimaryResponse(res, cctx))
+ return;
}
- else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- --remapCnt > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
-
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
- assert cause != null && cause.topologyVersion() != null : err;
+ remapTopVer0 = onAllReceived();
- remapTopVer =
- new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
-
- err = null;
- updVer = null;
- }
- }
- }
-
- if (remapTopVer == null) {
+ if (remapTopVer0 == null) {
err0 = err;
opRes0 = opRes;
}
- else {
- fut0 = topCompleteFut;
-
- topCompleteFut = null;
-
- cctx.mvcc().removeAtomicFuture(futVer);
-
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
- }
}
if (res.error() != null && res.failedKeys() == null) {
@@ -309,55 +315,102 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
return;
}
+ if (remapTopVer0 != null) {
+ waitAndRemap(remapTopVer0);
+
+ return;
+ }
+
if (nearEnabled && !nodeErr)
updateNear(req, res);
- if (remapTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
+ onDone(opRes0, err0);
+ }
- if (!waitTopFut) {
- onDone(new GridCacheTryPutFailedException());
+ /**
+ * @return Non-null topology version if update should be remapped.
+ */
+ private AffinityTopologyVersion onAllReceived() {
+ assert futId != null;
- return;
+ AffinityTopologyVersion remapTopVer0 = null;
+
+ if (remapTopVer == null) {
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
+
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
+
+ assert cause != null && cause.topologyVersion() != null : err;
+
+ remapTopVer0 = new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+ err = null;
+ }
}
+ }
+ else
+ remapTopVer0 = remapTopVer;
- if (topLocked) {
- CachePartialUpdateCheckedException e =
- new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ if (remapTopVer0 != null) {
+ cctx.mvcc().removeAtomicFuture(futId);
- ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
- "Failed to update keys, topology changed while execute atomic update inside transaction.");
+ reqState = null;
+ futId = null;
+ topVer = AffinityTopologyVersion.ZERO;
- cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+ remapTopVer = null;
+ }
- e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause);
+ return remapTopVer0;
+ }
- onDone(e);
+ /**
+ * @param remapTopVer New topology version.
+ */
+ private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
- return;
- }
+ return;
+ }
- IgniteInternalFuture<AffinityTopologyVersion> fut =
- cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+ if (topLocked) {
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- if (fut == null)
- fut = new GridFinishedFuture<>(remapTopVer);
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+ e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause);
+
+ onDone(e);
return;
}
- onDone(opRes0, err0);
+ IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
}
/**
@@ -369,7 +422,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
assert nearEnabled;
- if (res.remapKeys() != null || !req.hasPrimary())
+ if (res.remapTopologyVersion() != null)
return;
GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -380,103 +433,74 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
AffinityTopologyVersion topVer;
- GridCacheVersion futVer;
-
- cache.topology().readLock();
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- return;
- }
-
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+ return;
+ }
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- if (err != null) {
- onDone(err);
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
- return;
- }
-
- topVer = fut.topologyVersion();
-
- futVer = addAtomicFuture(topVer);
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ if (err != null) {
+ onDone(err);
return;
}
- }
- finally {
- cache.topology().readUnlock();
- }
-
- if (futVer != null)
- map(topVer, futVer);
- }
- /** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
+ topVer = fut.topologyVersion();
+ }
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
return;
}
- GridCacheVersion updVer;
-
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
-
- if (updVer == null) {
- updVer = futVer;
+ map(topVer);
+ }
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
- }
- }
- else
- updVer = null;
+ /** {@inheritDoc} */
+ @Override protected void map(AffinityTopologyVersion topVer) {
+ long futId = cctx.mvcc().atomicFutureId();
Exception err = null;
- GridNearAtomicAbstractUpdateRequest singleReq0 = null;
+ PrimaryRequestState reqState0 = null;
try {
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ reqState0 = mapSingleUpdate(topVer, futId);
synchronized (mux) {
- assert this.futVer == futVer || (this.isDone() && this.error() != null);
- assert this.topVer == topVer;
+ assert this.futId == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+ this.topVer = topVer;
+ this.futId = futId;
- this.updVer = updVer;
+ reqState = reqState0;
+ }
- resCnt = 0;
+ if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+ assert isDone();
- req = singleReq0;
+ return;
}
}
catch (Exception e) {
@@ -490,43 +514,80 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
// Optimize mapping for single key.
- mapSingle(singleReq0.nodeId(), singleReq0);
+ sendSingleRequest(reqState0.req.nodeId(), reqState0.req);
+
+ if (syncMode == FULL_ASYNC) {
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+
+ return;
+ }
+
+ if (reqState0.req.initMappingLocally() && (cctx.discovery().topologyVersion() != topVer.topologyVersion()))
+ checkDhtNodes(futId);
}
/**
- * @return Future version.
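+ * @param futId ID of the future this check was started for; the check is skipped if it no longer matches.
+ * @return {@code True} only if the future is still current and the check neither completed it nor sent a primary check request.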
*/
- private GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
+ private boolean checkDhtNodes(Long futId) {
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
+ AffinityTopologyVersion remapTopVer0 = null;
- GridFutureAdapter<Void> fut0;
+ GridNearAtomicCheckUpdateRequest checkReq = null;
synchronized (mux) {
- fut0 = topCompleteFut;
+ if (this.futId == null || !this.futId.equals(futId))
+ return false;
- topCompleteFut = null;
+ assert reqState != null;
- ver0 = futVer;
+ DhtLeftResult res = reqState.checkDhtNodes(cctx);
- futVer = null;
+ if (res == DhtLeftResult.DONE) {
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
+ }
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY){
+ checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
+ }
+ else
+ return true;
}
- if (fut0 != null)
- fut0.onDone();
+ if (checkReq != null)
+ sendCheckUpdateRequest(checkReq);
+ else
+ finishUpdateFuture(opRes0, err0, remapTopVer0);
+
+ return false;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ private Long onFutureDone() {
+ Long id0;
+
+ synchronized (mux) {
+ id0 = futId;
- return ver0;
+ futId = null;
+ }
+
+ return id0;
}
/**
* @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
+ * @param futId Future ID.
* @return Request.
* @throws Exception If failed.
*/
- private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
+ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, long futId)
+ throws Exception {
if (key == null)
throw new NullPointerException("Null key.");
@@ -542,22 +603,27 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
else
val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
- ClusterNode primary = cctx.affinity().primaryByKey(cacheKey, topVer);
+ boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) &&
+ !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
+
+ List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
- if (primary == null)
+ if (F.isEmpty(nodes))
throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
"left the grid).");
+ ClusterNode primary = nodes.get(0);
+
+ boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1;
+
GridNearAtomicAbstractUpdateRequest req;
- if (canUseSingleRequest(primary)) {
+ if (canUseSingleRequest()) {
if (op == TRANSFORM) {
req = new GridNearAtomicSingleUpdateInvokeRequest(
cctx.cacheId(),
primary.id(),
- futVer,
- false,
- updVer,
+ futId,
topVer,
topLocked,
syncMode,
@@ -566,9 +632,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
invokeArgs,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
else {
@@ -576,9 +642,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
req = new GridNearAtomicSingleUpdateRequest(
cctx.cacheId(),
primary.id(),
- futVer,
- false,
- updVer,
+ futId,
topVer,
topLocked,
syncMode,
@@ -586,18 +650,16 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
retval,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
else {
req = new GridNearAtomicSingleUpdateFilterRequest(
cctx.cacheId(),
primary.id(),
- futVer,
- false,
- updVer,
+ futId,
topVer,
topLocked,
syncMode,
@@ -606,9 +668,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
filter,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
}
@@ -617,9 +679,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
req = new GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
primary.id(),
- futVer,
- false,
- updVer,
+ futId,
topVer,
topLocked,
syncMode,
@@ -630,9 +690,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
filter,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled(),
1);
}
@@ -641,18 +701,39 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
val,
CU.TTL_NOT_CHANGED,
CU.EXPIRE_TIME_CALCULATE,
- null,
- true);
+ null);
+
+ return new PrimaryRequestState(req, nodes, true);
+ }
+
+ /**
+ * @param opRes Operation result.
+ * @param err Operation error.
+ * @param remapTopVer Non-null topology version if the update needs to be remapped.
+ */
+ private void finishUpdateFuture(GridCacheReturn opRes,
+ CachePartialUpdateCheckedException err,
+ @Nullable AffinityTopologyVersion remapTopVer) {
+ if (remapTopVer != null) {
+ waitAndRemap(remapTopVer);
+
+ return;
+ }
+
+ if (nearEnabled) {
+ assert reqState.req.response() != null;
+
+ updateNear(reqState.req, reqState.req.response());
+ }
- return req;
+ onDone(opRes, err);
}
/**
- * @param node Target node
* @return {@code True} can use 'single' update requests.
*/
- private boolean canUseSingleRequest(ClusterNode node) {
- return expiryPlc == null && node != null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0;
+ private boolean canUseSingleRequest() {
+ return expiryPlc == null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 6582063..f8b3984 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -76,9 +76,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
+ * @param futId Future ID.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -89,15 +87,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
GridNearAtomicSingleUpdateInvokeRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
+ long futId,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -106,17 +101,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
@Nullable Object[] invokeArgs,
@Nullable UUID subjId,
int taskNameHash,
+ boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo
) {
super(
cacheId,
nodeId,
- futVer,
- fastMap,
- updateVer,
+ futId,
topVer,
topLocked,
syncMode,
@@ -124,14 +117,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
retval,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- clientReq,
addDepInfo
);
- this.invokeArgs = invokeArgs;
assert op == TRANSFORM : op;
+
+ this.invokeArgs = invokeArgs;
}
/**
@@ -140,14 +134,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
*/
@Override public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
+ @Nullable GridCacheVersion conflictVer) {
assert conflictTtl < 0 : conflictTtl;
assert conflictExpireTime < 0 : conflictExpireTime;
assert conflictVer == null : conflictVer;
@@ -156,9 +148,6 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
entryProcessor = (EntryProcessor<Object, Object, Object>)val;
this.key = key;
- partId = key.partition();
-
- hasPrimary(hasPrimary() | primary);
}
/** {@inheritDoc} */
@@ -246,13 +235,13 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
}
switch (writer.state()) {
- case 14:
+ case 12:
if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes))
return false;
writer.incrementState();
- case 15:
+ case 13:
if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
@@ -274,7 +263,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
return false;
switch (reader.state()) {
- case 14:
+ case 12:
entryProcessorBytes = reader.readByteArray("entryProcessorBytes");
if (!reader.isLastRead())
@@ -282,7 +271,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
reader.incrementState();
- case 15:
+ case 13:
invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
@@ -297,7 +286,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 16;
+ return 14;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index c3e9fbe..b9a1fc6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -57,9 +57,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
/** Value to update. */
protected CacheObject val;
- /** Partition of key. */
- protected int partId;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -72,9 +69,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
+ * @param futId Future ID.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -84,15 +79,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
GridNearAtomicSingleUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
+ long futId,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -100,17 +92,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo
) {
- super(
- cacheId,
+ super(cacheId,
nodeId,
- futVer,
- fastMap,
- updateVer,
+ futId,
topVer,
topLocked,
syncMode,
@@ -118,16 +107,17 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
retval,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- clientReq,
- addDepInfo
- );
+ addDepInfo);
}
/** {@inheritDoc} */
@Override public int partition() {
- return partId;
+ assert key != null;
+
+ return key.partition();
}
/**
@@ -136,14 +126,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
*/
@Override public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
+ @Nullable GridCacheVersion conflictVer) {
assert op != TRANSFORM;
assert val != null || op == DELETE;
assert conflictTtl < 0 : conflictTtl;
@@ -151,19 +139,18 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
assert conflictVer == null : conflictVer;
this.key = key;
- partId = key.partition();
if (val != null) {
assert val instanceof CacheObject : val;
this.val = (CacheObject)val;
}
-
- hasPrimary(hasPrimary() | primary);
}
/** {@inheritDoc} */
@Override public int size() {
+ assert key != null;
+
return key == null ? 0 : 1;
}
@@ -253,8 +240,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
if (val != null)
val.finishUnmarshal(cctx.cacheObjectContext(), ldr);
-
- key.partition(partId);
}
/** {@inheritDoc} */
@@ -272,19 +257,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
}
switch (writer.state()) {
- case 11:
+ case 10:
if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
- case 12:
- if (!writer.writeInt("partId", partId))
- return false;
-
- writer.incrementState();
-
- case 13:
+ case 11:
if (!writer.writeMessage("val", val))
return false;
@@ -306,7 +285,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
return false;
switch (reader.state()) {
- case 11:
+ case 10:
key = reader.readMessage("key");
if (!reader.isLastRead())
@@ -314,15 +293,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
reader.incrementState();
- case 12:
- partId = reader.readInt("partId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
+ case 11:
val = reader.readMessage("val");
if (!reader.isLastRead())
@@ -350,7 +321,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 12;
}
/** {@inheritDoc} */