You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/15 15:18:47 UTC
[1/2] ignite git commit: ignite-4705
Repository: ignite
Updated Branches:
refs/heads/ignite-4705 [created] 7287a9368
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 891a20c..8c3a364 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
@@ -41,7 +43,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedExceptio
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -49,19 +50,14 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
* DHT atomic cache near update future.
*/
public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
- /** */
- private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
-
/** Keys */
private Object key;
@@ -69,9 +65,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Object val;
- /** Not null is operation is mapped to single node. */
+ /** Current request. */
private GridNearAtomicAbstractUpdateRequest req;
+ /** */
+ private Set<UUID> mapping;
+
/**
* @param cctx Cache context.
* @param cache Cache instance.
@@ -120,9 +119,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
+ @Override public Long id() {
synchronized (mux) {
- return futVer;
+ return futId;
}
}
@@ -138,7 +137,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (req != null && req.response() == null) {
res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
cctx.deploymentEnabled());
ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
@@ -152,7 +151,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (res != null) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update single fut, node left [futId=" + req.futureVersion() +
+ msgLog.debug("Near update single fut, node left [futId=" + req.futureId() +
", writeVer=" + req.updateVersion() +
", node=" + nodeId + ']');
}
@@ -183,7 +182,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- GridCacheVersion futVer = onFutureDone();
+ Long futVer = onFutureDone();
if (futVer != null)
cctx.mvcc().removeAtomicFuture(futVer);
@@ -195,6 +194,33 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
+ @Override public void onResult(UUID nodeId, GridNearAtomicDhtResponse res) {
+ GridCacheReturn opRes0 = null;
+
+ synchronized (mux) {
+ if (futId == null || futId != res.futureId())
+ return;
+
+ if (mapping == null)
+ mapping = new HashSet<>(res.mapping());
+
+ mapping.remove(nodeId);
+
+ if (opRes == null && res.hasResult())
+ opRes = res.result();
+
+ if (mapping.isEmpty() && opRes != null) {
+ opRes0 = opRes;
+
+ futId = null;
+ }
+ }
+
+ if (opRes0 != null)
+ onDone(opRes0);
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
@Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
GridNearAtomicAbstractUpdateRequest req;
@@ -207,7 +233,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridFutureAdapter<?> fut0 = null;
synchronized (mux) {
- if (!res.futureVersion().equals(futVer))
+ if (futId == null || futId != res.futureId())
return;
if (!this.req.nodeId().equals(nodeId))
@@ -238,24 +264,22 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
}
else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
-
- if (op == TRANSFORM) {
- if (ret != null) {
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
- }
+ GridCacheReturn ret = res.returnValue();
+
+ if (op == TRANSFORM) {
+ if (ret != null) {
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
}
}
- else
- opRes = ret;
}
+ else
+ opRes = ret;
}
if (remapKey) {
@@ -282,7 +306,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
err = null;
- updVer = null;
}
}
}
@@ -296,9 +319,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
topCompleteFut = null;
- cctx.mvcc().removeAtomicFuture(futVer);
+ cctx.mvcc().removeAtomicFuture(futId);
- futVer = null;
+ futId = null;
topVer = AffinityTopologyVersion.ZERO;
}
}
@@ -379,11 +402,12 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override protected void mapOnTopology() {
+ // TODO IGNITE-4705: primary should block topology change, so it seem read lock is not needed.
cache.topology().readLock();
AffinityTopologyVersion topVer;
- GridCacheVersion futVer;
+ Long futId;
try {
if (cache.topology().stopping()) {
@@ -406,7 +430,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
topVer = fut.topologyVersion();
- futVer = addAtomicFuture(topVer);
+ futId = addAtomicFuture(topVer);
}
else {
if (waitTopFut) {
@@ -432,49 +456,22 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
cache.topology().readUnlock();
}
- if (futVer != null)
- map(topVer, futVer);
+ if (futId != null)
+ map(topVer, futId);
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
-
- return;
- }
-
- GridCacheVersion updVer;
-
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
-
- if (updVer == null) {
- updVer = cctx.versions().next(topVer);
-
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
- }
- }
- else
- updVer = null;
-
+ @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
Exception err = null;
GridNearAtomicAbstractUpdateRequest singleReq0 = null;
try {
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ singleReq0 = mapSingleUpdate(topVer, futId);
synchronized (mux) {
- assert this.futVer == futVer || (this.isDone() && this.error() != null);
+ assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
assert this.topVer == topVer;
- this.updVer = updVer;
-
resCnt = 0;
req = singleReq0;
@@ -495,10 +492,10 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/**
- * @return Future version.
+ * @return Future ID.
*/
- private GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
+ private Long onFutureDone() {
+ Long id0;
GridFutureAdapter<Void> fut0;
@@ -507,27 +504,25 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
topCompleteFut = null;
- ver0 = futVer;
+ id0 = futId;
- futVer = null;
+ futId = null;
}
if (fut0 != null)
fut0.onDone();
- return ver0;
+ return id0;
}
/**
* @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
+ * @param futId Future ID.
* @return Request.
* @throws Exception If failed.
*/
- private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
+ private GridNearAtomicAbstractUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer, long futId)
+ throws Exception {
if (key == null)
throw new NullPointerException("Null key.");
@@ -551,14 +546,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridNearAtomicAbstractUpdateRequest req;
- if (canUseSingleRequest(primary)) {
+ if (canUseSingleRequest()) {
if (op == TRANSFORM) {
req = new GridNearAtomicSingleUpdateInvokeRequest(
cctx.cacheId(),
primary.id(),
- futVer,
+ futId,
false,
- updVer,
+ null,
topVer,
topLocked,
syncMode,
@@ -577,9 +572,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
req = new GridNearAtomicSingleUpdateRequest(
cctx.cacheId(),
primary.id(),
- futVer,
+ futId,
false,
- updVer,
+ null,
topVer,
topLocked,
syncMode,
@@ -596,9 +591,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
req = new GridNearAtomicSingleUpdateFilterRequest(
cctx.cacheId(),
primary.id(),
- futVer,
+ futId,
false,
- updVer,
+ null,
topVer,
topLocked,
syncMode,
@@ -618,9 +613,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
req = new GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
primary.id(),
- futVer,
+ futId,
false,
- updVer,
+ null,
topVer,
topLocked,
syncMode,
@@ -649,11 +644,10 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/**
- * @param node Target node
* @return {@code True} can use 'single' update requests.
*/
- private boolean canUseSingleRequest(ClusterNode node) {
- return expiryPlc == null && node != null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0;
+ private boolean canUseSingleRequest() {
+ return expiryPlc == null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 6582063..269443f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -76,7 +76,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param fastMap Fast map scheme flag.
* @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
@@ -95,7 +95,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
GridNearAtomicSingleUpdateInvokeRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
boolean fastMap,
@Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
@@ -114,7 +114,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
super(
cacheId,
nodeId,
- futVer,
+ futId,
fastMap,
updateVer,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index c3e9fbe..0045228 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -72,7 +72,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param fastMap Fast map scheme flag.
* @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
@@ -90,7 +90,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
GridNearAtomicSingleUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
boolean fastMap,
@Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
@@ -108,7 +108,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
super(
cacheId,
nodeId,
- futVer,
+ futId,
fastMap,
updateVer,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 2315a18..c075f09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -154,9 +154,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
+ @Override public Long id() {
synchronized (mux) {
- return futVer;
+ return futId;
}
}
@@ -175,7 +175,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (req != null && req.response() == null) {
res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
cctx.deploymentEnabled());
ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
@@ -189,7 +189,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (res != null) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, node left [futId=" + req.futureVersion() +
+ msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
", writeVer=" + req.updateVersion() +
", node=" + nodeId + ']');
}
@@ -244,10 +244,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- GridCacheVersion futVer = onFutureDone();
+ Long futId = onFutureDone();
- if (futVer != null)
- cctx.mvcc().removeAtomicFuture(futVer);
+ if (futId != null)
+ cctx.mvcc().removeAtomicFuture(futId);
return true;
}
@@ -256,6 +256,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
+ @Override public void onResult(UUID nodeId, GridNearAtomicDhtResponse res) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
@Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
GridNearAtomicFullUpdateRequest req;
@@ -270,7 +275,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
GridFutureAdapter<?> fut0 = null;
synchronized (mux) {
- if (!res.futureVersion().equals(futVer))
+ if (futId == null || futId != res.futureId())
return;
if (singleReq != null) {
@@ -375,8 +380,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
for (Object key : failedKeys)
remapKeys.add(cctx.toCacheKeyObject(key));
-
- updVer = null;
}
}
}
@@ -390,9 +393,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
topCompleteFut = null;
- cctx.mvcc().removeAtomicFuture(futVer);
+ cctx.mvcc().removeAtomicFuture(futId);
- futVer = null;
+ futId = null;
topVer = AffinityTopologyVersion.ZERO;
}
}
@@ -492,7 +495,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
AffinityTopologyVersion topVer;
- GridCacheVersion futVer;
+ Long futId;
try {
if (cache.topology().stopping()) {
@@ -515,7 +518,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
topVer = fut.topologyVersion();
- futVer = addAtomicFuture(topVer);
+ futId = addAtomicFuture(topVer);
}
else {
if (waitTopFut) {
@@ -541,8 +544,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
cache.topology().readUnlock();
}
- if (futVer != null)
- map(topVer, futVer, remapKeys);
+ if (futId != null)
+ map(topVer, futId, remapKeys);
}
/**
@@ -568,14 +571,14 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() +
+ msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
", writeVer=" + req.updateVersion() +
", node=" + req.nodeId() + ']');
}
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() +
+ msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
", writeVer=" + req.updateVersion() +
", node=" + req.nodeId() +
", err=" + e + ']');
@@ -600,17 +603,17 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
- map(topVer, futVer, null);
+ @Override protected void map(AffinityTopologyVersion topVer, Long futId) {
+ map(topVer, futId, null);
}
/**
* @param topVer Topology version.
- * @param futVer Future ID.
+ * @param futId Future ID.
* @param remapKeys Keys to remap.
*/
void map(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
+ Long futId,
@Nullable Collection<KeyCacheObject> remapKeys) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
@@ -625,14 +628,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
// Assign version on near node in CLOCK ordering mode even if fastMap is false.
if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
-
- if (updVer == null) {
- updVer = cctx.versions().next(topVer);
-
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
- }
+// TODO IGNITE-4705 (get rid of updVer everywhere).
+// updVer = this.updVer;
+//
+// if (updVer == null) {
+// updVer = cctx.versions().next(topVer);
+//
+// if (log.isDebugEnabled())
+// log.debug("Assigned fast-map version for update on near node: " + updVer);
+// }
}
else
updVer = null;
@@ -647,13 +651,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (size == 1 && !fastMap) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ singleReq0 = mapSingleUpdate(topVer, futId, null);
}
else {
Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes,
topVer,
- futVer,
- updVer,
+ futId,
+ null,
remapKeys);
if (pendingMappings.size() == 1)
@@ -675,10 +679,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
synchronized (mux) {
- assert this.futVer == futVer || (this.isDone() && this.error() != null);
+ assert this.futId.equals(futId) || (this.isDone() && this.error() != null);
assert this.topVer == topVer;
- this.updVer = updVer;
+// this.updVer = updVer;
resCnt = 0;
@@ -714,8 +718,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/**
* @return Future version.
*/
- private GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
+ private Long onFutureDone() {
+ Long id0;
GridFutureAdapter<Void> fut0;
@@ -724,21 +728,21 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
topCompleteFut = null;
- ver0 = futVer;
+ id0 = futId;
- futVer = null;
+ futId = null;
}
if (fut0 != null)
fut0.onDone();
- return ver0;
+ return id0;
}
/**
* @param topNodes Cache nodes.
* @param topVer Topology version.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param updVer Update version.
* @param remapKeys Keys to remap.
* @return Mapping.
@@ -747,7 +751,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
@SuppressWarnings("ForLoopReplaceableByForEach")
private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
+ Long futId,
@Nullable GridCacheVersion updVer,
@Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
Iterator<?> it = null;
@@ -843,7 +847,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
mapped = new GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
nodeId,
- futVer,
+ futId,
fastMap,
updVer,
topVer,
@@ -876,13 +880,13 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/**
* @param topVer Topology version.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param updVer Update version.
* @return Request.
* @throws Exception If failed.
*/
private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
+ Long futId,
@Nullable GridCacheVersion updVer) throws Exception {
Object key = F.first(keys);
@@ -945,7 +949,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
primary.id(),
- futVer,
+ futId,
fastMap,
updVer,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 2e38733..b089193 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -59,8 +59,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
@GridDirectTransient
private UUID nodeId;
- /** Future version. */
- private GridCacheVersion futVer;
+ /** Future ID. */
+ private long futId;
/** Update error. */
@GridDirectTransient
@@ -115,15 +115,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* @param cacheId Cache ID.
* @param nodeId Node ID this reply should be sent to.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param addDepInfo Deployment info flag.
*/
- public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
- assert futVer != null;
-
+ public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, long futId, boolean addDepInfo) {
this.cacheId = cacheId;
this.nodeId = nodeId;
- this.futVer = futVer;
+ this.futId = futId;
this.addDepInfo = addDepInfo;
}
@@ -147,10 +145,10 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
- * @return Future version.
+ * @return Future ID.
*/
- public GridCacheVersion futureVersion() {
- return futVer;
+ public long futureId() {
+ return futId;
}
/**
@@ -468,7 +466,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
case 5:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
@@ -554,7 +552,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 5:
- futVer = reader.readMessage("futVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index a1a18fe..5706536 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -264,7 +264,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
txHnd = new IgniteTxHandler(cctx);
- deferredAckMessageSender = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) {
+ deferredAckMessageSender = new GridDeferredAckMessageSender<GridCacheVersion>(cctx.time(), cctx.kernalContext().closure()) {
@Override public int getTimeout() {
return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT;
}
[2/2] ignite git commit: ignite-4705
Posted by sb...@apache.org.
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7287a936
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7287a936
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7287a936
Branch: refs/heads/ignite-4705
Commit: 7287a93682d203f6d5043b681ef40968662d3f6c
Parents: 85b08c5
Author: sboikov <sb...@gridgain.com>
Authored: Wed Feb 15 16:47:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Feb 15 18:18:37 2017 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 27 ++-
.../communication/GridIoMessageFactory.java | 6 +
.../processors/cache/GridCacheAtomicFuture.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 22 +-
.../processors/cache/GridCacheMvccManager.java | 40 ++--
.../cache/GridDeferredAckMessageSender.java | 10 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 28 +--
.../GridDhtAtomicAbstractUpdateRequest.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 94 ++++----
.../GridDhtAtomicDeferredUpdateResponse.java | 20 +-
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 6 +-
.../GridDhtAtomicSingleUpdateRequest.java | 16 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 4 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 18 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 17 +-
...idNearAtomicAbstractSingleUpdateRequest.java | 18 +-
.../GridNearAtomicAbstractUpdateFuture.java | 37 ++--
.../GridNearAtomicAbstractUpdateRequest.java | 2 +-
.../dht/atomic/GridNearAtomicDhtResponse.java | 222 +++++++++++++++++++
.../atomic/GridNearAtomicFullUpdateRequest.java | 18 +-
...GridNearAtomicSingleUpdateFilterRequest.java | 6 +-
.../GridNearAtomicSingleUpdateFuture.java | 170 +++++++-------
...GridNearAtomicSingleUpdateInvokeRequest.java | 6 +-
.../GridNearAtomicSingleUpdateRequest.java | 6 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 96 ++++----
.../atomic/GridNearAtomicUpdateResponse.java | 22 +-
.../cache/transactions/IgniteTxManager.java | 2 +-
27 files changed, 596 insertions(+), 323 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 5a26187..16b1e01 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,6 +44,19 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -168,7 +181,19 @@ public class MessageCodeGenerator {
// gen.generateAll(true);
-// gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
+// gen.generateAndWrite(GridNearAtomicDhtResponse.class);
+// gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
+// gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
+// gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
+// gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
+// gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
+// gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
+// gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
+// gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
+// gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
+// gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
+// gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
+// gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
// gen.generateAndWrite(GridMessageCollection.class);
// gen.generateAndWrite(DataStreamerEntry.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6f95400..769a615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
@@ -173,6 +174,11 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -45:
+ msg = new GridNearAtomicDhtResponse();
+
+ break;
+
case -44:
msg = new TcpCommunicationSpi.HandshakeMessage2();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 3e11d50..565f11e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -26,9 +26,9 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
*/
public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
/**
- * @return Future version.
+ * @return Future ID.
*/
- public GridCacheVersion version();
+ public Long id();
/**
* Gets future that will be completed when it is safe when update is finished on the given version of topology.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 924ce79..f9952b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -467,15 +467,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @param cacheMsg Cache message.
* @return Atomic future ID if applicable for message.
*/
- @Nullable private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) {
+ @Nullable private Long atomicFututeId(GridCacheMessage cacheMsg) {
if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
- return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
+ return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureId();
else if (cacheMsg instanceof GridNearAtomicUpdateResponse)
- return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion();
+ return ((GridNearAtomicUpdateResponse) cacheMsg).futureId();
else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
- return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureVersion();
+ return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).futureId();
else if (cacheMsg instanceof GridDhtAtomicUpdateResponse)
- return ((GridDhtAtomicUpdateResponse) cacheMsg).futureVersion();
+ return ((GridDhtAtomicUpdateResponse) cacheMsg).futureId();
return null;
}
@@ -557,7 +557,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
ctx.cacheId(),
- req.futureVersion(),
+ req.futureId(),
ctx.deploymentEnabled());
res.onError(req.classError());
@@ -573,7 +573,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
ctx.deploymentEnabled());
res.error(req.classError());
@@ -751,7 +751,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
ctx.deploymentEnabled());
res.error(req.classError());
@@ -767,7 +767,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
ctx.deploymentEnabled());
res.error(req.classError());
@@ -783,7 +783,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
ctx.deploymentEnabled());
res.error(req.classError());
@@ -798,7 +798,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
ctx.cacheId(),
- req.futureVersion(),
+ req.futureId(),
ctx.deploymentEnabled());
res.onError(req.classError());
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 0d0e9ee..e8a5c8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -29,6 +29,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.events.DiscoveryEvent;
@@ -106,8 +107,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
/** Pending atomic futures. */
- private final ConcurrentMap<GridCacheVersion, GridCacheAtomicFuture<?>> atomicFuts =
- new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
/** Pending data streamer futures. */
private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
@@ -135,6 +135,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/** */
private volatile boolean stopping;
+ /** */
+ private final AtomicLong atomicFutId = new AtomicLong();
+
/** Lock callback. */
@GridToStringExclude
private final GridCacheMvccCallback cb = new GridCacheMvccCallback() {
@@ -253,10 +256,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
cacheFut.onNodeLeft(discoEvt.eventNode().id());
if (cacheFut.isCancelled() || cacheFut.isDone()) {
- GridCacheVersion futVer = cacheFut.version();
+ Long futId = cacheFut.id();
- if (futVer != null)
- atomicFuts.remove(futVer, cacheFut);
+ if (futId != null)
+ atomicFuts.remove(futId, cacheFut);
}
}
}
@@ -423,14 +426,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param futVer Future ID.
+ * @return ID for atomic cache update future.
+ */
+ public long atomicFutureId() {
+ return atomicFutId.incrementAndGet();
+ }
+
+ /**
+ * @param futId Future ID.
* @param fut Future.
* @return {@code False} if future was forcibly completed with error.
*/
- public boolean addAtomicFuture(GridCacheVersion futVer, GridCacheAtomicFuture<?> fut) {
- IgniteInternalFuture<?> old = atomicFuts.put(futVer, fut);
+ public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture<?> fut) {
+ IgniteInternalFuture<?> old = atomicFuts.put(futId, fut);
- assert old == null : "Old future is not null [futVer=" + futVer + ", fut=" + fut + ", old=" + old + ']';
+ assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']';
return onFutureAdded(fut);
}
@@ -452,19 +462,19 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
/**
* Gets future by given future ID.
*
- * @param futVer Future ID.
+ * @param futId Future ID.
* @return Future.
*/
- @Nullable public IgniteInternalFuture<?> atomicFuture(GridCacheVersion futVer) {
- return atomicFuts.get(futVer);
+ @Nullable public IgniteInternalFuture<?> atomicFuture(Long futId) {
+ return atomicFuts.get(futId);
}
/**
- * @param futVer Future ID.
+ * @param futId Future ID.
* @return Removed future.
*/
- @Nullable public IgniteInternalFuture<?> removeAtomicFuture(GridCacheVersion futVer) {
- return atomicFuts.remove(futVer);
+ @Nullable public IgniteInternalFuture<?> removeAtomicFuture(Long futId) {
+ return atomicFuts.remove(futId);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 7145dc2..8df883a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -33,7 +33,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
/**
*
*/
-public abstract class GridDeferredAckMessageSender {
+public abstract class GridDeferredAckMessageSender<T> {
/** Deferred message buffers. */
private ConcurrentMap<UUID, DeferredAckMessageBuffer> deferredAckMsgBuffers = new ConcurrentHashMap8<>();
@@ -66,7 +66,7 @@ public abstract class GridDeferredAckMessageSender {
/**
*
*/
- public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers);
+ public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8<T> vers);
/**
*
@@ -80,7 +80,7 @@ public abstract class GridDeferredAckMessageSender {
* @param nodeId Node ID to send message to.
* @param ver Version to ack.
*/
- public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) {
+ public void sendDeferredAckMessage(UUID nodeId, T ver) {
while (true) {
DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId);
@@ -116,7 +116,7 @@ public abstract class GridDeferredAckMessageSender {
private AtomicBoolean guard = new AtomicBoolean(false);
/** Versions. */
- private ConcurrentLinkedDeque8<GridCacheVersion> vers = new ConcurrentLinkedDeque8<>();
+ private ConcurrentLinkedDeque8<T> vers = new ConcurrentLinkedDeque8<>();
/** Node ID. */
private final UUID nodeId;
@@ -172,7 +172,7 @@ public abstract class GridDeferredAckMessageSender {
* @param ver Version to send.
* @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used.
*/
- public boolean add(GridCacheVersion ver) {
+ public boolean add(T ver) {
readLock().lock();
boolean snd = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 1b175d0..10d1c4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -74,7 +74,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
protected final GridCacheContext cctx;
/** Future version. */
- protected final GridCacheVersion futVer;
+ protected final long futId;
/** Completion callback. */
@GridToStringExclude
@@ -114,7 +114,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
GridNearAtomicUpdateResponse updateRes) {
this.cctx = cctx;
- futVer = cctx.versions().next(updateReq.topologyVersion());
+ futId = cctx.mvcc().atomicFutureId();
this.updateReq = updateReq;
this.completionCb = completionCb;
this.updateRes = updateRes;
@@ -191,7 +191,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (updateReq == null) {
updateReq = createRequest(
node,
- futVer,
+ futId,
writeVer,
syncMode,
topVer,
@@ -260,7 +260,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
updateReq = createRequest(
node,
- futVer,
+ futId,
writeVer,
syncMode,
topVer,
@@ -297,12 +297,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** {@inheritDoc} */
@Override public final IgniteUuid futureId() {
- return futVer.asGridUuid();
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
- @Override public final GridCacheVersion version() {
- return futVer;
+ @Override public final Long id() {
+ return futId;
}
/** {@inheritDoc} */
@@ -310,7 +310,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
boolean res = registerResponse(nodeId);
if (res && msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer +
+ msgLog.debug("DTH update fut, node left [futId=" + futId + ", writeVer=" + writeVer +
", node=" + nodeId + ']');
}
@@ -358,20 +358,20 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, sent request [futId=" + futVer +
+ msgLog.debug("DTH update fut, sent request [futId=" + futId +
", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer +
+ msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futId +
", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
}
registerResponse(req.nodeId());
}
catch (IgniteCheckedException ignored) {
- U.error(msgLog, "Failed to send request [futId=" + futVer +
+ U.error(msgLog, "Failed to send request [futId=" + futId +
", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
registerResponse(req.nodeId());
@@ -401,7 +401,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/**
* @param node Node.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param writeVer Update version.
* @param syncMode Write synchronization mode.
* @param topVer Topology version.
@@ -412,7 +412,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*/
protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
ClusterNode node,
- GridCacheVersion futVer,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -438,7 +438,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** {@inheritDoc} */
@Override public final boolean onDone(@Nullable Void res, @Nullable Throwable err) {
if (super.onDone(res, err)) {
- cctx.mvcc().removeAtomicFuture(version());
+ cctx.mvcc().removeAtomicFuture(futId);
boolean suc = err == null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index f0bea07..7aa440d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -168,7 +168,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
/**
* @return Version assigned on primary node.
*/
- public abstract GridCacheVersion futureVersion();
+ public abstract long futureId();
/**
* @return Write version.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 1b6179e..87ac54b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
@@ -97,6 +98,7 @@ import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
@@ -241,7 +243,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void start() throws IgniteCheckedException {
super.start();
- deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
+ deferredUpdateMsgSnd = new GridDeferredAckMessageSender<Long>(ctx.time(), ctx.closures()) {
@Override public int getTimeout() {
return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
}
@@ -250,7 +252,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
}
- @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
+ @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<Long> vers) {
GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
vers, ctx.deploymentEnabled());
@@ -261,7 +263,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.io().send(nodeId, msg, ctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
+ msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() +
", node=" + nodeId + ']');
}
}
@@ -272,18 +274,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
catch (IllegalStateException ignored) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+ "futIds=" + msg.futureIds() + ", node=" + nodeId + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Failed to send deferred DHT update response, node left [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
+ "futIds=" + msg.futureIds() + ", node=" + nodeId + ']');
}
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send deferred DHT update response to remote node [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
+ "futIds=" + msg.futureIds() + ", node=" + nodeId + ']', e);
}
}
};
@@ -420,6 +422,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicDhtResponse.class, new CI2<UUID, GridNearAtomicDhtResponse>() {
+ @Override public void apply(UUID uuid, GridNearAtomicDhtResponse msg) {
+
+ }
+
+ @Override public String toString() {
+ return "GridDhtAtomicDeferredUpdateResponse handler " +
+ "[msgIdx=" + GridNearAtomicDhtResponse.CACHE_MSG_IDX + ']';
+ }
+ });
+
if (near == null) {
ctx.io().addHandler(
ctx.cacheId(),
@@ -1343,8 +1356,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CacheEntryPredicate[] filters = CU.filterArray(filter);
if (conflictPutVal == null &&
- conflictRmvVer == null &&
- !isFastMap(filters, op)) {
+ conflictRmvVer == null) {
return new GridNearAtomicSingleUpdateFuture(
ctx,
this,
@@ -1752,7 +1764,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
ctx.deploymentEnabled());
res.addFailedKeys(req.keys(), e);
@@ -1772,7 +1784,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicAbstractUpdateRequest req,
CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(),
ctx.deploymentEnabled());
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
@@ -1817,7 +1829,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (node == null) {
U.warn(msgLog, "Skip near update request, node originated update request left [" +
- "futId=" + req.futureVersion() + ", node=" + nodeId + ']');
+ "futId=" + req.futureId() + ", node=" + nodeId + ']');
return;
}
@@ -1834,7 +1846,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.nearVersion(ver);
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Assigned update version [futId=" + req.futureVersion() +
+ msgLog.debug("Assigned update version [futId=" + req.futureId() +
", writeVer=" + ver + ']');
}
}
@@ -1903,7 +1915,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
req.cleanup(!node.isLocal());
if (dhtFut != null)
- ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+ ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
}
else
// Should remap all keys.
@@ -3169,7 +3181,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() +
+ msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
", writeVer=" + req.updateVersion() +
", node=" + nodeId + ']');
}
@@ -3186,19 +3198,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (msgLog.isDebugEnabled())
- msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + ", node=" + nodeId + ']');
+ msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']');
res.nodeId(ctx.localNodeId());
GridNearAtomicAbstractUpdateFuture fut =
- (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+ (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
if (fut != null)
fut.onResult(nodeId, res, false);
else
U.warn(msgLog, "Failed to find near update future for update response (will ignore) " +
- "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']');
+ "[futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']');
}
/**
@@ -3207,14 +3219,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() +
+ msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() +
", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
}
GridCacheVersion ver = req.writeVersion();
// Always send update reply.
- GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(),
+ GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(),
ctx.deploymentEnabled());
Boolean replicate = ctx.isDrEnabled();
@@ -3311,36 +3323,44 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.io().send(nodeId, res, ctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() +
+ msgLog.debug("Sent DHT atomic update response [futId=" + req.futureId() +
", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
}
}
else {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() +
+ msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureId() +
", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
}
// No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
- sendDeferredUpdateResponse(nodeId, req.futureVersion());
+ sendDeferredUpdateResponse(nodeId, req.futureId());
}
}
catch (ClusterTopologyCheckedException ignored) {
- U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureVersion() +
+ U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureId() +
", node=" + req.nodeId() + ']');
}
catch (IgniteCheckedException e) {
- U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureVersion() +
+ U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureId() +
", node=" + nodeId + ", res=" + res + ']', e);
}
}
/**
* @param nodeId Node ID to send message to.
- * @param ver Version to ack.
+ * @param futId ID to ack.
*/
- private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
- deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, ver);
+ private void sendDeferredUpdateResponse(UUID nodeId, long futId) {
+ deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, futId);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ private void processNearAtomicDhtResponse(UUID nodeId, GridNearAtomicDhtResponse res) {
+
}
/**
@@ -3349,18 +3369,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("unchecked")
private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
- GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+ GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
if (updateFut != null) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received DHT atomic update response [futId=" + res.futureVersion() +
+ msgLog.debug("Received DHT atomic update response [futId=" + res.futureId() +
", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']');
}
updateFut.onResult(nodeId, res);
}
else {
- U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureVersion() +
+ U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() +
", node=" + nodeId + ", res=" + res + ']');
}
}
@@ -3371,19 +3391,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("unchecked")
private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
- for (GridCacheVersion ver : res.futureVersions()) {
- GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(ver);
+ for (Long id : res.futureIds()) {
+ GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(id);
if (updateFut != null) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received DHT atomic deferred update response [futId=" + ver +
+ msgLog.debug("Received DHT atomic deferred update response [futId=" + id +
", writeVer=" + res + ", node=" + nodeId + ']');
}
updateFut.onResult(nodeId);
}
else {
- U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + ver +
+ U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + id +
", nodeId=" + nodeId + ", res=" + res + ']');
}
}
@@ -3398,16 +3418,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.io().send(nodeId, res, ctx.ioPolicy());
if (msgLog.isDebugEnabled())
- msgLog.debug("Sent near update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']');
+ msgLog.debug("Sent near update response [futId=" + res.futureId() + ", node=" + nodeId + ']');
}
catch (ClusterTopologyCheckedException ignored) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to send near update response [futId=" + res.futureVersion() +
+ msgLog.debug("Failed to send near update response [futId=" + res.futureId() +
", node=" + nodeId + ']');
}
}
catch (IgniteCheckedException e) {
- U.error(msgLog, "Failed to send near update response [futId=" + res.futureVersion() +
+ U.error(msgLog, "Failed to send near update response [futId=" + res.futureId() +
", node=" + nodeId + ", res=" + res + ']', e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 923b220..b662476 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -43,7 +43,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
/** ACK future versions. */
@GridDirectCollection(GridCacheVersion.class)
- private Collection<GridCacheVersion> futVers;
+ private Collection<Long> futIds;
/** {@inheritDoc} */
@Override public int lookupIndex() {
@@ -61,14 +61,14 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
* Constructor.
*
* @param cacheId Cache ID.
- * @param futVers Future versions.
+ * @param futIds Future IDs.
* @param addDepInfo Deployment info.
*/
- public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers, boolean addDepInfo) {
- assert !F.isEmpty(futVers);
+ public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<Long> futIds, boolean addDepInfo) {
+ assert !F.isEmpty(futIds);
this.cacheId = cacheId;
- this.futVers = futVers;
+ this.futIds = futIds;
this.addDepInfo = addDepInfo;
}
@@ -78,10 +78,10 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
}
/**
- * @return List of ACKed future versions.
+ * @return List of ACKed future ids.
*/
- public Collection<GridCacheVersion> futureVersions() {
- return futVers;
+ public Collection<Long> futureIds() {
+ return futIds;
}
/** {@inheritDoc} */
@@ -105,7 +105,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
switch (writer.state()) {
case 3:
- if (!writer.writeCollection("futVers", futVers, MessageCollectionItemType.MSG))
+ if (!writer.writeCollection("futIds", futIds, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
@@ -127,7 +127,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
switch (reader.state()) {
case 3:
- futVers = reader.readCollection("futVers", MessageCollectionItemType.MSG);
+ futIds = reader.readCollection("futIds", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 20d6e90..0c8e482 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -98,7 +98,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
/** {@inheritDoc} */
@Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
ClusterNode node,
- GridCacheVersion futVer,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -110,7 +110,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
return new GridDhtAtomicSingleUpdateRequest(
cctx.cacheId(),
node.id(),
- futVer,
+ futId,
writeVer,
syncMode,
topVer,
@@ -124,7 +124,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
node.id(),
- futVer,
+ futId,
writeVer,
syncMode,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 0af7cf5..127c2be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -52,7 +52,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
private static final int NEAR_FLAG_MASK = 0x80;
/** Future version. */
- protected GridCacheVersion futVer;
+ protected long futId;
/** Write version. */
protected GridCacheVersion writeVer;
@@ -102,7 +102,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param writeVer Write version for cache values.
* @param syncMode Cache write synchronization mode.
* @param topVer Topology version.
@@ -115,7 +115,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
GridDhtAtomicSingleUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -126,7 +126,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
boolean skipStore
) {
super(cacheId, nodeId);
- this.futVer = futVer;
+ this.futId = futId;
this.writeVer = writeVer;
this.syncMode = syncMode;
this.topVer = topVer;
@@ -268,8 +268,8 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
+ @Override public long futureId() {
+ return futId;
}
/** {@inheritDoc} */
@@ -430,7 +430,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
writer.incrementState();
case 4:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
@@ -520,7 +520,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
case 4:
- futVer = reader.readMessage("futVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index efb35c4..7cb75fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -123,7 +123,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
/** {@inheritDoc} */
@Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node,
- GridCacheVersion futVer,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -134,7 +134,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
node.id(),
- futVer,
+ futId,
writeVer,
syncMode,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 1854e52..5dfea79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -53,8 +53,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** */
private static final long serialVersionUID = 0L;
- /** Future version. */
- private GridCacheVersion futVer;
+ /** Future ID. */
+ private long futId;
/** Write version. */
private GridCacheVersion writeVer;
@@ -163,7 +163,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param writeVer Write version for cache values.
* @param invokeArgs Optional arguments for entry processor.
* @param syncMode Cache write synchronization mode.
@@ -176,7 +176,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
public GridDhtAtomicUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -190,7 +190,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
) {
super(cacheId, nodeId);
- this.futVer = futVer;
+ this.futId = futId;
this.writeVer = writeVer;
this.syncMode = syncMode;
this.topVer = topVer;
@@ -362,8 +362,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
}
/** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
+ @Override public long futureId() {
+ return futId;
}
/** {@inheritDoc} */
@@ -641,7 +641,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
writer.incrementState();
case 8:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
@@ -805,7 +805,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
case 8:
- futVer = reader.readMessage("futVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index ff12af0..76d28c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -50,7 +49,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
public static final int CACHE_MSG_IDX = nextIndexId();
/** Future version. */
- private GridCacheVersion futVer;
+ private long futId;
/** Failed keys. */
@GridToStringInclude
@@ -78,12 +77,12 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* @param cacheId Cache ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param addDepInfo Deployment info.
*/
- public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) {
+ public GridDhtAtomicUpdateResponse(int cacheId, long futId, boolean addDepInfo) {
this.cacheId = cacheId;
- this.futVer = futVer;
+ this.futId = futId;
this.addDepInfo = addDepInfo;
}
@@ -95,8 +94,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* @return Future version.
*/
- public GridCacheVersion futureVersion() {
- return futVer;
+ public long futureId() {
+ return futId;
}
/**
@@ -223,7 +222,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
writer.incrementState();
case 5:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
@@ -267,7 +266,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
reader.incrementState();
case 5:
- futVer = reader.readMessage("futVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 61deeee..2a17813 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -70,7 +70,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
protected UUID nodeId;
/** Future version. */
- protected GridCacheVersion futVer;
+ protected long futId;
/** Update version. Set to non-null if fastMap is {@code true}. */
private GridCacheVersion updateVer;
@@ -109,7 +109,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param fastMap Fast map scheme flag.
* @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
@@ -127,7 +127,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
protected GridNearAtomicAbstractSingleUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
boolean fastMap,
@Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
@@ -142,11 +142,9 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
boolean clientReq,
boolean addDepInfo
) {
- assert futVer != null;
-
this.cacheId = cacheId;
this.nodeId = nodeId;
- this.futVer = futVer;
+ this.futId = futId;
this.updateVer = updateVer;
this.topVer = topVer;
this.syncMode = syncMode;
@@ -199,8 +197,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
/**
* @return Future version.
*/
- @Override public GridCacheVersion futureVersion() {
- return futVer;
+ @Override public long futureId() {
+ return futId;
}
/**
@@ -421,7 +419,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
writer.incrementState();
case 4:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
@@ -487,7 +485,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
reader.incrementState();
case 4:
- futVer = reader.readMessage("futVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index c92e0f5..42f4bc3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -113,9 +113,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Current topology version. */
protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
- /** */
- protected GridCacheVersion updVer;
-
/** Topology version when got mapping error. */
protected AffinityTopologyVersion mapErrTopVer;
@@ -126,7 +123,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
protected CachePartialUpdateCheckedException err;
/** Future ID. */
- protected GridCacheVersion futVer;
+ protected Long futId;
/** Completion future for a particular topology version. */
protected GridFutureAdapter<Void> topCompleteFut;
@@ -212,18 +209,18 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
// Cannot remap.
remapCnt = 1;
- GridCacheVersion futVer = addAtomicFuture(topVer);
+ Long futId = addAtomicFuture(topVer);
- if (futVer != null)
- map(topVer, futVer);
+ if (futId != null)
+ map(topVer, futId);
}
}
/**
* @param topVer Topology version.
- * @param futVer Future version
+ * @param futId Future ID.
*/
- protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer);
+ protected abstract void map(AffinityTopologyVersion topVer, Long futId);
/**
* Maps future on ready topology.
@@ -272,7 +269,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() +
+ msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
", writeVer=" + req.updateVersion() +
", node=" + req.nodeId() + ']');
}
@@ -282,7 +279,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() +
+ msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
", writeVer=" + req.updateVersion() +
", node=" + req.nodeId() +
", err=" + e + ']');
@@ -302,6 +299,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
*/
public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
+ public abstract void onResult(UUID nodeId, GridNearAtomicDhtResponse res);
+
/**
* @param req Request.
* @param e Error.
@@ -310,7 +309,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
synchronized (mux) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
req.nodeId(),
- req.futureVersion(),
+ req.futureId(),
cctx.deploymentEnabled());
res.addFailedKeys(req.keys(), e);
@@ -324,22 +323,22 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* Should be invoked before topology lock released.
*
* @param topVer Topology version.
- * @return Future version in case future added.
+ * @return Future ID in case future added.
*/
- protected final GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) {
- GridCacheVersion futVer = cctx.versions().next(topVer);
+ protected final Long addAtomicFuture(AffinityTopologyVersion topVer) {
+ Long futId = cctx.mvcc().atomicFutureId();
synchronized (mux) {
- assert this.futVer == null : this;
+ assert this.futId == null : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
this.topVer = topVer;
- this.futVer = futVer;
+ this.futId = futId;
}
- if (storeFuture() && !cctx.mvcc().addAtomicFuture(futVer, this))
+ if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
return null;
- return futVer;
+ return futId;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bee2ecd..077c73c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -61,7 +61,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/**
* @return Future version.
*/
- public abstract GridCacheVersion futureVersion();
+ public abstract long futureId();
/**
* @return Flag indicating whether this is fast-map udpate.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
new file mode 100644
index 0000000..fc99a5b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicDhtResponse.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * TODO IGNITE-4705: no need send mapping if it == affinity.
+ */
+public class GridNearAtomicDhtResponse extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** */
+ private static final int HAS_RESULT_MASK = 0x1;
+
+ /** */
+ private static final int RESULT_SUCCESS_MASK = 0x2;
+
+ /** */
+ private long futId;
+
+ /** */
+ @GridDirectCollection(UUID.class)
+ private List<UUID> mapping;
+
+ /** */
+ private byte flags;
+
+ /**
+ *
+ */
+ public GridNearAtomicDhtResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param mapping Update mapping.
+ */
+ public GridNearAtomicDhtResponse(long futId, List<UUID> mapping) {
+ this.futId = futId;
+ this.mapping = mapping;
+ }
+
+ /**
+ * @param success Success flag.
+ */
+ public void setResult(boolean success) {
+ setFlag(true, HAS_RESULT_MASK);
+
+ setFlag(success, RESULT_SUCCESS_MASK);
+ }
+
+ /**
+ * @return Operation result.
+ */
+ public GridCacheReturn result() {
+ assert hasResult();
+
+ return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK));
+ }
+
+ /**
+ * @return {@code True} if response contains operation result.
+ */
+ public boolean hasResult() {
+ return isFlag(HAS_RESULT_MASK);
+ }
+
+ /**
+ * @return Update mapping.
+ */
+ public List<UUID> mapping() {
+ return mapping;
+ }
+
+ /**
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -45;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 6;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicDhtResponse.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 08c2474..a8f5a55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -66,7 +66,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
private UUID nodeId;
/** Future version. */
- private GridCacheVersion futVer;
+ private long futId;
/** Update version. Set to non-null if fastMap is {@code true}. */
private GridCacheVersion updateVer;
@@ -175,7 +175,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param fastMap Fast map scheme flag.
* @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
@@ -197,7 +197,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
GridNearAtomicFullUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
boolean fastMap,
@Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
@@ -216,11 +216,9 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
boolean addDepInfo,
int maxEntryCnt
) {
- assert futVer != null;
-
this.cacheId = cacheId;
this.nodeId = nodeId;
- this.futVer = futVer;
+ this.futId = futId;
this.fastMap = fastMap;
this.updateVer = updateVer;
@@ -276,8 +274,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
+ @Override public long futureId() {
+ return futId;
}
/** {@inheritDoc} */
@@ -678,7 +676,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
writer.incrementState();
case 11:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
@@ -854,7 +852,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 11:
- futVer = reader.readMessage("futVer");
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7287a936/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index e0c24b2..c474c83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -58,7 +58,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param fastMap Fast map scheme flag.
* @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
@@ -77,7 +77,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
GridNearAtomicSingleUpdateFilterRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
boolean fastMap,
@Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
@@ -96,7 +96,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
super(
cacheId,
nodeId,
- futVer,
+ futId,
fastMap,
updateVer,
topVer,