You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 15:00:22 UTC
[28/40] ignite git commit: ignite-4705 Atomic cache protocol change:
notify client node from backups
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index f182ecb..a44ccf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -47,10 +46,8 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtom
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -58,18 +55,14 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
* DHT atomic cache near update future.
*/
public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
- /** Fast map flag. */
- private final boolean fastMap;
-
/** Keys */
private Collection<?> keys;
@@ -87,13 +80,18 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/** Mappings if operations is mapped to more than one node. */
@GridToStringInclude
- private Map<UUID, GridNearAtomicFullUpdateRequest> mappings;
+ private Map<UUID, PrimaryRequestState> mappings;
/** Keys to remap. */
+ @GridToStringInclude
private Collection<KeyCacheObject> remapKeys;
/** Not null is operation is mapped to single node. */
- private GridNearAtomicFullUpdateRequest singleReq;
+ @GridToStringInclude
+ private PrimaryRequestState singleReq;
+
+ /** */
+ private int resCnt;
/**
* @param cctx Cache context.
@@ -149,84 +147,124 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
this.vals = vals;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
-
- fastMap = cache.isFastMap(filter, op);
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
+ @Override public Long id() {
synchronized (mux) {
- return futVer;
+ return futId;
}
}
/** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
+ AffinityTopologyVersion remapTopVer0 = null;
+
+ boolean rcvAll = false;
- GridNearAtomicFullUpdateRequest req;
+ List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
synchronized (mux) {
- if (singleReq != null)
- req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
- else
- req = mappings != null ? mappings.get(nodeId) : null;
+ if (futId == null)
+ return false;
- if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- nodeId,
- req.futureVersion(),
- cctx.deploymentEnabled());
+ if (singleReq != null) {
+ if (singleReq.req.nodeId.equals(nodeId)) {
+ GridNearAtomicAbstractUpdateRequest req = singleReq.onPrimaryFail();
- ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
- "before response is received: " + nodeId);
+ if (req != null) {
+ rcvAll = true;
- e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+ GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
- res.addFailedKeys(req.keys(), e);
- }
- }
+ singleReq.onPrimaryResponse(res, cctx);
+
+ onPrimaryError(req, res);
+ }
+ }
+ else {
+ DhtLeftResult res = singleReq.onDhtNodeLeft(nodeId);
+
+ if (res == DhtLeftResult.DONE)
+ rcvAll = true;
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+ checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req));
+ }
- if (res != null) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, node left [futId=" + req.futureVersion() +
- ", writeVer=" + req.updateVersion() +
- ", node=" + nodeId + ']');
+ if (rcvAll) {
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
+ }
}
+ else {
+ if (mappings == null)
+ return false;
- onResult(nodeId, res, true);
- }
+ for (Map.Entry<UUID, PrimaryRequestState> e : mappings.entrySet()) {
+ assert e.getKey().equals(e.getValue().req.nodeId());
- return false;
- }
+ PrimaryRequestState reqState = e.getValue();
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- // Wait fast-map near atomic update futures in CLOCK mode.
- if (fastMap) {
- GridFutureAdapter<Void> fut;
+ boolean reqDone = false;
- synchronized (mux) {
- if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) {
- if (topCompleteFut == null)
- topCompleteFut = new GridFutureAdapter<>();
+ if (e.getKey().equals(nodeId)) {
+ GridNearAtomicAbstractUpdateRequest req = reqState.onPrimaryFail();
- fut = topCompleteFut;
- }
- else
- fut = null;
- }
+ if (req != null) {
+ reqDone = true;
+
+ GridNearAtomicUpdateResponse res = primaryFailedResponse(req);
+
+ reqState.onPrimaryResponse(res, cctx);
+
+ onPrimaryError(req, res);
+ }
+ }
+ else {
+ DhtLeftResult res = reqState.onDhtNodeLeft(nodeId);
+
+ if (res == DhtLeftResult.DONE)
+ reqDone = true;
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+ if (checkReqs == null)
+ checkReqs = new ArrayList<>();
- if (fut != null && isDone()) {
- fut.onDone();
+ checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req));
+ }
+ }
+
+ if (reqDone) {
+ assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+ resCnt++;
+
+ if (mappings.size() == resCnt) {
+ rcvAll = true;
- return null;
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
+
+ break;
+ }
+ }
+ }
}
+ }
- return fut;
+ if (checkReqs != null) {
+ assert !rcvAll;
+
+ for (int i = 0; i < checkReqs.size(); i++)
+ sendCheckUpdateRequest(checkReqs.get(i));
}
+ else if (rcvAll)
+ finishUpdateFuture(opRes0, err0, remapTopVer0);
- return null;
+ return false;
}
/** {@inheritDoc} */
@@ -244,10 +282,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- GridCacheVersion futVer = onFutureDone();
+ Long futId = onFutureDone();
- if (futVer != null)
- cctx.mvcc().removeAtomicFuture(futVer);
+ if (futId != null)
+ cctx.mvcc().removeAtomicFuture(futId);
return true;
}
@@ -256,145 +294,166 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicFullUpdateRequest req;
-
- AffinityTopologyVersion remapTopVer = null;
-
- GridCacheReturn opRes0 = null;
- CachePartialUpdateCheckedException err0 = null;
-
- boolean rcvAll;
-
- GridFutureAdapter<?> fut0 = null;
+ @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+ GridCacheReturn opRes0;
+ CachePartialUpdateCheckedException err0;
+ AffinityTopologyVersion remapTopVer0;
synchronized (mux) {
- if (!res.futureVersion().equals(futVer))
+ if (futId == null || futId != res.futureId())
return;
- if (singleReq != null) {
- if (!singleReq.nodeId().equals(nodeId))
- return;
+ PrimaryRequestState reqState;
- req = singleReq;
+ if (singleReq != null) {
+ assert singleReq.req.nodeId().equals(res.primaryId());
- singleReq = null;
+ if (opRes == null && res.hasResult())
+ opRes = res.result();
- rcvAll = true;
+ if (singleReq.onDhtResponse(nodeId, res)) {
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
+ }
+ else
+ return;
}
else {
- req = mappings != null ? mappings.get(nodeId) : null;
+ reqState = mappings != null ? mappings.get(res.primaryId()) : null;
- if (req != null && req.onResponse(res)) {
- resCnt++;
+ if (reqState != null) {
+ if (opRes == null && res.hasResult())
+ opRes = res.result();
- rcvAll = mappings.size() == resCnt;
+ if (reqState.onDhtResponse(nodeId, res)) {
+ assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+ resCnt++;
+
+ if (mappings.size() == resCnt) {
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
+ }
+ else
+ return;
+ }
+ else
+ return;
}
else
return;
}
+ }
- assert req != null && req.topologyVersion().equals(topVer) : req;
+ UpdateErrors errors = res.errors();
- if (res.remapKeys() != null) {
- assert !fastMap || cctx.kernalContext().clientNode();
+ if (errors != null) {
+ assert errors.error() != null;
- if (remapKeys == null)
- remapKeys = U.newHashSet(res.remapKeys().size());
+ onDone(errors.error());
+
+ return;
+ }
- remapKeys.addAll(res.remapKeys());
+ finishUpdateFuture(opRes0, err0, remapTopVer0);
+ }
- if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
- mapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- if (res.failedKeys() != null) {
- if (err == null)
- err = new CachePartialUpdateCheckedException(
- "Failed to update keys (retry update if possible).");
+ /** {@inheritDoc} */
+ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicAbstractUpdateRequest req;
- Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
+ AffinityTopologyVersion remapTopVer0 = null;
- for (KeyCacheObject key : res.failedKeys())
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
- err.add(keys, res.error(), req.topologyVersion());
- }
+ boolean rcvAll;
+
+ synchronized (mux) {
+ if (futId == null || futId != res.futureId())
+ return;
+
+ if (singleReq != null) {
+ req = singleReq.processPrimaryResponse(nodeId, res);
+
+ if (req == null)
+ return;
+
+ rcvAll = singleReq.onPrimaryResponse(res, cctx);
}
else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
-
- if (op == TRANSFORM) {
- if (ret != null) {
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
- }
- }
- }
- else
- opRes = ret;
- }
- }
+ if (mappings == null)
+ return;
- if (rcvAll) {
- if (remapKeys != null) {
- assert mapErrTopVer != null;
+ PrimaryRequestState reqState = mappings.get(nodeId);
- remapTopVer = cctx.shared().exchange().topologyVersion();
- }
- else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- --remapCnt > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
+ if (reqState == null)
+ return;
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
+ req = reqState.processPrimaryResponse(nodeId, res);
+
+ if (req != null) {
+ if (reqState.onPrimaryResponse(res, cctx)) {
+ assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+ resCnt++;
+
+ rcvAll = mappings.size() == resCnt;
+ }
+ else {
+ assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+ rcvAll = false;
+ }
+ }
+ else
+ return;
+ }
- assert cause != null && cause.topologyVersion() != null : err;
+ assert req.topologyVersion().equals(topVer) : req;
- remapTopVer =
- new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+ if (res.remapTopologyVersion() != null) {
+ assert !req.topologyVersion().equals(res.remapTopologyVersion());
- err = null;
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(req.size());
- Collection<Object> failedKeys = cause.failedKeys();
+ remapKeys.addAll(req.keys());
- remapKeys = new ArrayList<>(failedKeys.size());
+ if (remapTopVer == null || remapTopVer.compareTo(res.remapTopologyVersion()) < 0)
+ remapTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null)
+ onPrimaryError(req, res);
+ else {
+ GridCacheReturn ret = res.returnValue();
- for (Object key : failedKeys)
- remapKeys.add(cctx.toCacheKeyObject(key));
+ if (op == TRANSFORM) {
+ if (ret != null) {
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
- updVer = null;
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
}
}
}
+ else
+ opRes = ret;
+ }
+
+ if (rcvAll) {
+ remapTopVer0 = onAllReceived();
- if (remapTopVer == null) {
+ if (remapTopVer0 == null) {
err0 = err;
opRes0 = opRes;
}
- else {
- fut0 = topCompleteFut;
-
- topCompleteFut = null;
-
- cctx.mvcc().removeAtomicFuture(futVer);
-
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
- }
}
}
@@ -406,67 +465,160 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (rcvAll && nearEnabled) {
if (mappings != null) {
- for (GridNearAtomicFullUpdateRequest req0 : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = req0.response();
+ for (PrimaryRequestState reqState : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = reqState.req.response();
- assert res0 != null : req0;
+ assert res0 != null : reqState;
- updateNear(req0, res0);
+ updateNear(reqState.req, res0);
}
}
else if (!nodeErr)
updateNear(req, res);
}
- if (remapTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
+ if (remapTopVer0 != null) {
+ waitAndRemap(remapTopVer0);
- if (!waitTopFut) {
- onDone(new GridCacheTryPutFailedException());
+ return;
+ }
- return;
+ if (rcvAll)
+ onDone(opRes0, err0);
+ }
+
+ private void waitAndRemap(AffinityTopologyVersion remapTopVer) {
+ assert remapTopVer != null;
+
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
+ }
+
+ if (topLocked) {
+ assert !F.isEmpty(remapKeys) : remapKeys;
+
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+ e.add(remapKeys, cause);
+
+ onDone(e);
+
+ return;
+ }
+
+ IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
}
+ });
+ }
+
+ /**
+ * @return Non null topology version if update should be remapped.
+ */
+ @Nullable private AffinityTopologyVersion onAllReceived() {
+ assert futId != null;
- if (topLocked) {
- assert !F.isEmpty(remapKeys) : remapKeys;
+ AffinityTopologyVersion remapTopVer0 = null;
- CachePartialUpdateCheckedException e =
- new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ if (remapKeys != null) {
+ assert remapTopVer != null;
- ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
- "Failed to update keys, topology changed while execute atomic update inside transaction.");
+ remapTopVer0 = remapTopVer;
+ }
+ else {
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
- cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
- e.add(remapKeys, cause);
+ assert cause != null && cause.topologyVersion() != null : err;
+ assert remapKeys == null;
+ assert remapTopVer == null;
- onDone(e);
+ remapTopVer = remapTopVer0 =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
- return;
+ err = null;
+
+ Collection<Object> failedKeys = cause.failedKeys();
+
+ remapKeys = new ArrayList<>(failedKeys.size());
+
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
+ }
}
+ }
- IgniteInternalFuture<AffinityTopologyVersion> fut =
- cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+ if (remapTopVer0 != null) {
+ cctx.mvcc().removeAtomicFuture(futId);
- if (fut == null)
- fut = new GridFinishedFuture<>(remapTopVer);
+ futId = null;
+ topVer = AffinityTopologyVersion.ZERO;
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
+ remapTopVer = null;
+ }
+
+ return remapTopVer0;
+ }
+
+ /**
+ * @param opRes Operation result.
+ * @param err Operation error.
+ */
+ private void finishUpdateFuture(GridCacheReturn opRes,
+ CachePartialUpdateCheckedException err,
+ @Nullable AffinityTopologyVersion remapTopVer) {
+ if (nearEnabled) {
+ if (mappings != null) {
+ for (PrimaryRequestState reqState : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = reqState.req.response();
+
+ assert res0 != null : reqState;
+
+ updateNear(reqState.req, res0);
}
- });
+ }
+ else {
+ assert singleReq != null && singleReq.req.response() != null;
+
+ updateNear(singleReq.req, singleReq.req.response());
+ }
+ }
+
+ if (remapTopVer != null) {
+ assert !F.isEmpty(remapKeys);
+
+ waitAndRemap(remapTopVer);
return;
}
- if (rcvAll)
- onDone(opRes0, err0);
+ onDone(opRes, err);
}
/**
@@ -475,10 +627,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @param req Update request.
* @param res Update response.
*/
- private void updateNear(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
assert nearEnabled;
- if (res.remapKeys() != null || !req.hasPrimary())
+ if (res.remapTopologyVersion() != null)
return;
GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -489,59 +641,48 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
AffinityTopologyVersion topVer;
- GridCacheVersion futVer;
-
- cache.topology().readLock();
-
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
- return;
- }
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
+ return;
+ }
- if (err != null) {
- onDone(err);
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
- return;
- }
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
- topVer = fut.topologyVersion();
-
- futVer = addAtomicFuture(topVer);
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
+ if (err != null) {
+ onDone(err);
return;
}
+
+ topVer = fut.topologyVersion();
}
- finally {
- cache.topology().readUnlock();
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
}
- if (futVer != null)
- map(topVer, futVer, remapKeys);
+ map(topVer, remapKeys);
}
/**
@@ -549,13 +690,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
*
* @param mappings Mappings to send.
*/
- private void doUpdate(Map<UUID, GridNearAtomicFullUpdateRequest> mappings) {
+ private void sendUpdateRequests(Map<UUID, PrimaryRequestState> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicFullUpdateRequest locUpdate = null;
+ GridNearAtomicAbstractUpdateRequest locUpdate = null;
// Send messages to remote nodes first, then run local update.
- for (GridNearAtomicFullUpdateRequest req : mappings.values()) {
+ for (PrimaryRequestState reqState : mappings.values()) {
+ GridNearAtomicAbstractUpdateRequest req = reqState.req;
+
if (locNodeId.equals(req.nodeId())) {
assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
", req=" + req + ']';
@@ -564,18 +707,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
else {
try {
+ if (req.initMappingLocally() && reqState.dhtNodes.isEmpty()) {
+ reqState.dhtNodes = null;
+
+ req.needPrimaryResponse(true);
+ }
+
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() +
- ", writeVer=" + req.updateVersion() +
+ msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
", node=" + req.nodeId() + ']');
}
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() +
- ", writeVer=" + req.updateVersion() +
+ msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
", node=" + req.nodeId() +
", err=" + e + ']');
}
@@ -587,9 +734,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (locUpdate != null) {
cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicFullUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
- onResult(res.nodeId(), res, false);
+ new GridDhtAtomicCache.UpdateReplyClosure() {
+ @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ if (syncMode != FULL_ASYNC)
+ onPrimaryResponse(res.nodeId(), res, false);
+ else if (res.remapTopologyVersion() != null)
+ ((GridDhtAtomicCache)cctx.cache()).remapToNewPrimary(req);
}
});
}
@@ -599,18 +749,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
- map(topVer, futVer, null);
+ @Override protected void map(AffinityTopologyVersion topVer) {
+ map(topVer, null);
}
/**
* @param topVer Topology version.
- * @param futVer Future ID.
* @param remapKeys Keys to remap.
*/
- void map(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable Collection<KeyCacheObject> remapKeys) {
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
@@ -620,64 +767,45 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
- GridCacheVersion updVer;
-
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
-
- if (updVer == null) {
- updVer = futVer;
-
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
- }
- }
- else
- updVer = null;
+ Long futId = cctx.mvcc().atomicFutureId();
Exception err = null;
- GridNearAtomicFullUpdateRequest singleReq0 = null;
- Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null;
+ PrimaryRequestState singleReq0 = null;
+ Map<UUID, PrimaryRequestState> mappings0 = null;
int size = keys.size();
+ boolean mappingKnown = cctx.topology().rebalanceFinished(topVer) &&
+ !cctx.discovery().hasNearCache(cctx.cacheId(), topVer);
+
try {
- if (size == 1 && !fastMap) {
+ if (size == 1) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ singleReq0 = mapSingleUpdate(topVer, futId, mappingKnown);
}
else {
- Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
topVer,
- futVer,
- updVer,
- remapKeys);
+ futId,
+ remapKeys,
+ mappingKnown);
if (pendingMappings.size() == 1)
singleReq0 = F.firstValue(pendingMappings);
else {
- if (syncMode == PRIMARY_SYNC) {
- mappings0 = U.newHashMap(pendingMappings.size());
-
- for (GridNearAtomicFullUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings0.put(req.nodeId(), req);
- }
- }
- else
- mappings0 = pendingMappings;
+ mappings0 = pendingMappings;
assert !mappings0.isEmpty() || size == 0 : this;
}
}
synchronized (mux) {
- assert this.futVer == futVer || (this.isDone() && this.error() != null);
- assert this.topVer == topVer;
+ assert this.futId == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
- this.updVer = updVer;
+ this.topVer = topVer;
+ this.futId = futId;
resCnt = 0;
@@ -686,6 +814,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
this.remapKeys = null;
}
+
+ if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this)) {
+ assert isDone();
+
+ return;
+ }
}
catch (Exception e) {
err = e;
@@ -699,56 +833,133 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
// Optimize mapping for single key.
if (singleReq0 != null)
- mapSingle(singleReq0.nodeId(), singleReq0);
+ sendSingleRequest(singleReq0.req.nodeId(), singleReq0.req);
else {
assert mappings0 != null;
- if (size == 0)
+ if (size == 0) {
onDone(new GridCacheReturn(cctx, true, true, null, true));
+
+ return;
+ }
else
- doUpdate(mappings0);
+ sendUpdateRequests(mappings0);
}
+
+ if (syncMode == FULL_ASYNC) {
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+
+ return;
+ }
+
+ if (mappingKnown && syncMode == FULL_SYNC && cctx.discovery().topologyVersion() != topVer.topologyVersion())
+ checkDhtNodes(futId);
}
- /**
- * @return Future version.
- */
- private GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
+ private void checkDhtNodes(Long futId) {
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
+ AffinityTopologyVersion remapTopVer0 = null;
+
+ List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
- GridFutureAdapter<Void> fut0;
+ boolean rcvAll = false;
synchronized (mux) {
- fut0 = topCompleteFut;
+ if (this.futId == null || !this.futId.equals(futId))
+ return;
+
+ if (singleReq != null) {
+ if (!singleReq.req.initMappingLocally())
+ return;
+
+ DhtLeftResult res = singleReq.checkDhtNodes(cctx);
+
+ if (res == DhtLeftResult.DONE) {
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
+ }
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
+ checkReqs = Collections.singletonList(new GridNearAtomicCheckUpdateRequest(singleReq.req));
+ else
+ return;
+ }
+ else {
+ if (mappings != null) {
+ for (PrimaryRequestState reqState : mappings.values()) {
+ if (!reqState.req.initMappingLocally())
+ continue;
+
+ DhtLeftResult res = reqState.checkDhtNodes(cctx);
+
+ if (res == DhtLeftResult.DONE) {
+ assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
+
+ resCnt++;
- topCompleteFut = null;
+ if (mappings.size() == resCnt) {
+ rcvAll = true;
- ver0 = futVer;
+ opRes0 = opRes;
+ err0 = err;
+ remapTopVer0 = onAllReceived();
- futVer = null;
+ break;
+ }
+ }
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY) {
+ if (checkReqs == null)
+ checkReqs = new ArrayList<>(mappings.size());
+
+ checkReqs.add(new GridNearAtomicCheckUpdateRequest(reqState.req));
+ }
+ }
+ }
+ else
+ return;
+ }
}
- if (fut0 != null)
- fut0.onDone();
+ if (checkReqs != null) {
+ assert !rcvAll;
+
+ for (int i = 0; i < checkReqs.size(); i++)
+ sendCheckUpdateRequest(checkReqs.get(i));
+ }
+ else if (rcvAll)
+ finishUpdateFuture(opRes0, err0, remapTopVer0);
+ }
+
+ /**
+ * @return Future version.
+ */
+ private Long onFutureDone() {
+ Long id0;
+
+ synchronized (mux) {
+ id0 = futId;
+
+ futId = null;
+ }
- return ver0;
+ return id0;
}
/**
* @param topNodes Cache nodes.
* @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
+ * @param futId Future ID.
* @param remapKeys Keys to remap.
* @return Mapping.
* @throws Exception If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ private Map<UUID, PrimaryRequestState> mapUpdate(Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer,
- @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+ Long futId,
+ @Nullable Collection<KeyCacheObject> remapKeys,
+ boolean mappingKnown) throws Exception {
Iterator<?> it = null;
if (vals != null)
@@ -764,7 +975,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (conflictRmvVals != null)
conflictRmvValsIt = conflictRmvVals.iterator();
- Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+ Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
// Create mappings first, then send messages.
for (Object key : keys) {
@@ -819,55 +1030,50 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
else
val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
- List<ClusterNode> affNodes = mapKey(cacheKey, topVer);
+ List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
- if (affNodes.isEmpty())
+ if (F.isEmpty(nodes))
throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid).");
- int i = 0;
-
- for (int n = 0; n < affNodes.size(); n++) {
- ClusterNode affNode = affNodes.get(n);
-
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
-
- UUID nodeId = affNode.id();
-
- GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId);
-
- if (mapped == null) {
- mapped = new GridNearAtomicFullUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- keys.size());
-
- pendingMappings.put(nodeId, mapped);
- }
+ ClusterNode primary = nodes.get(0);
+
+ boolean needPrimaryRes = !mappingKnown || primary.isLocal();
+
+ UUID nodeId = primary.id();
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+ PrimaryRequestState mapped = pendingMappings.get(nodeId);
- i++;
+ if (mapped == null) {
+ GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ needPrimaryRes,
+ skipStore,
+ keepBinary,
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ mapped = new PrimaryRequestState(req, nodes, false);
+
+ pendingMappings.put(nodeId, mapped);
}
+
+ if (mapped.req.initMappingLocally())
+ mapped.addMapping(nodes);
+
+ mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
}
return pendingMappings;
@@ -875,14 +1081,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/**
* @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
+ * @param futId Future ID.
+ * @param mappingKnown {@code True} if update mapping is known locally.
* @return Request.
* @throws Exception If failed.
*/
- private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
+ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId, boolean mappingKnown)
+ throws Exception {
Object key = F.first(keys);
Object val;
@@ -935,18 +1140,20 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
else
val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
- ClusterNode primary = cctx.affinity().primaryByPartition(cacheKey.partition(), topVer);
+ List<ClusterNode> nodes = cctx.affinity().nodesByKey(cacheKey, topVer);
- if (primary == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid).");
+ if (F.isEmpty(nodes))
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ ClusterNode primary = nodes.get(0);
+
+ boolean needPrimaryRes = !mappingKnown || primary.isLocal() || nodes.size() == 1;
GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
primary.id(),
- futVer,
- fastMap,
- updVer,
+ futId,
topVer,
topLocked,
syncMode,
@@ -957,9 +1164,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
filter,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled(),
1);
@@ -967,26 +1174,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
val,
conflictTtl,
conflictExpireTime,
- conflictVer,
- true);
-
- return req;
- }
-
- /**
- * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
- * node and send updates in parallel to all participating nodes.
- *
- * @param key Key to map.
- * @param topVer Topology version to map.
- * @return Collection of nodes to which key is mapped.
- */
- private List<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) {
- GridCacheAffinityManager affMgr = cctx.affinity();
+ conflictVer);
- // If we can send updates in parallel - do it.
- return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primaryByKey(key, topVer));
+ return new PrimaryRequestState(req, nodes, true);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 22e01ae..4e20fc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -23,11 +23,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -59,29 +58,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@GridDirectTransient
private UUID nodeId;
- /** Future version. */
- private GridCacheVersion futVer;
+ /** Future ID. */
+ private long futId;
- /** Update error. */
- @GridDirectTransient
- private volatile IgniteCheckedException err;
-
- /** Serialized error. */
- private byte[] errBytes;
+ /** */
+ private UpdateErrors errs;
/** Return value. */
@GridToStringInclude
private GridCacheReturn ret;
- /** Failed keys. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private volatile Collection<KeyCacheObject> failedKeys;
-
- /** Keys that should be remapped. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> remapKeys;
+ /** */
+ private AffinityTopologyVersion remapTopVer;
/** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
@GridDirectCollection(int.class)
@@ -108,6 +96,15 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Partition ID. */
private int partId = -1;
+ /** */
+ @GridDirectCollection(UUID.class)
+ @GridToStringInclude
+ private List<UUID> dhtNodes;
+
+ /** */
+ @GridDirectTransient
+ private boolean nodeLeft;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -118,24 +115,52 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* @param cacheId Cache ID.
* @param nodeId Node ID this reply should be sent to.
- * @param futVer Future version.
+ * @param futId Future ID.
+ * @param partId Partition.
+ * @param nodeLeft {@code True} if primary node failed.
* @param addDepInfo Deployment info flag.
*/
- public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
- assert futVer != null;
-
+ public GridNearAtomicUpdateResponse(int cacheId,
+ UUID nodeId,
+ long futId,
+ int partId,
+ boolean nodeLeft,
+ boolean addDepInfo) {
this.cacheId = cacheId;
this.nodeId = nodeId;
- this.futVer = futVer;
+ this.futId = futId;
+ this.partId = partId;
+ this.nodeLeft = nodeLeft;
this.addDepInfo = addDepInfo;
}
+ /**
+ * @return {@code True} if primary node failed.
+ */
+ public boolean nodeLeftResponse() {
+ return nodeLeft;
+ }
+
/** {@inheritDoc} */
@Override public int lookupIndex() {
return CACHE_MSG_IDX;
}
/**
+ * @param dhtNodes DHT nodes.
+ */
+ public void dhtNodes(List<UUID> dhtNodes) {
+ this.dhtNodes = dhtNodes;
+ }
+
+ /**
+ * @return DHT nodes.
+ */
+ @Nullable public List<UUID> dhtNodes() {
+ return dhtNodes;
+ }
+
+ /**
* @return Node ID this response should be sent to.
*/
public UUID nodeId() {
@@ -150,17 +175,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
- * @return Future version.
+ * @return Future ID.
*/
- public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * @param partId Partition ID for proper striping on near node.
- */
- public void partition(int partId) {
- this.partId = partId;
+ public long futureId() {
+ return futId;
}
/**
@@ -169,19 +187,22 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param err Error.
*/
public void error(IgniteCheckedException err){
- this.err = err;
+ if (errs == null)
+ errs = new UpdateErrors();
+
+ errs.onError(err);
}
/** {@inheritDoc} */
@Override public IgniteCheckedException error() {
- return err;
+ return errs != null ? errs.error() : null;
}
/**
* @return Collection of failed keys.
*/
public Collection<KeyCacheObject> failedKeys() {
- return failedKeys;
+ return errs != null ? errs.failedKeys() : null;
}
/**
@@ -200,17 +221,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
- * @param remapKeys Remap keys.
+ * @param remapTopVer Topology version to remap update.
*/
- public void remapKeys(List<KeyCacheObject> remapKeys) {
- this.remapKeys = remapKeys;
+ void remapTopologyVersion(AffinityTopologyVersion remapTopVer) {
+ this.remapTopVer = remapTopVer;
}
/**
- * @return Remap keys.
+ * @return Topology version if update should be remapped.
*/
- public Collection<KeyCacheObject> remapKeys() {
- return remapKeys;
+ @Nullable AffinityTopologyVersion remapTopologyVersion() {
+ return remapTopVer;
}
/**
@@ -221,7 +242,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param ttl TTL for near cache update.
* @param expireTime Expire time for near cache update.
*/
- public void addNearValue(int keyIdx,
+ void addNearValue(int keyIdx,
@Nullable CacheObject val,
long ttl,
long expireTime) {
@@ -242,7 +263,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param expireTime Expire time for near cache update.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+ void addNearTtl(int keyIdx, long ttl, long expireTime) {
if (ttl >= 0) {
if (nearTtls == null) {
nearTtls = new GridLongList(16);
@@ -299,7 +320,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* @param nearVer Version generated on primary node to be used for originating node's near cache update.
*/
- public void nearVersion(GridCacheVersion nearVer) {
+ void nearVersion(GridCacheVersion nearVer) {
this.nearVer = nearVer;
}
@@ -313,7 +334,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* @param keyIdx Index of key for which update was skipped
*/
- public void addSkippedIndex(int keyIdx) {
+ void addSkippedIndex(int keyIdx) {
if (nearSkipIdxs == null)
nearSkipIdxs = new ArrayList<>();
@@ -351,35 +372,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param e Error cause.
*/
public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
- if (failedKeys == null)
- failedKeys = new ConcurrentLinkedQueue<>();
-
- failedKeys.add(key);
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
- }
-
- /**
- * Adds keys to collection of failed keys.
- *
- * @param keys Key to add.
- * @param e Error cause.
- */
- public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
- if (keys != null) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
-
- failedKeys.addAll(keys);
- }
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
+ if (errs == null)
+ errs = new UpdateErrors();
- err.addSuppressed(e);
+ errs.addFailedKey(key, e);
}
/**
@@ -387,18 +383,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
*
* @param keys Key to add.
* @param e Error cause.
- * @param ctx Context.
*/
- public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
+ synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+ if (errs == null)
+ errs = new UpdateErrors();
- failedKeys.addAll(keys);
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
+ errs.addFailedKeys(keys, e);
}
/** {@inheritDoc}
@@ -406,14 +396,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx, err);
-
GridCacheContext cctx = ctx.cacheContext(cacheId);
- prepareMarshalCacheObjects(failedKeys, cctx);
-
- prepareMarshalCacheObjects(remapKeys, cctx);
+ if (errs != null)
+ errs.prepareMarshal(this, cctx);
prepareMarshalCacheObjects(nearVals, cctx);
@@ -425,14 +411,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errBytes != null && err == null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
-
GridCacheContext cctx = ctx.cacheContext(cacheId);
- finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
- finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+ if (errs != null)
+ errs.finishUnmarshal(this, cctx, ldr);
finishUnmarshalCacheObjects(nearVals, cctx, ldr);
@@ -471,19 +453,19 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
switch (writer.state()) {
case 3:
- if (!writer.writeByteArray("errBytes", errBytes))
+ if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
return false;
writer.incrementState();
case 4:
- if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("errs", errs))
return false;
writer.incrementState();
case 5:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
@@ -531,7 +513,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
case 13:
- if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("remapTopVer", remapTopVer))
return false;
writer.incrementState();
@@ -559,7 +541,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
switch (reader.state()) {
case 3:
- errBytes = reader.readByteArray("errBytes");
+ dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
if (!reader.isLastRead())
return false;
@@ -567,7 +549,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 4:
- failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+ errs = reader.readMessage("errs");
if (!reader.isLastRead())
return false;
@@ -575,7 +557,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 5:
- futVer = reader.readMessage("futVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
@@ -639,7 +621,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 13:
- remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+ remapTopVer = reader.readMessage("remapTopVer");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
new file mode 100644
index 0000000..1d415c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class UpdateErrors implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Failed keys. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> failedKeys;
+
+ /** Update error. */
+ @GridDirectTransient
+ @GridToStringInclude
+ private IgniteCheckedException err;
+
+ /** Serialized update error. */
+ private byte[] errBytes;
+
+ /**
+ *
+ */
+ public UpdateErrors() {
+ // No-op.
+ }
+
+ /**
+ * @param err Error.
+ */
+ public UpdateErrors(IgniteCheckedException err) {
+ assert err != null;
+
+ this.err = err;
+ }
+
+ /**
+ * @param err Error.
+ */
+ public void onError(IgniteCheckedException err){
+ this.err = err;
+ }
+
+ /**
+ * @return Error.
+ */
+ public IgniteCheckedException error() {
+ return err;
+ }
+
+ /**
+ * @return Failed keys.
+ */
+ public Collection<KeyCacheObject> failedKeys() {
+ return failedKeys;
+ }
+
+ /**
+ * Adds key to collection of failed keys.
+ *
+ * @param key Key to add.
+ * @param e Error cause.
+ */
+ void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>();
+
+ failedKeys.add(key);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+ * @param keys Keys.
+ * @param e Error.
+ */
+ void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>(keys.size());
+
+ failedKeys.addAll(keys);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /** {@inheritDoc} */
+ void prepareMarshal(GridCacheMessage msg, GridCacheContext cctx) throws IgniteCheckedException {
+ msg.prepareMarshalCacheObjects(failedKeys, cctx);
+
+ if (errBytes == null)
+ errBytes = U.marshal(cctx.marshaller(), err);
+ }
+
+ /** {@inheritDoc} */
+ void finishUnmarshal(GridCacheMessage msg, GridCacheContext cctx, ClassLoader ldr) throws IgniteCheckedException {
+ msg.finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+ if (errBytes != null && err == null)
+ err = U.unmarshal(cctx.marshaller(), errBytes, U.resolveClassLoader(ldr, cctx.gridConfig()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(UpdateErrors.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -46;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(UpdateErrors.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 41632ef..62aecd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Externalizable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -43,7 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
@@ -141,10 +142,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
List<Integer> nearValsIdxs = res.nearValuesIndexes();
List<Integer> skipped = res.skippedIndexes();
- GridCacheVersion ver = req.updateVersion();
-
- if (ver == null)
- ver = res.nearVersion();
+ GridCacheVersion ver = res.nearVersion();
assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
@@ -194,7 +192,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
processNearAtomicUpdateResponse(ver,
key,
val,
- null,
ttl,
expireTime,
req.keepBinary(),
@@ -212,7 +209,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param ver Version.
* @param key Key.
* @param val Value.
- * @param valBytes Value bytes.
* @param ttl TTL.
* @param expireTime Expire time.
* @param nodeId Node ID.
@@ -224,7 +220,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
GridCacheVersion ver,
KeyCacheObject key,
@Nullable CacheObject val,
- @Nullable byte[] valBytes,
long ttl,
long expireTime,
boolean keepBinary,
@@ -241,7 +236,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
try {
entry = entryEx(key, topVer);
- GridCacheOperation op = (val != null || valBytes != null) ? UPDATE : DELETE;
+ GridCacheOperation op = val != null ? UPDATE : DELETE;
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
@@ -299,11 +294,12 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param nodeId Sender node ID.
* @param req Dht atomic update request.
* @param res Dht atomic update response.
+ * @return Evicted near keys (if any).
*/
- public void processDhtAtomicUpdateRequest(
+ @Nullable public List<KeyCacheObject> processDhtAtomicUpdateRequest(
UUID nodeId,
GridDhtAtomicAbstractUpdateRequest req,
- GridDhtAtomicUpdateResponse res
+ GridDhtAtomicNearResponse res
) {
GridCacheVersion ver = req.writeVersion();
@@ -313,6 +309,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash());
+ List<KeyCacheObject> nearEvicted = null;
+
for (int i = 0; i < req.nearSize(); i++) {
KeyCacheObject key = req.nearKey(i);
@@ -322,7 +320,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
GridCacheEntryEx entry = peekEx(key);
if (entry == null) {
- res.addNearEvicted(key);
+ if (nearEvicted == null)
+ nearEvicted = new ArrayList<>();
+
+ nearEvicted.add(key);
break;
}
@@ -388,6 +389,8 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
res.addFailedKey(key, new IgniteCheckedException("Failed to update near cache key: " + key, e));
}
}
+
+ return nearEvicted;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index b3f0684..485059f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -425,7 +425,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
onEntryUpdate(evt, notify, loc, recordIgniteEvt);
}
- });
+ }, sync);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 5ca3da8..35fbe11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -140,7 +140,10 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
* before stored in cache.
* @return Cache key object.
*/
- public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj, boolean userObj);
+ public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx,
+ @Nullable GridCacheContext cctx,
+ Object obj,
+ boolean userObj);
/**
* @param ctx Cache context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index ff7c4ba..e0549fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -231,8 +231,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
cctx.affinity().partition(obj, false) :
ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null);
}
- catch (IgniteCheckedException ignored) {
- U.error(log, "Failed to get partition");
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to get partition", e);
return -1;
}
@@ -327,13 +327,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
/**
* @param key Key.
- */
- UserKeyCacheObjectImpl(Object key) {
- this(key, -1);
- }
-
- /**
- * @param key Key.
+ * @param part Partition.
*/
UserKeyCacheObjectImpl(Object key, int part) {
super(key, null, part);
@@ -341,6 +335,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
/**
* @param key Key.
+ * @param valBytes Marshalled key.
+ * @param part Partition.
*/
UserKeyCacheObjectImpl(Object key, byte[] valBytes, int part) {
super(key, valBytes, part);
@@ -366,10 +362,10 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
- return new KeyCacheObjectImpl(val, valBytes);
+ return new KeyCacheObjectImpl(val, valBytes, partition());
}
- return new KeyCacheObjectImpl(val, valBytes);
+ return new KeyCacheObjectImpl(val, valBytes, partition());
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to marshal object: " + val, e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 53096ab..6c85b32 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -56,7 +56,7 @@ public class StripedExecutor implements ExecutorService {
/**
* @param cnt Count.
- * @param gridName Node name.
+ * @param igniteInstanceName Node name.
* @param poolName Pool name.
* @param log Logger.
*/
@@ -435,7 +435,11 @@ public class StripedExecutor implements ExecutorService {
* Starts the stripe.
*/
void start() {
- thread = new IgniteThread(igniteInstanceName, poolName + "-stripe-" + idx, this);
+ thread = new IgniteThread(igniteInstanceName,
+ poolName + "-stripe-" + idx,
+ this,
+ IgniteThread.GRP_IDX_UNASSIGNED,
+ idx);
thread.start();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 7abd367..96f3797 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -18,8 +18,10 @@
package org.apache.ignite.internal.util.future;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -118,7 +120,14 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig
}
catch (IgniteCheckedException e) {
if (!ignoreFailure(e)) {
- U.error(null, "Failed to execute compound future reducer: " + this, e);
+ if (e instanceof NodeStoppingException) {
+ IgniteLogger log = logger();
+
+ if (log != null && log.isDebugEnabled())
+ log.debug("Failed to execute compound future reducer, node stopped.");
+ }
+ else
+ U.error(null, "Failed to execute compound future reducer: " + this, e);
onDone(e);
}