You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/22 13:31:05 UTC
[2/2] ignite git commit: Restore clock mode.
Restore clock mode.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0015962a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0015962a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0015962a
Branch: refs/heads/ignite-2.0-clock
Commit: 0015962a43cc48d5b7857503a0f84a43518cbd7f
Parents: 827befb
Author: sboikov <sb...@gridgain.com>
Authored: Wed Mar 22 15:20:14 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Mar 22 16:30:49 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 12 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 62 +-
.../dht/atomic/GridDhtAtomicCache.java | 544 +++++++---
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 6 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 6 +-
...idNearAtomicAbstractSingleUpdateRequest.java | 4 +
.../GridNearAtomicAbstractUpdateFuture.java | 72 +-
.../GridNearAtomicAbstractUpdateRequest.java | 120 +-
.../GridNearAtomicFastMapUpdateFuture.java | 1026 ++++++++++++++++++
.../atomic/GridNearAtomicFullUpdateRequest.java | 48 +-
...GridNearAtomicSingleUpdateFilterRequest.java | 10 +-
.../GridNearAtomicSingleUpdateFuture.java | 23 +-
...GridNearAtomicSingleUpdateInvokeRequest.java | 17 +-
.../GridNearAtomicSingleUpdateRequest.java | 25 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 29 +-
.../distributed/near/GridNearAtomicCache.java | 5 +-
.../GridCacheAtomicMessageCountSelfTest.java | 14 +
.../atomic/IgniteCacheAtomicProtocolTest.java | 2 +
18 files changed, 1745 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 54b4ed7..2237e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2170,6 +2170,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert conflictCtx != null;
+ boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
// Use old value?
if (conflictCtx.isUseOld()) {
GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
@@ -2178,7 +2180,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (!isNew() && // Not initial value,
verCheck && // and atomic version check,
oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal,
- ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, true) == 0 && // and both versions are equal,
+ ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
cctx.writeThrough() && // and store is enabled,
primary) // and we are primary.
{
@@ -2224,11 +2226,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
conflictVer = null;
}
+ boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
// Perform version check only in case there was no explicit conflict resolution.
if (conflictCtx == null) {
if (verCheck) {
- if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) >= 0) {
- if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) == 0 && cctx.writeThrough() && primary) {
+ if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
+ if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
if (log.isDebugEnabled())
log.debug("Received entry update with same version as current (will update store) " +
"[entry=" + this + ", newVer=" + newVer + ']');
@@ -2303,7 +2307,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else
- assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) <= 0 :
+ assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
"Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..219b04a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -85,6 +85,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** Update request. */
final GridNearAtomicAbstractUpdateRequest updateReq;
+ /** Update response. */
+ @GridToStringExclude
+ private final GridNearAtomicUpdateResponse updateRes;
+
+ /** Completion callback. */
+ @GridToStringExclude
+ private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
+
/** Mappings. */
@GridToStringExclude
protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
@@ -99,16 +107,22 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param cctx Cache context.
* @param writeVer Write version.
* @param updateReq Update request.
+ * @param updateRes Response.
+ * @param completionCb Callback to invoke to send response to near node.
*/
protected GridDhtAtomicAbstractUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
- ) {
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb)
+ {
this.cctx = cctx;
- this.updateReq = updateReq;
this.writeVer = writeVer;
+ this.updateReq = updateReq;
+ this.updateRes = updateRes;
+ this.completionCb = completionCb;
futId = cctx.mvcc().atomicFutureId();
@@ -354,13 +368,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*
* @param nearNode Near node.
* @param ret Cache operation return value.
- * @param updateRes Response.
- * @param completionCb Callback to invoke to send response to near node.
*/
- final void map(ClusterNode nearNode,
- GridCacheReturn ret,
- GridNearAtomicUpdateResponse updateRes,
- GridDhtAtomicCache.UpdateReplyClosure completionCb) {
+ final void map(ClusterNode nearNode, GridCacheReturn ret) {
if (F.isEmpty(mappings)) {
updateRes.dhtNodes(Collections.<UUID>emptyList());
@@ -371,23 +380,27 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
return;
}
- boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
- !ret.emptyResult() ||
- updateRes.nearVersion() != null ||
- cctx.localNodeId().equals(nearNode.id());
+ if (!updateReq.fastMap()) {
+ boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
+ !ret.emptyResult() ||
+ updateRes.nearVersion() != null ||
+ cctx.localNodeId().equals(nearNode.id());
- boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
+ boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
- if (needMapping) {
- initMapping(updateRes);
+ if (needMapping) {
+ initMapping(updateRes);
- needReplyToNear = true;
- }
+ needReplyToNear = true;
+ }
- sendDhtRequests(nearNode, ret);
+ sendDhtRequests(nearNode, ret);
- if (needReplyToNear)
- completionCb.apply(updateReq, updateRes);
+ if (needReplyToNear)
+ completionCb.apply(updateReq, updateRes);
+ }
+ else
+ sendDhtRequests(nearNode, ret);
}
/**
@@ -416,14 +429,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
try {
assert !cctx.localNodeId().equals(req.nodeId()) : req;
- if (updateReq.fullSync()) {
+ if (!updateReq.fastMap() && updateReq.fullSync()) {
req.nearReplyInfo(nearNode.id(), updateReq.futureId());
if (ret.emptyResult())
req.hasResult(true);
}
- if (cntQryClsrs != null)
+ if (cntQryClsrs != null || updateReq.fastMap())
req.replyWithoutDelay(true);
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
@@ -519,6 +532,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
clsr.apply(suc);
}
+ if (updateReq.fastMap())
+ completionCb.apply(updateReq, updateRes);
+
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c20ed48..3735824 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -106,7 +107,9 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -1115,27 +1118,54 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
- final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
- ctx,
- this,
- ctx.config().getWriteSynchronizationMode(),
- op,
- map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
- conflictPutMap.keySet() : conflictRmvMap.keySet(),
- map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
- invokeArgs,
- (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
- conflictRmvMap != null ? conflictRmvMap.values() : null,
- retval,
- rawRetval,
- opCtx != null ? opCtx.expiry() : null,
- CU.filterArray(null),
- subjId,
- taskNameHash,
- opCtx != null && opCtx.skipStore(),
- opCtx != null && opCtx.isKeepBinary(),
- opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
- waitTopFut);
+ final GridNearAtomicAbstractUpdateFuture updateFut;
+
+ if (isFastMap(null, op)) {
+ updateFut = new GridNearAtomicFastMapUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ op,
+ map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
+ conflictPutMap.keySet() : conflictRmvMap.keySet(),
+ map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
+ invokeArgs,
+ (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
+ conflictRmvMap != null ? conflictRmvMap.values() : null,
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ CU.filterArray(null),
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+ }
+ else {
+ updateFut = new GridNearAtomicUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ op,
+ map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
+ conflictPutMap.keySet() : conflictRmvMap.keySet(),
+ map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
+ invokeArgs,
+ (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
+ conflictRmvMap != null ? conflictRmvMap.values() : null,
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ CU.filterArray(null),
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+ }
if (async) {
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1306,7 +1336,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CacheEntryPredicate[] filters = CU.filterArray(filter);
- if (conflictPutVal == null && conflictRmvVer == null) {
+ if (conflictPutVal == null &&
+ conflictRmvVer == null &&
+ !isFastMap(filters, op)) {
return new GridNearAtomicSingleUpdateFuture(
ctx,
this,
@@ -1328,30 +1360,67 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
);
}
else {
- return new GridNearAtomicUpdateFuture(
- ctx,
- this,
- ctx.config().getWriteSynchronizationMode(),
- op,
- Collections.singletonList(key),
- val0 != null ? Collections.singletonList(val0) : null,
- invokeArgs,
- conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
- conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
- retval,
- false,
- opCtx != null ? opCtx.expiry() : null,
- filters,
- ctx.subjectIdPerCall(null, opCtx),
- ctx.kernalContext().job().currentTaskNameHash(),
- opCtx != null && opCtx.skipStore(),
- opCtx != null && opCtx.isKeepBinary(),
- opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
- waitTopFut);
+ if (isFastMap(filters, op)) {
+ return new GridNearAtomicFastMapUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ op,
+ Collections.singletonList(key),
+ val0 != null ? Collections.singletonList(val0) : null,
+ invokeArgs,
+ conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
+ conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
+ retval,
+ false,
+ opCtx != null ? opCtx.expiry() : null,
+ filters,
+ ctx.subjectIdPerCall(null, opCtx),
+ ctx.kernalContext().job().currentTaskNameHash(),
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+ }
+ else {
+ return new GridNearAtomicUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ op,
+ Collections.singletonList(key),
+ val0 != null ? Collections.singletonList(val0) : null,
+ invokeArgs,
+ conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
+ conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
+ retval,
+ false,
+ opCtx != null ? opCtx.expiry() : null,
+ filters,
+ ctx.subjectIdPerCall(null, opCtx),
+ ctx.kernalContext().job().currentTaskNameHash(),
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+ }
}
}
/**
+ * Checks whether an update operation qualifies for the fast-map update path.
+ * <p>
+ * Returns {@code true} only when all of the following hold: no entry filters are passed,
+ * the operation is not {@code TRANSFORM}, the configured write synchronization mode is
+ * {@code FULL_SYNC}, the configured atomic write order mode is {@code CLOCK}, and the cache
+ * does not combine write-through with a configured interceptor.
+ *
+ * @param filters Entry filters of the operation, may be {@code null} (treated as no filters).
+ * @param op Cache operation being performed.
+ * @return {@code True} if the operation is a fast-map operation.
+ */
+ public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) {
+ return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+ ctx.config().getAtomicWriteOrderMode() == CLOCK &&
+ !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
+ }
+
+ /**
* Entry point for all public API remove methods.
*
* @param keys Keys to remove.
@@ -1394,26 +1463,52 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
});
}
- final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
- ctx,
- this,
- ctx.config().getWriteSynchronizationMode(),
- DELETE,
- keys != null ? keys : conflictMap.keySet(),
- null,
- null,
- null,
- drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
- retval,
- rawRetval,
- opCtx != null ? opCtx.expiry() : null,
- CU.filterArray(null),
- subjId,
- taskNameHash,
- opCtx != null && opCtx.skipStore(),
- opCtx != null && opCtx.isKeepBinary(),
- opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
- true);
+ final GridNearAtomicAbstractUpdateFuture updateFut;
+
+ if (isFastMap(null, DELETE)) {
+ updateFut = new GridNearAtomicFastMapUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ DELETE,
+ keys != null ? keys : conflictMap.keySet(),
+ null,
+ null,
+ null,
+ drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ CU.filterArray(null),
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ true);
+ }
+ else {
+ updateFut = new GridNearAtomicUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ DELETE,
+ keys != null ? keys : conflictMap.keySet(),
+ null,
+ null,
+ null,
+ drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ CU.filterArray(null),
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ true);
+ }
if (async) {
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1773,29 +1868,36 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- // Do not check topology version if topology was locked on near node by
+ // Do not check topology version for CLOCK versioning since
+ // partition exchange will wait for near update future (if future is on server node).
+ // Also do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+ !needRemap(req.topologyVersion(), top.topologyVersion())) {
locked = lockEntries(req, req.topologyVersion());
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
- // Assign next version for update inside entries lock.
- GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+ GridCacheVersion ver = req.updateVersion();
+
+ if (ver == null) {
+ // Assign next version for update inside entries lock.
+ ver = ctx.versions().next(top.topologyVersion());
- if (hasNear)
- res.nearVersion(ver);
+ if (hasNear)
+ res.nearVersion(ver);
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Assigned update version [futId=" + req.futureId() +
- ", writeVer=" + ver + ']');
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Assigned update version [futId=" + req.futureId() +
+ ", writeVer=" + ver + ']');
+ }
}
assert ver != null : "Got null version for update request: " + req;
boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
- dhtFut = createDhtFuture(ver, req);
+ dhtFut = req.fastMap() ? null : createDhtFuture(ver, req, res, completionCb);
expiry = expiryPolicy(req.expiry());
@@ -1812,6 +1914,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
hasNear,
req,
res,
+ completionCb,
locked,
ver,
dhtFut,
@@ -1831,6 +1934,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
hasNear,
req,
res,
+ completionCb,
locked,
ver,
dhtFut,
@@ -1910,9 +2014,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
- else
+ else {
if (dhtFut != null)
- dhtFut.map(node, res.returnValue(), res, completionCb);
+ dhtFut.map(node, res.returnValue());
+ else {
+ assert req.fastMap() : req;
+
+ completionCb.apply(req, res);
+ }
+ }
if (req.writeSynchronizationMode() != FULL_ASYNC)
req.cleanup(!node.isLocal());
@@ -1943,6 +2053,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean hasNear,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
final List<GridDhtCacheEntry> locked,
final GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
@@ -1990,6 +2101,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (int i = 0; i < locked.size(); i++) {
GridDhtCacheEntry entry = locked.get(i);
+ if (entry == null)
+ continue;
+
try {
if (!checkFilter(entry, req, res)) {
if (expiry != null && entry.hasValue()) {
@@ -2095,6 +2209,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut,
req,
res,
+ completionCb,
replicate,
updRes,
taskName,
@@ -2143,6 +2258,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut,
req,
res,
+ completionCb,
replicate,
updRes,
taskName,
@@ -2269,6 +2385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut,
req,
res,
+ completionCb,
replicate,
updRes,
taskName,
@@ -2358,6 +2475,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean hasNear,
GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
@@ -2373,6 +2491,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
+ boolean readersOnly = false;
+
boolean intercept = ctx.config().getInterceptor() != null;
AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
@@ -2388,12 +2508,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtCacheEntry entry = locked.get(i);
+ if (entry == null)
+ continue;
+
GridCacheVersion newConflictVer = req.conflictVersion(i);
long newConflictTtl = req.conflictTtl(i);
long newConflictExpireTime = req.conflictExpireTime(i);
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
+ boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
+ req.topologyVersion());
+
Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
Collection<UUID> readers = null;
@@ -2411,18 +2537,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op,
writeVal,
req.invokeArguments(),
- writeThrough() && !req.skipStore(),
+ (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
+ && writeThrough() && !req.skipStore(),
!req.skipStore(),
sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
/*event*/true,
/*metrics*/true,
- /*primary*/true,
- /*verCheck*/false,
+ primary,
+ ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
topVer,
req.filter(),
- replicate ? DR_PRIMARY : DR_NONE,
+ replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
newConflictTtl,
newConflictExpireTime,
newConflictVer,
@@ -2434,6 +2561,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*updateCntr*/null,
dhtFut);
+ if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+ dhtFut = createDhtFuture(ver, req, res, completionCb);
+
+ readersOnly = true;
+ }
+
if (dhtFut != null) {
if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2445,17 +2578,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
EntryProcessor<Object, Object, Object> entryProcessor = null;
- dhtFut.addWriteEntry(
- affAssignment,
- entry,
- updRes.newValue(),
- entryProcessor,
- updRes.newTtl(),
- updRes.conflictExpireTime(),
- newConflictVer,
- sndPrevVal,
- updRes.oldValue(),
- updRes.updateCounter());
+ if (!readersOnly) {
+ dhtFut.addWriteEntry(
+ affAssignment,
+ entry,
+ updRes.newValue(),
+ entryProcessor,
+ updRes.newTtl(),
+ updRes.conflictExpireTime(),
+ newConflictVer,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
+ }
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(
@@ -2474,7 +2609,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
- if (updRes.sendToDht()) {
+ if (primary && updRes.sendToDht()) {
if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
// If put the same value as in request then do not need to send it back.
if (op == TRANSFORM || writeVal != updRes.newValue()) {
@@ -2580,6 +2715,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
final boolean replicate,
final UpdateBatchResult batchRes,
final String taskName,
@@ -2600,8 +2736,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheOperation op;
if (putMap != null) {
+ // If fast mapping, filter primary keys for write to store.
+ Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
+ F.view(putMap, new P1<CacheObject>() {
+ @Override public boolean apply(CacheObject key) {
+ return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
+ }
+ }) :
+ putMap;
+
try {
- ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+ ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
@Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
return F.t(v, ver);
}
@@ -2614,8 +2759,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op = UPDATE;
}
else {
+ // If fast mapping, filter primary keys for write to store.
+ Collection<KeyCacheObject> storeKeys = req.fastMap() ?
+ F.view(rmvKeys, new P1<Object>() {
+ @Override public boolean apply(Object key) {
+ return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
+ }
+ }) :
+ rmvKeys;
+
try {
- ctx.store().removeAll(null, rmvKeys);
+ ctx.store().removeAll(null, storeKeys);
}
catch (CacheStorePartialUpdateException e) {
storeErr = e;
@@ -2650,6 +2804,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert writeVal != null || op == DELETE : "null write value found.";
+ boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(),
+ entry.partition(),
+ req.topologyVersion());
+
Collection<UUID> readers = null;
Collection<UUID> filteredReaders = null;
@@ -2672,11 +2830,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
/*event*/true,
/*metrics*/true,
- /*primary*/true,
- /*verCheck*/false,
+ primary,
+ ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
topVer,
null,
- replicate ? DR_PRIMARY : DR_NONE,
+ replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
CU.TTL_NOT_CHANGED,
CU.EXPIRE_TIME_CALCULATE,
null,
@@ -2710,23 +2868,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
batchRes.addDeleted(entry, updRes, entries);
+ if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+ dhtFut = createDhtFuture(ver, req, res, completionCb);
+
+ batchRes.readersOnly(true);
+ }
+
if (dhtFut != null) {
EntryProcessor<Object, Object, Object> entryProcessor =
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
- dhtFut.addWriteEntry(
- affAssignment,
- entry,
- writeVal,
- entryProcessor,
- updRes.newTtl(),
- CU.EXPIRE_TIME_CALCULATE,
- null,
- sndPrevVal,
- updRes.oldValue(),
- updRes.updateCounter());
+ if (!batchRes.readersOnly()) {
+ dhtFut.addWriteEntry(
+ affAssignment,
+ entry,
+ writeVal,
+ entryProcessor,
+ updRes.newTtl(),
+ CU.EXPIRE_TIME_CALCULATE,
+ null,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
+ }
- if (!F.isEmpty(filteredReaders))
+ if (!F.isEmpty(filteredReaders)) {
dhtFut.addNearWriteEntries(
filteredReaders,
entry,
@@ -2734,29 +2900,34 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessor,
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE);
+ }
}
if (hasNear) {
- if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
- int idx = firstEntryIdx + i;
-
- if (req.operation() == TRANSFORM) {
- res.addNearValue(idx,
- writeVal,
- updRes.newTtl(),
- CU.EXPIRE_TIME_CALCULATE);
- }
- else
- res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+ if (primary) {
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
+ int idx = firstEntryIdx + i;
+
+ if (req.operation() == TRANSFORM) {
+ res.addNearValue(idx,
+ writeVal,
+ updRes.newTtl(),
+ CU.EXPIRE_TIME_CALCULATE);
+ }
+ else
+ res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
- if (writeVal != null || entry.hasValue()) {
- IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+ if (writeVal != null || entry.hasValue()) {
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
- assert f == null : f;
+ assert f == null : f;
+ }
}
+ else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
+ else
+ res.addSkippedIndex(firstEntryIdx + i);
}
- else if (readers.contains(nearNode.id())) // Reader became primary or backup.
- entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(firstEntryIdx + i);
}
@@ -2800,14 +2971,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
KeyCacheObject key = req.key(0);
while (true) {
- GridDhtCacheEntry entry = entryExx(key, topVer);
+ try {
+ GridDhtCacheEntry entry = entryExx(key, topVer);
- GridUnsafe.monitorEnter(entry);
+ GridUnsafe.monitorEnter(entry);
- if (entry.obsolete())
- GridUnsafe.monitorExit(entry);
- else
- return Collections.singletonList(entry);
+ if (entry.obsolete())
+ GridUnsafe.monitorExit(entry);
+ else
+ return Collections.singletonList(entry);
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ // Ignore invalid partition exception in CLOCK ordering mode.
+ if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+ return Collections.singletonList(null);
+ else
+ throw e;
+ }
}
}
else {
@@ -2815,9 +2995,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
while (true) {
for (int i = 0; i < req.size(); i++) {
- GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+ try {
+ GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
- locked.add(entry);
+ locked.add(entry);
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ // Ignore invalid partition exception in CLOCK ordering mode.
+ if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+ locked.add(null);
+ else
+ throw e;
+ }
}
boolean retry = false;
@@ -2970,28 +3159,54 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
drPutVals = null;
}
- GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
- ctx,
- this,
- ctx.config().getWriteSynchronizationMode(),
- req.operation(),
- req.keys(),
- vals,
- req.invokeArguments(),
- drPutVals,
- drRmvVals,
- req.returnValue(),
- false,
- req.expiry(),
- req.filter(),
- req.subjectId(),
- req.taskNameHash(),
- req.skipStore(),
- req.keepBinary(),
- MAX_RETRIES,
- true);
+ if (isFastMap(req.filter(), req.operation())) {
+ GridNearAtomicFastMapUpdateFuture updateFut = new GridNearAtomicFastMapUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ req.operation(),
+ req.keys(),
+ vals,
+ req.invokeArguments(),
+ drPutVals,
+ drRmvVals,
+ req.returnValue(),
+ false,
+ req.expiry(),
+ req.filter(),
+ req.subjectId(),
+ req.taskNameHash(),
+ req.skipStore(),
+ req.keepBinary(),
+ MAX_RETRIES,
+ true);
- updateFut.map();
+ updateFut.map();
+ }
+ else {
+ GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ req.operation(),
+ req.keys(),
+ vals,
+ req.invokeArguments(),
+ drPutVals,
+ drRmvVals,
+ req.returnValue(),
+ false,
+ req.expiry(),
+ req.filter(),
+ req.subjectId(),
+ req.taskNameHash(),
+ req.skipStore(),
+ req.keepBinary(),
+ MAX_RETRIES,
+ true);
+
+ updateFut.map();
+ }
}
/**
@@ -3003,12 +3218,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
if (updateReq.size() == 1)
- return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
+ return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
else
- return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+ return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
}
/**
@@ -3520,6 +3737,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private GridDhtAtomicAbstractUpdateFuture dhtFut;
/** */
+ private boolean readersOnly; // True if only readers (not backups) should be updated.
+
+ /** */
private GridCacheReturn invokeRes;
/**
@@ -3572,6 +3792,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
this.dhtFut = dhtFut;
}
+
+ /**
+ * @return {@code True} if only readers (not backups) should be updated.
+ */
+ private boolean readersOnly() {
+ return readersOnly;
+ }
+
+ /**
+ * @param readersOnly {@code True} if only readers (not backups) should be updated.
+ */
+ private void readersOnly(boolean readersOnly) {
+ this.readersOnly = readersOnly;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 8ebe9c3..8ad6496 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -50,9 +50,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
GridDhtAtomicSingleUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
- super(cctx, writeVer, updateReq);
+ super(cctx, writeVer, updateReq, updateRes, completionCb);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5d5ddf0..6de08c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -49,9 +49,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
- super(cctx, writeVer, updateReq);
+ super(cctx, writeVer, updateReq, updateRes, completionCb);
mappings = U.newHashMap(updateReq.size());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 6811236..10f368e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -73,6 +73,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean fastMap,
+ boolean clientReq,
boolean mappingKnown,
boolean skipStore,
boolean keepBinary,
@@ -88,6 +90,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
retval,
subjId,
taskNameHash,
+ fastMap,
+ clientReq,
mappingKnown,
skipStore,
keepBinary,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..59f3e76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
@@ -140,6 +142,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
@GridToStringInclude
protected Long futId;
+ /** Update version assigned on near node in CLOCK ordering mode. */
+ protected GridCacheVersion updVer;
+
/** Operation result. */
protected GridCacheReturn opRes;
@@ -208,12 +213,32 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/** {@inheritDoc} */
- @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
return null;
}
/**
* @param req Request.
+ * @param topVer Topology version.
+ */
+ final void initUpdateVersion(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) {
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ GridCacheVersion updVer = this.updVer;
+
+ if (updVer == null) {
+ this.updVer = updVer = cctx.versions().next(topVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
+ }
+
+ req.updateVersion(updVer);
+ }
+ }
+
+ /**
+ * @param req Request.
*/
void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
try {
@@ -241,14 +266,29 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
// Cannot remap.
remapCnt = 1;
- map(topVer);
+ if (fastMap()) {
+ Long futId = addAtomicFuture(topVer);
+
+ if (futId != null)
+ map(topVer, futId);
+ }
+ else
+ map(topVer, futId);
}
}
/**
+ * @return {@code True} for fast map update mode.
+ */
+ protected boolean fastMap() {
+ return false;
+ }
+
+ /**
* @param topVer Topology version.
+ * @param futId Future ID.
*/
- protected abstract void map(AffinityTopologyVersion topVer);
+ protected abstract void map(AffinityTopologyVersion topVer, Long futId);
/**
* Maps future on ready topology.
@@ -274,7 +314,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
*/
final boolean storeFuture() {
- return syncMode != FULL_ASYNC;
+ return syncMode != FULL_ASYNC || cctx.config().getAtomicWriteOrderMode() == CLOCK;
}
/**
@@ -416,6 +456,30 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/**
+ * Adds a future that prevents topology change before the operation completes.
+ * Should be invoked before the topology lock is released.
+ *
+ * @param topVer Topology version.
+ * @return Future ID if the future was added, {@code null} otherwise.
+ */
+ protected final Long addAtomicFuture(AffinityTopologyVersion topVer) {
+ Long futId = cctx.mvcc().atomicFutureId();
+
+ synchronized (mux) {
+ assert this.futId == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+ this.topVer = topVer;
+ this.futId = futId;
+ }
+
+ if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
+ return null;
+
+ return futId;
+ }
+
+ /**
*
*/
static class PrimaryRequestState {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a43bfb0..bdf2678 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -48,7 +48,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
- /** . */
+ /** */
private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
/** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
@@ -63,6 +63,15 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/** Return value flag. */
private static final int RET_VAL_FLAG_MASK = 0x10;
+ /** Fast map update flag. */
+ private static final int FAST_MAP_FLAG_MASK = 0x20;
+
+ /** Client node request flag. */
+ private static final int CLIENT_REQ_FLAG_MASK = 0x40;
+
+ /** Flag indicating whether the request contains primary keys. */
+ private static final int HAS_PRIMARY_FLAG_MASK = 0x80;
+
/** Target node ID. */
@GridDirectTransient
protected UUID nodeId;
@@ -93,6 +102,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
@GridDirectTransient
private GridNearAtomicUpdateResponse res;
+ /** Update version. Non-null when assigned on near node (done in CLOCK ordering mode, even if fastMap is {@code false}). */
+ private GridCacheVersion updateVer;
+
/**
*
*/
@@ -129,6 +141,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean fastMap,
+ boolean clientReq,
boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
@@ -144,16 +158,62 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
this.taskNameHash = taskNameHash;
this.addDepInfo = addDepInfo;
+ if (fastMap)
+ setFlag(true, FAST_MAP_FLAG_MASK);
+ if (clientReq)
+ setFlag(true, CLIENT_REQ_FLAG_MASK);
if (needPrimaryRes)
- needPrimaryResponse(true);
+ setFlag(true, NEED_PRIMARY_RES_FLAG_MASK);
if (topLocked)
- topologyLocked(true);
+ setFlag(true, TOP_LOCKED_FLAG_MASK);
if (retval)
- returnValue(true);
+ setFlag(true, RET_VAL_FLAG_MASK);
if (skipStore)
- skipStore(true);
+ setFlag(true, SKIP_STORE_FLAG_MASK);
if (keepBinary)
- keepBinary(true);
+ setFlag(true, KEEP_BINARY_FLAG_MASK);
+ }
+
+ /**
+ * @return Flag indicating whether this request contains primary keys.
+ */
+ boolean hasPrimary() {
+ return isFlag(HAS_PRIMARY_FLAG_MASK);
+ }
+
+ /**
+ * @param val Flag indicating whether this request contains primary keys.
+ */
+ void hasPrimary(boolean val) {
+ setFlag(val, HAS_PRIMARY_FLAG_MASK);
+ }
+
+ /**
+ * @return Flag indicating whether this is fast-map update.
+ */
+ boolean fastMap() {
+ return isFlag(FAST_MAP_FLAG_MASK);
+ }
+
+ /**
+ * @return {@code True} if request sent from client node.
+ */
+ boolean clientRequest() {
+ return isFlag(CLIENT_REQ_FLAG_MASK);
+ }
+
+ /**
+ * @return Update version for fast-map request.
+ */
+ @Nullable public final GridCacheVersion updateVersion() {
+ return updateVer;
+ }
+
+ /**
+ * @param updateVer Update version for fast-map request.
+ */
+ final void updateVersion(GridCacheVersion updateVer) {
+ this.updateVer = updateVer;
}
/** {@inheritDoc} */
@@ -291,13 +351,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param val {@code True} if topology is locked on near node.
- */
- private void topologyLocked(boolean val) {
- setFlag(val, TOP_LOCKED_FLAG_MASK);
- }
-
- /**
* @return Return value flag.
*/
public final boolean returnValue() {
@@ -305,13 +358,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param val Return value flag.
- */
- public final void returnValue(boolean val) {
- setFlag(val, RET_VAL_FLAG_MASK);
- }
-
- /**
* @return Skip write-through to a persistent storage.
*/
public final boolean skipStore() {
@@ -319,13 +365,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param val Skip store flag.
- */
- public void skipStore(boolean val) {
- setFlag(val, SKIP_STORE_FLAG_MASK);
- }
-
- /**
* @return Keep binary flag.
*/
public final boolean keepBinary() {
@@ -333,13 +372,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param val Keep binary flag.
- */
- public void keepBinary(boolean val) {
- setFlag(val, KEEP_BINARY_FLAG_MASK);
- }
-
- /**
* Sets flag mask.
*
* @param flag Set or clear.
@@ -380,12 +412,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param primary If given key is primary on this mapping.
*/
abstract void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer);
+ @Nullable GridCacheVersion conflictVer,
+ boolean primary);
/**
* @return Keys for this update request.
@@ -458,7 +492,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 11;
}
/** {@inheritDoc} */
@@ -518,6 +552,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
writer.incrementState();
+ case 10:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -598,6 +638,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
reader.incrementState();
+ case 10:
+ updateVer = reader.readMessage("updateVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);