You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/02 15:34:27 UTC
[1/2] ignite git commit: ignite-4705
Repository: ignite
Updated Branches:
refs/heads/ignite-4705-2 [created] 30bfae0e6
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 204e510..6228f03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -269,7 +269,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
- ", writeVer=" + req.updateVersion() +
", node=" + req.nodeId() + ']');
}
@@ -279,7 +278,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
- ", writeVer=" + req.updateVersion() +
", node=" + req.nodeId() +
", err=" + e + ']');
}
@@ -305,12 +303,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res);
/**
- * @param nodeId Node ID.
- * @param res Response.
- */
- public abstract void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res);
-
- /**
* @param req Request.
* @param e Error.
*/
@@ -423,23 +415,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/**
- * @param cctx Context.
- * @param res Response.
- * @return {@code True} if request processing finished.
- */
- boolean onMappingReceived(GridCacheContext cctx, GridNearAtomicMappingResponse res) {
- if (finished() || mapping != null)
- return false;
-
- if (res.affinityMapping())
- initAffinityMapping(cctx, null);
- else
- initMapping(cctx, res.mapping(), null);
-
- return finished();
- }
-
- /**
* @param nodeId Node ID.
* @return {@code True} if request processing finished.
*/
@@ -465,41 +440,41 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
if (finished())
return false;
- if (res.primaryDhtFailureResponse()) {
- assert res.mapping() != null : res;
- assert res.failedNodeId() != null : res;
-
- nodeId = res.failedNodeId();
- }
-
- if (res.hasResult())
- hasRes = true;
-
- if (res.affinityMapping()) {
- if (mapping == null) {
- initAffinityMapping(cctx, nodeId);
-
- return finished();
- }
- } else if (res.mapping() != null) {
- // Mapping is sent from dht nodes.
- if (mapping == null) {
- initMapping(cctx, res.mapping(), nodeId);
-
- return finished();
- }
- }
- else {
- // Mapping and result are sent from primary.
- if (mapping == null) {
- if (rcvd == null)
- rcvd = new HashSet<>();
-
- rcvd.add(nodeId);
-
- return false; // Need wait for response from primary.
- }
- }
+// if (res.primaryDhtFailureResponse()) {
+// assert res.mapping() != null : res;
+// assert res.failedNodeId() != null : res;
+//
+// nodeId = res.failedNodeId();
+// }
+//
+// if (res.hasResult())
+// hasRes = true;
+//
+// if (res.affinityMapping()) {
+// if (mapping == null) {
+// initAffinityMapping(cctx, nodeId);
+//
+// return finished();
+// }
+// } else if (res.mapping() != null) {
+// // Mapping is sent from dht nodes.
+// if (mapping == null) {
+// initMapping(cctx, res.mapping(), nodeId);
+//
+// return finished();
+// }
+// }
+// else {
+// // Mapping and result are sent from primary.
+// if (mapping == null) {
+// if (rcvd == null)
+// rcvd = new HashSet<>();
+//
+// rcvd.add(nodeId);
+//
+// return false; // Need wait for response from primary.
+// }
+// }
return mapping.remove(nodeId) && finished();
}
@@ -523,12 +498,12 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
assert res.returnValue() != null : res;
- if (res.mapping() != null) {
- if (mapping == null)
- initMapping(cctx, res.mapping(), null);
- }
- else
- initAffinityMapping(cctx, null);
+// if (res.mapping() != null) {
+// if (mapping == null)
+// initMapping(cctx, res.mapping(), null);
+// }
+// else
+// initAffinityMapping(cctx, null);
return finished();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index d5c7a9e..a40cfe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -21,14 +21,19 @@ import java.util.List;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -38,109 +43,282 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
- /**
- * @return Mapped node ID.
- */
- public abstract UUID nodeId();
+ /** Stable topology flag mask. */
+ private static final int STABLE_TOP_FLAG_MASK = 0x01;
- /**
- * @param nodeId Node ID.
- */
- public abstract void nodeId(UUID nodeId);
+ /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+ private static final int TOP_LOCKED_FLAG_MASK = 0x02;
- /**
- * @return Subject ID.
- */
- public abstract UUID subjectId();
+ /** Skip write-through to a persistent storage. */
+ private static final int SKIP_STORE_FLAG_MASK = 0x04;
+
+ /** Keep binary flag. */
+ private static final int KEEP_BINARY_FLAG_MASK = 0x08;
+
+ /** Return value flag. */
+ private static final int RET_VAL_FLAG_MASK = 0x10;
+
+ /** Target node ID. */
+ @GridDirectTransient
+ protected UUID nodeId;
+
+ /** Future version. */
+ protected long futId;
+
+ /** Topology version. */
+ protected AffinityTopologyVersion topVer;
+
+ /** Write synchronization mode. */
+ protected CacheWriteSynchronizationMode syncMode;
+
+ /** Update operation. */
+ protected GridCacheOperation op;
+
+ /** Subject ID. */
+ protected UUID subjId;
+
+ /** Task name hash. */
+ protected int taskNameHash;
+
+ /** Compressed boolean flags. */
+ protected byte flags;
+
+ /** */
+ @GridDirectTransient
+ private GridNearAtomicUpdateResponse res;
/**
- * @return Task name hash.
+ *
*/
- public abstract int taskNameHash();
+ public GridNearAtomicAbstractUpdateRequest() {
+ // No-op.
+ }
/**
- * @return Future version.
- */
- public abstract long futureId();
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @param futId Future ID.
+ * @param topVer Topology version.
+ * @param topLocked Topology locked flag.
+ * @param syncMode Synchronization mode.
+ * @param op Cache update operation.
+ * @param retval Return value required flag.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param skipStore Skip write-through to a persistent storage.
+ * @param keepBinary Keep binary flag.
+ * @param addDepInfo Deployment info flag.
+ */
+ protected GridNearAtomicAbstractUpdateRequest(
+ int cacheId,
+ UUID nodeId,
+ long futId,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean topLocked,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ boolean retval,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ boolean stableTop,
+ boolean skipStore,
+ boolean keepBinary,
+ boolean addDepInfo
+ ) {
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futId = futId;
+ this.topVer = topVer;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.addDepInfo = addDepInfo;
+
+ if (stableTop)
+ stableTopology(true);
+
+ if (topLocked)
+ topologyLocked(true);
+
+ if (retval)
+ returnValue(true);
+
+ if (skipStore)
+ skipStore(true);
+
+ if (keepBinary)
+ keepBinary(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+ return ctx.atomicMessageLogger();
+ }
+
+ boolean stableTopology() {
+ return isFlag(STABLE_TOP_FLAG_MASK);
+ }
+
+ boolean dhtReplyToNear() {
+ return stableTopology() && syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+ }
+
+ void stableTopology(boolean stableTop) {
+ setFlag(stableTop, STABLE_TOP_FLAG_MASK);
+ }
+
+ public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ public GridCacheOperation operation() {
+ return op;
+ }
+
+ public UUID subjectId() {
+ return subjId;
+ }
+
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public long futureId() {
+ return futId;
+ }
+
+ public final CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncMode;
+ }
/**
- * @return Flag indicating whether this is fast-map udpate.
- * TODO IGNITE-4705
+ * @param res Response.
+ * @return {@code True} if current response was {@code null}.
*/
- public abstract boolean fastMap();
+ public boolean onResponse(GridNearAtomicUpdateResponse res) {
+ if (this.res == null) {
+ this.res = res;
+
+ return true;
+ }
+
+ return false;
+ }
/**
- * @return Update version for fast-map request.
- * TODO IGNITE-4705
+ * @return Response.
*/
- public abstract GridCacheVersion updateVersion();
+ @Nullable public GridNearAtomicUpdateResponse response() {
+ return res;
+ }
/**
* @return Topology locked flag.
*/
- public abstract boolean topologyLocked();
+ public final boolean topologyLocked() {
+ return isFlag(TOP_LOCKED_FLAG_MASK);
+ }
/**
- * @return {@code True} if request sent from client node.
+ * Sets topologyLocked flag value.
*/
- public abstract boolean clientRequest();
+ public final void topologyLocked(boolean val) {
+ setFlag(val, TOP_LOCKED_FLAG_MASK);
+ }
/**
- * @return Cache write synchronization mode.
+ * @return Return value flag.
*/
- public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+ public final boolean returnValue() {
+ return isFlag(RET_VAL_FLAG_MASK);
+ }
/**
- * @return Expiry policy.
+ * Sets returnValue flag value.
*/
- public abstract ExpiryPolicy expiry();
+ public final void returnValue(boolean val) {
+ setFlag(val, RET_VAL_FLAG_MASK);
+ }
/**
- * @return Return value flag.
+ * @return Skip write-through to a persistent storage.
*/
- public abstract boolean returnValue();
+ public final boolean skipStore() {
+ return isFlag(SKIP_STORE_FLAG_MASK);
+ }
/**
- * @return Filter.
+ * Sets skipStore flag value.
*/
- @Nullable public abstract CacheEntryPredicate[] filter();
+ public void skipStore(boolean val) {
+ setFlag(val, SKIP_STORE_FLAG_MASK);
+ }
/**
- * @return Skip write-through to a persistent storage.
+ * @return Keep binary flag.
*/
- public abstract boolean skipStore();
+ public final boolean keepBinary() {
+ return isFlag(KEEP_BINARY_FLAG_MASK);
+ }
/**
- * @return Keep binary flag.
+ * Sets keepBinary flag value.
*/
- public abstract boolean keepBinary();
+ public void keepBinary(boolean val) {
+ setFlag(val, KEEP_BINARY_FLAG_MASK);
+ }
/**
- * @return Update operation.
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
*/
- public abstract GridCacheOperation operation();
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
/**
- * @return Optional arguments for entry processor.
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
*/
- @Nullable public abstract Object[] invokeArguments();
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
/**
- * @return Flag indicating whether this request contains primary keys.
- * TODO IGNITE-4705
+ * @return Expiry policy.
*/
- public abstract boolean hasPrimary();
+ public abstract ExpiryPolicy expiry();
/**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
+ * @return Filter.
*/
- public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+ @Nullable public abstract CacheEntryPredicate[] filter();
/**
- * @return Response.
+ * @return Optional arguments for entry processor.
*/
- @Nullable public abstract GridNearAtomicUpdateResponse response();
+ @Nullable public abstract Object[] invokeArguments();
/**
* @param key Key to add.
@@ -148,14 +326,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
*/
- public abstract void addUpdateEntry(KeyCacheObject key,
+ abstract void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary);
+ @Nullable GridCacheVersion conflictVer);
/**
* @return Keys for this update request.
@@ -185,7 +361,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
*/
public abstract CacheObject writeValue(int idx);
-
/**
* @return Conflict versions.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 869cfbc..331bcdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
@@ -41,10 +40,10 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
@@ -61,56 +60,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** */
private static final long serialVersionUID = 0L;
- /** Target node ID. */
- @GridDirectTransient
- private UUID nodeId;
-
- /** Future version. */
- private long futId;
-
- /** Update version. Set to non-null if fastMap is {@code true}. */
- private GridCacheVersion updateVer;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
- /** Update operation. */
- private GridCacheOperation op;
-
- /** Subject ID. */
- protected UUID subjId;
-
- /** Task name hash. */
- protected int taskNameHash;
-
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
- /** Fast map flag. */
- protected boolean fastMap;
-
- /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
- protected boolean topLocked;
-
- /** Flag indicating whether request contains primary keys. */
- protected boolean hasPrimary;
-
- /** Skip write-through to a persistent storage. */
- protected boolean skipStore;
-
- /** */
- protected boolean clientReq;
-
- /** Keep binary flag. */
- protected boolean keepBinary;
-
- /** Return value flag. */
- protected boolean retval;
-
/** Keys to update. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -120,10 +69,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@GridDirectCollection(CacheObject.class)
private List<CacheObject> vals;
- /** Partitions of keys. */
- @GridDirectCollection(int.class)
- private List<Integer> partIds;
-
/** Entry processors. */
@GridDirectTransient
private List<EntryProcessor<Object, Object, Object>> entryProcessors;
@@ -177,7 +122,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
* @param nodeId Node ID.
* @param futId Future ID.
* @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -190,7 +134,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
* @param maxEntryCnt Maximum entries count.
*/
@@ -198,8 +141,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
int cacheId,
UUID nodeId,
long futId,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -210,32 +151,29 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
+ boolean stableTop,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo,
int maxEntryCnt
) {
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futId = futId;
- this.fastMap = fastMap;
- this.updateVer = updateVer;
-
- this.topVer = topVer;
- this.topLocked = topLocked;
- this.syncMode = syncMode;
- this.op = op;
- this.retval = retval;
+ super(cacheId,
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ subjId,
+ taskNameHash,
+ stableTop,
+ skipStore,
+ keepBinary,
+ addDepInfo);
this.expiryPlc = expiryPlc;
this.invokeArgs = invokeArgs;
this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.clientReq = clientReq;
- this.addDepInfo = addDepInfo;
// By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
// will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
@@ -244,84 +182,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
initSize = Math.min(maxEntryCnt, 10);
keys = new ArrayList<>(initSize);
-
- partIds = new ArrayList<>(initSize);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /** {@inheritDoc} */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /** {@inheritDoc} */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /** {@inheritDoc} */
- @Override public long futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion updateVersion() {
- return updateVer;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheOperation operation() {
- return op;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public GridNearAtomicUpdateResponse response() {
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
- return ctx.atomicMessageLogger();
}
/** {@inheritDoc} */
@@ -329,8 +189,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
+ @Nullable GridCacheVersion conflictVer) {
EntryProcessor<Object, Object, Object> entryProcessor = null;
if (op == TRANSFORM) {
@@ -342,7 +201,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
assert val != null || op == DELETE;
keys.add(key);
- partIds.add(key.partition());
if (entryProcessor != null) {
if (entryProcessors == null)
@@ -359,8 +217,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
vals.add((CacheObject)val);
}
- hasPrimary |= primary;
-
// In case there is no conflict, do not create the list.
if (conflictVer != null) {
if (conflictVers == null) {
@@ -488,41 +344,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public boolean fastMap() {
- return fastMap;
- }
-
- /** {@inheritDoc} */
- @Override public boolean topologyLocked() {
- return topLocked;
- }
-
- /** {@inheritDoc} */
- @Override public boolean clientRequest() {
- return clientReq;
- }
-
- /** {@inheritDoc} */
- @Override public boolean returnValue() {
- return retval;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipStore() {
- return skipStore;
- }
-
- /** {@inheritDoc} */
- @Override public boolean keepBinary() {
- return keepBinary;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasPrimary() {
- return hasPrimary;
- }
-
- /** {@inheritDoc} */
@Override @Nullable public CacheEntryPredicate[] filter() {
return filter;
}
@@ -600,397 +421,22 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
else
finishUnmarshalCacheObjects(vals, cctx, ldr);
-
- if (partIds != null && !partIds.isEmpty()) {
- assert partIds.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++)
- keys.get(i).partition(partIds.get(i));
- }
}
/** {@inheritDoc} */
@Override public int partition() {
- return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ assert !F.isEmpty(keys);
+
+ return keys.get(0).partition();
}
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeBoolean("clientReq", clientReq))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeBoolean("fastMap", fastMap))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeLong("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeBoolean("retval", retval))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeBoolean("skipStore", skipStore))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeBoolean("topLocked", topLocked))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 25:
- if (!writer.writeMessage("updateVer", updateVer))
- return false;
-
- writer.incrementState();
-
- case 26:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
return true;
}
/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- clientReq = reader.readBoolean("clientReq");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- conflictTtls = reader.readMessage("conflictTtls");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- fastMap = reader.readBoolean("fastMap");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- futId = reader.readLong("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- hasPrimary = reader.readBoolean("hasPrimary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = GridCacheOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 17:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- retval = reader.readBoolean("retval");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- skipStore = reader.readBoolean("skipStore");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 21:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 22:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- topLocked = reader.readBoolean("topLocked");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
- updateVer = reader.readMessage("updateVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 26:
- vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
return reader.afterMessageRead(GridNearAtomicFullUpdateRequest.class);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
deleted file mode 100644
index 855fb78..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.UUID;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- *
- */
-public class GridNearAtomicMappingResponse extends GridCacheMessage {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** */
- private static final int AFF_MAPPING_FLAG_MASK = 0x01;
-
- /** */
- private int part;
-
- /** */
- @GridDirectCollection(UUID.class)
- private List<UUID> mapping;
-
- /** */
- private long futId;
-
- /** */
- private byte flags;
-
- /**
- *
- */
- public GridNearAtomicMappingResponse() {
- // No-op.
- }
-
- /**
- * @param cacheId Cache ID.
- * @param part Partition.
- * @param futId Future ID.
- * @param mapping Mapping.
- * @param affMapping {@code True} if update mapping matches affinity function result.
- */
- GridNearAtomicMappingResponse(int cacheId, int part, long futId, List<UUID> mapping, boolean affMapping) {
- assert mapping == null || !affMapping;
-
- this.cacheId = cacheId;
- this.part = part;
- this.futId = futId;
- this.mapping = mapping;
-
- if (affMapping)
- setFlag(true, AFF_MAPPING_FLAG_MASK);
- }
-
- /**
- * @return {@code True} if update mapping matches affinity function result.
- */
- boolean affinityMapping() {
- return isFlag(AFF_MAPPING_FLAG_MASK);
- }
-
- /**
- * Sets flag mask.
- *
- * @param flag Set or clear.
- * @param mask Mask.
- */
- protected final void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
- }
-
- /**
- * Reags flag mask.
- *
- * @param mask Mask to read.
- * @return Flag value.
- */
- final boolean isFlag(int mask) {
- return (flags & mask) != 0;
- }
-
- /** {@inheritDoc} */
- @Override public int partition() {
- return part;
- }
-
- /**
- * @return Mapping.
- */
- public List<UUID> mapping() {
- return mapping;
- }
-
- /**
- * @return Future ID.
- */
- public long futureId() {
- return futId;
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return -47;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 7;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeLong("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeInt("part", part))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- futId = reader.readLong("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- part = reader.readInt("part");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearAtomicMappingResponse.class);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearAtomicMappingResponse.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index c474c83..b9ded3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -59,8 +59,6 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
* @param cacheId Cache ID.
* @param nodeId Node ID.
* @param futId Future ID.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -71,15 +69,12 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
GridNearAtomicSingleUpdateFilterRequest(
int cacheId,
UUID nodeId,
long futId,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -88,17 +83,15 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
+ boolean stableTop,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo
) {
super(
cacheId,
nodeId,
futId,
- fastMap,
- updateVer,
topVer,
topLocked,
syncMode,
@@ -106,9 +99,9 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
retval,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- clientReq,
addDepInfo
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index b1b951f..a741735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -102,8 +102,21 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
int remapCnt,
boolean waitTopFut
) {
- super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
- skipStore, keepBinary, remapCnt, waitTopFut);
+ super(cctx,
+ cache,
+ syncMode,
+ op,
+ invokeArgs,
+ retval,
+ rawRetval,
+ expiryPlc,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ remapCnt,
+ waitTopFut);
assert subjId != null;
@@ -159,7 +172,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (res != null) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near update single fut, node left [futId=" + req.futureId() +
- ", writeVer=" + req.updateVersion() +
", node=" + nodeId + ']');
}
@@ -184,7 +196,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridCacheReturn ret = (GridCacheReturn)res;
Object retval = res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
- cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
+ cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
if (op == TRANSFORM && retval == null)
retval = Collections.emptyMap();
@@ -202,30 +214,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
- GridCacheReturn opRes0;
- CachePartialUpdateCheckedException err0;
- AffinityTopologyVersion remapTopVer0;
-
- synchronized (mux) {
- if (futId == null || futId != res.futureId())
- return;
-
- assert reqState != null;
-
- if (reqState.onMappingReceived(cctx, res)) {
- opRes0 = opRes;
- err0 = err;
- remapTopVer0 = onAllReceived();
- }
- else
- return;
- }
-
- finishUpdateFuture(opRes0, err0, remapTopVer0);
- }
-
- /** {@inheritDoc} */
@Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
GridCacheReturn opRes0;
CachePartialUpdateCheckedException err0;
@@ -444,7 +432,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
private void updateNear(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
assert nearEnabled;
- if (res.remapTopologyVersion() != null || !req.hasPrimary())
+ if (res.remapTopologyVersion() != null)
return;
GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
@@ -585,14 +573,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridNearAtomicAbstractUpdateRequest req;
+ boolean stableTop = true;
+
if (canUseSingleRequest()) {
if (op == TRANSFORM) {
req = new GridNearAtomicSingleUpdateInvokeRequest(
cctx.cacheId(),
primary.id(),
futId,
- false,
- null,
topVer,
topLocked,
syncMode,
@@ -601,9 +589,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
invokeArgs,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
else {
@@ -612,8 +600,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
cctx.cacheId(),
primary.id(),
futId,
- false,
- null,
topVer,
topLocked,
syncMode,
@@ -621,9 +607,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
retval,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
else {
@@ -631,8 +617,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
cctx.cacheId(),
primary.id(),
futId,
- false,
- null,
topVer,
topLocked,
syncMode,
@@ -641,9 +625,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
filter,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled());
}
}
@@ -653,8 +637,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
cctx.cacheId(),
primary.id(),
futId,
- false,
- null,
topVer,
topLocked,
syncMode,
@@ -665,9 +647,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
filter,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled(),
1);
}
@@ -676,8 +658,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
val,
CU.TTL_NOT_CHANGED,
CU.EXPIRE_TIME_CALCULATE,
- null,
- true);
+ null);
return req;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index 269443f..5199602 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -77,8 +77,6 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
* @param cacheId Cache ID.
* @param nodeId Node ID.
* @param futId Future ID.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -89,15 +87,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
GridNearAtomicSingleUpdateInvokeRequest(
int cacheId,
UUID nodeId,
long futId,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -106,17 +101,15 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
@Nullable Object[] invokeArgs,
@Nullable UUID subjId,
int taskNameHash,
+ boolean stableTop,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo
) {
super(
cacheId,
nodeId,
futId,
- fastMap,
- updateVer,
topVer,
topLocked,
syncMode,
@@ -124,9 +117,9 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
retval,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- clientReq,
addDepInfo
);
this.invokeArgs = invokeArgs;
@@ -140,14 +133,12 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
*/
@Override public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
+ @Nullable GridCacheVersion conflictVer) {
assert conflictTtl < 0 : conflictTtl;
assert conflictExpireTime < 0 : conflictExpireTime;
assert conflictVer == null : conflictVer;
@@ -156,9 +147,6 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
entryProcessor = (EntryProcessor<Object, Object, Object>)val;
this.key = key;
- partId = key.partition();
-
- hasPrimary(hasPrimary() | primary);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 577b130..42367b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -57,9 +57,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
/** Value to update. */
protected CacheObject val;
- /** Partition of key. */
- protected int partId;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -73,8 +70,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
* @param cacheId Cache ID.
* @param nodeId Node ID.
* @param futId Future ID.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -84,15 +79,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
GridNearAtomicSingleUpdateRequest(
int cacheId,
UUID nodeId,
long futId,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -100,17 +92,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean stableTop,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo
) {
- super(
- cacheId,
+ super(cacheId,
nodeId,
futId,
- fastMap,
- updateVer,
topVer,
topLocked,
syncMode,
@@ -118,16 +107,17 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
retval,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- clientReq,
- addDepInfo
- );
+ addDepInfo);
}
/** {@inheritDoc} */
@Override public int partition() {
- return partId;
+ assert key != null;
+
+ return key.partition();
}
/**
@@ -136,14 +126,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
*/
@Override public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
+ @Nullable GridCacheVersion conflictVer) {
assert op != TRANSFORM;
assert val != null || op == DELETE;
assert conflictTtl < 0 : conflictTtl;
@@ -151,15 +139,12 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
assert conflictVer == null : conflictVer;
this.key = key;
- partId = key.partition();
if (val != null) {
assert val instanceof CacheObject : val;
this.val = (CacheObject)val;
}
-
- hasPrimary(hasPrimary() | primary);
}
/** {@inheritDoc} */
@@ -255,8 +240,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
if (val != null)
val.finishUnmarshal(cctx.cacheObjectContext(), ldr);
-
- key.partition(partId);
}
/** {@inheritDoc} */
@@ -280,12 +263,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
writer.incrementState();
- case 12:
- if (!writer.writeInt("partId", partId))
- return false;
-
- writer.incrementState();
-
case 13:
if (!writer.writeMessage("val", val))
return false;
@@ -316,14 +293,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
reader.incrementState();
- case 12:
- partId = reader.readInt("partId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 13:
val = reader.readMessage("val");
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 573cb40..629c447 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -233,7 +233,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
- ", writeVer=" + req.updateVersion() +
", node=" + nodeId + ']');
}
@@ -284,51 +283,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
- GridCacheReturn opRes0;
- CachePartialUpdateCheckedException err0;
- AffinityTopologyVersion remapTopVer0;
-
- synchronized (mux) {
- if (futId == null || futId != res.futureId())
- return;
-
- PrimaryRequestState reqState;
-
- if (singleReq != null) {
- if (singleReq.onMappingReceived(cctx, res)) {
- opRes0 = opRes;
- err0 = err;
- remapTopVer0 = onAllReceived();
- }
- else
- return;
- }
- else {
- reqState = mappings != null ? mappings.get(nodeId) : null;
-
- if (reqState != null && reqState.onMappingReceived(cctx, res)) {
- assert mappings.size() > resCnt : "[mappings=" + mappings.size() + ", cnt=" + resCnt + ']';
-
- resCnt++;
-
- if (mappings.size() == resCnt) {
- opRes0 = opRes;
- err0 = err;
- remapTopVer0 = onAllReceived();
- }
- else
- return;
- }
- else
- return;
- }
- }
-
- finishUpdateFuture(opRes0, err0, remapTopVer0);
- }
-
- /** {@inheritDoc} */
@Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
GridCacheReturn opRes0;
CachePartialUpdateCheckedException err0;
@@ -756,14 +710,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
- ", writeVer=" + req.updateVersion() +
", node=" + req.nodeId() + ']');
}
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
- ", writeVer=" + req.updateVersion() +
", node=" + req.nodeId() +
", err=" + e + ']');
}
@@ -817,7 +769,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (size == 1) {
assert remapKeys == null || remapKeys.size() == 1;
- singleReq0 = mapSingleUpdate(topVer, futId, null);
+ singleReq0 = mapSingleUpdate(topVer, futId);
}
else {
Map<UUID, PrimaryRequestState> pendingMappings = mapUpdate(topNodes,
@@ -923,6 +875,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
Map<UUID, PrimaryRequestState> pendingMappings = U.newHashMap(topNodes.size());
+ boolean stableTop = true;
+
// Create mappings first, then send messages.
for (Object key : keys) {
if (key == null)
@@ -991,8 +945,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
cctx.cacheId(),
nodeId,
futId,
- false,
- null,
topVer,
topLocked,
syncMode,
@@ -1003,16 +955,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
filter,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled(),
keys.size()));
pendingMappings.put(nodeId, mapped);
}
- mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, true);
+ mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
}
return pendingMappings;
@@ -1021,13 +973,10 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/**
* @param topVer Topology version.
* @param futId Future ID.
- * @param updVer Update version.
* @return Request.
* @throws Exception If failed.
*/
- private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer,
- Long futId,
- @Nullable GridCacheVersion updVer) throws Exception {
+ private PrimaryRequestState mapSingleUpdate(AffinityTopologyVersion topVer, Long futId) throws Exception {
Object key = F.first(keys);
Object val;
@@ -1086,12 +1035,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
"left the grid).");
+ boolean stableTop = true;
+
GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
cctx.cacheId(),
primary.id(),
futId,
- false,
- updVer,
topVer,
topLocked,
syncMode,
@@ -1102,9 +1051,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
filter,
subjId,
taskNameHash,
+ stableTop,
skipStore,
keepBinary,
- cctx.kernalContext().clientNode(),
cctx.deploymentEnabled(),
1);
@@ -1112,8 +1061,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
val,
conflictTtl,
conflictExpireTime,
- conflictVer,
- true);
+ conflictVer);
return new PrimaryRequestState(req);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 76f2ea0..6bb8913 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -104,10 +104,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Near expire times. */
private GridLongList nearExpireTimes;
- /** */
- @GridDirectCollection(UUID.class)
- private List<UUID> mapping;
-
/** Partition ID. */
private int partId = -1;
@@ -131,20 +127,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
this.addDepInfo = addDepInfo;
}
- /**
- * @return Update mapping.
- */
- public List<UUID> mapping() {
- return mapping;
- }
-
- /**
- * @param mapping Mapping.
- */
- public void mapping(List<UUID> mapping) {
- this.mapping = mapping;
- }
-
/** {@inheritDoc} */
@Override public int lookupIndex() {
return CACHE_MSG_IDX;
@@ -499,12 +481,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
- case 6:
- if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
case 7:
if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
return false;
@@ -599,14 +575,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
- case 6:
- mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 7:
nearExpireTimes = reader.readMessage("nearExpireTimes");
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index f8ae661..62aecd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -142,10 +142,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
List<Integer> nearValsIdxs = res.nearValuesIndexes();
List<Integer> skipped = res.skippedIndexes();
- GridCacheVersion ver = req.updateVersion();
-
- if (ver == null)
- ver = res.nearVersion();
+ GridCacheVersion ver = res.nearVersion();
assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
@@ -195,7 +192,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
processNearAtomicUpdateResponse(ver,
key,
val,
- null,
ttl,
expireTime,
req.keepBinary(),
@@ -213,7 +209,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param ver Version.
* @param key Key.
* @param val Value.
- * @param valBytes Value bytes.
* @param ttl TTL.
* @param expireTime Expire time.
* @param nodeId Node ID.
@@ -225,7 +220,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
GridCacheVersion ver,
KeyCacheObject key,
@Nullable CacheObject val,
- @Nullable byte[] valBytes,
long ttl,
long expireTime,
boolean keepBinary,
@@ -242,7 +236,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
try {
entry = entryEx(key, topVer);
- GridCacheOperation op = (val != null || valBytes != null) ? UPDATE : DELETE;
+ GridCacheOperation op = val != null ? UPDATE : DELETE;
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index cbb2b8a..081e49f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -402,9 +402,6 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
assertEquals(3, msgs.size());
- for (Object msg : msgs)
- assertTrue(((GridNearAtomicFullUpdateRequest)msg).clientRequest());
-
map.put(primaryKey(ignite0.cache(null)), 3);
map.put(primaryKey(ignite1.cache(null)), 4);
map.put(primaryKey(ignite2.cache(null)), 5);
[2/2] ignite git commit: ignite-4705
Posted by sb...@apache.org.
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30bfae0e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30bfae0e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30bfae0e
Branch: refs/heads/ignite-4705-2
Commit: 30bfae0e6d5e8ae5098771d6754e4fa24becbd13
Parents: 19c340c
Author: sboikov <sb...@gridgain.com>
Authored: Thu Mar 2 14:47:21 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Mar 2 18:34:19 2017 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 8 +-
.../processors/cache/GridCacheIoManager.java | 4 +-
.../processors/cache/GridCacheMvccManager.java | 2 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 219 +++----
.../GridDhtAtomicAbstractUpdateRequest.java | 54 +-
.../dht/atomic/GridDhtAtomicCache.java | 269 ++-------
.../dht/atomic/GridDhtAtomicNearResponse.java | 115 +---
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 16 +-
.../GridDhtAtomicSingleUpdateRequest.java | 4 -
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 14 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 4 -
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 35 +-
...idNearAtomicAbstractSingleUpdateRequest.java | 475 +--------------
.../GridNearAtomicAbstractUpdateFuture.java | 107 ++--
.../GridNearAtomicAbstractUpdateRequest.java | 285 +++++++--
.../atomic/GridNearAtomicFullUpdateRequest.java | 594 +------------------
.../atomic/GridNearAtomicMappingResponse.java | 244 --------
...GridNearAtomicSingleUpdateFilterRequest.java | 11 +-
.../GridNearAtomicSingleUpdateFuture.java | 67 +--
...GridNearAtomicSingleUpdateInvokeRequest.java | 18 +-
.../GridNearAtomicSingleUpdateRequest.java | 47 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 72 +--
.../atomic/GridNearAtomicUpdateResponse.java | 32 -
.../distributed/near/GridNearAtomicCache.java | 10 +-
...niteCacheClientNodeChangingTopologyTest.java | 3 -
25 files changed, 568 insertions(+), 2141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 908d1d6..fa59291 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -72,7 +72,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -121,11 +120,11 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
+import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleAck;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishRequest;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleFinishResponse;
import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage;
-import org.apache.ignite.internal.processors.hadoop.shuffle.HadoopDirectShuffleMessage;
import org.apache.ignite.internal.processors.igfs.IgfsAckMessage;
import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
import org.apache.ignite.internal.processors.igfs.IgfsBlocksMessage;
@@ -176,11 +175,6 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
- case -47:
- msg = new GridNearAtomicMappingResponse();
-
- break;
-
case -46:
msg = new UpdateErrors();
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index c46b01a..d1a6753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -494,9 +494,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
* @return Atomic future ID if applicable for message.
*/
@Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) {
- if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest)
- return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion();
- else if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
+ if (cacheMsg instanceof GridDhtAtomicAbstractUpdateRequest)
return ((GridDhtAtomicAbstractUpdateRequest)cacheMsg).writeVersion();
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index f3d3fb6..7b87769 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -136,7 +136,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private volatile boolean stopping;
/** */
- private final AtomicLong atomicFutId = new AtomicLong();
+ private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis());
/** Lock callback. */
@GridToStringExclude
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 094d643..96f3611 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -52,7 +51,6 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
/**
* DHT atomic cache backup update future.
@@ -78,7 +76,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
protected final GridCacheContext cctx;
/** Future version. */
- protected final long futId;
+ protected final Long futId;
/** Update request. */
final GridNearAtomicAbstractUpdateRequest updateReq;
@@ -94,25 +92,31 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
private volatile int resCnt;
/** */
- private boolean repliedToNear;
+ private final GridNearAtomicUpdateResponse updateRes;
/** */
- private boolean affMapping = true;
+ private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
/**
* @param cctx Cache context.
* @param writeVer Write version.
* @param updateReq Update request.
+ * @param updateRes Response.
+ * @param completionCb Callback to invoke to send response to near node.
*/
protected GridDhtAtomicAbstractUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
this.cctx = cctx;
this.updateReq = updateReq;
this.writeVer = writeVer;
+ this.updateRes = updateRes;
+ this.completionCb = completionCb;
futId = cctx.mvcc().atomicFutureId();
@@ -145,7 +149,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/**
- * @param nearNodeId Near node ID.
* @param entry Entry to map.
* @param val Value to write.
* @param entryProcessor Entry processor.
@@ -159,7 +162,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
@SuppressWarnings("ForLoopReplaceableByForEach")
final void addWriteEntry(
AffinityAssignment affAssignment,
- UUID nearNodeId,
GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
@@ -175,9 +177,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), affAssignment, affNodes);
- if (dhtNodes != null)
- affMapping = false;
- else
+ if (dhtNodes == null)
dhtNodes = affNodes;
if (log.isDebugEnabled())
@@ -198,7 +198,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (updateReq == null) {
updateReq = createRequest(
node.id(),
- nearNodeId,
futId,
writeVer,
syncMode,
@@ -236,7 +235,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
protected abstract void addNearKey(KeyCacheObject key, Collection<UUID> readers);
/**
- * @param nearNodeId Near node ID.
* @param readers Entry readers.
* @param entry Entry.
* @param val Value.
@@ -245,15 +243,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param expireTime Expire time for near cache update (optional).
*/
final void addNearWriteEntries(
- UUID nearNodeId,
Collection<UUID> readers,
GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long expireTime) {
- affMapping = false;
-
CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
addNearKey(entry.key(), readers);
@@ -272,7 +267,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
updateReq = createRequest(
node.id(),
- nearNodeId,
futId,
writeVer,
syncMode,
@@ -332,14 +326,11 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
GridDhtAtomicAbstractUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
- boolean needReplyToNear = false;
+// boolean needReplyToNear = false;
if (req != null) {
synchronized (this) {
if (req.onResponse()) {
- if (errors != null || (nodeErr && !repliedToNear))
- needReplyToNear = repliedToNear = true;
-
resCnt0 = resCnt;
resCnt0 += 1;
@@ -349,53 +340,50 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
else
return false;
}
-
- if (needReplyToNear) {
- assert !F.isEmpty(mappings);
-
- List<UUID> dhtNodes = new ArrayList<>(mappings.size());
-
- dhtNodes.addAll(mappings.keySet());
-
- GridDhtAtomicNearResponse res = new GridDhtAtomicNearResponse(cctx.cacheId(),
- req.partition(),
- req.nearFutureId(),
- cctx.localNodeId(),
- dhtNodes,
- req.flags());
-
- res.errors(errors);
-
- res.failedNodeId(nodeId);
-
- try {
- cctx.io().send(req.nearNodeId(), res, cctx.ioPolicy());
-
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, sent response on DHT node fail " +
- "[futId=" + futId +
- ", writeVer=" + writeVer +
- ", node=" + req.nearNodeId() +
- ", failedNode=" + nodeId + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, failed to notify near node on DHT node fail, near node left " +
- "[futId=" + futId +
- ", writeVer=" + writeVer +
- ", node=" + req.nearNodeId() +
- ", failedNode=" + nodeId + ']');
- }
- }
- catch (IgniteCheckedException ignored) {
- U.error(msgLog, "DTH update fut, failed to notify near node on DHT node fail " +
- "[futId=" + futId +
- ", writeVer=" + writeVer +
- ", node=" + req.nearNodeId() +
- ", failedNode=" + nodeId + ']');
- }
- }
+//
+// if (needReplyToNear) {
+// assert !F.isEmpty(mappings);
+//
+// List<UUID> dhtNodes = new ArrayList<>(mappings.size());
+//
+// dhtNodes.addAll(mappings.keySet());
+//
+// GridDhtAtomicNearResponse res = new GridDhtAtomicNearResponse(cctx.cacheId(),
+// req.partition(),
+// req.nearFutureId(),
+// cctx.localNodeId(),
+// req.flags());
+//
+// res.errors(errors);
+//
+// try {
+// cctx.io().send(req.nearNodeId(), res, cctx.ioPolicy());
+//
+// if (msgLog.isDebugEnabled()) {
+// msgLog.debug("DTH update fut, sent response on DHT node fail " +
+// "[futId=" + futId +
+// ", writeVer=" + writeVer +
+// ", node=" + req.nearNodeId() +
+// ", failedNode=" + nodeId + ']');
+// }
+// }
+// catch (ClusterTopologyCheckedException ignored) {
+// if (msgLog.isDebugEnabled()) {
+// msgLog.debug("DTH update fut, failed to notify near node on DHT node fail, near node left " +
+// "[futId=" + futId +
+// ", writeVer=" + writeVer +
+// ", node=" + req.nearNodeId() +
+// ", failedNode=" + nodeId + ']');
+// }
+// }
+// catch (IgniteCheckedException ignored) {
+// U.error(msgLog, "DTH update fut, failed to notify near node on DHT node fail " +
+// "[futId=" + futId +
+// ", writeVer=" + writeVer +
+// ", node=" + req.nearNodeId() +
+// ", failedNode=" + nodeId + ']');
+// }
+// }
if (resCnt0 == mappings.size())
onDone();
@@ -406,97 +394,49 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
return false;
}
- final void init(GridNearAtomicUpdateResponse updateRes, GridCacheReturn ret) {
- repliedToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
- ret.hasValue() ||
- updateRes.nearVersion() != null ||
- updateRes.nodeId().equals(cctx.localNodeId());
- }
-
/**
* Sends requests to remote nodes.
*
- * @param updateRes Response.
- * @param completionCb Callback to invoke to send response to near node.
* @param ret Cache operation return value.
*/
- final void map(GridNearAtomicUpdateResponse updateRes,
- GridDhtAtomicCache.UpdateReplyClosure completionCb,
- GridCacheReturn ret) {
- boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
-
- boolean needReplyToNear = repliedToNear;
+ final void map(ClusterNode nearNode, GridCacheReturn ret) {
+ if (F.isEmpty(mappings)) {
+ completionCb.apply(updateReq, updateRes);
- List<UUID> dhtNodes = null;
+ onDone();
- boolean affMapping = false;
+ return;
+ }
- if (fullSync) {
- if (!F.isEmpty(mappings)) {
- if (updateReq.size() == 1 && this.affMapping)
- affMapping = true;
- else {
- dhtNodes = new ArrayList<>(mappings.size());
+ boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
- dhtNodes.addAll(mappings.keySet());
- }
- }
- else
- dhtNodes = Collections.emptyList();
- }
- else
- dhtNodes = Collections.emptyList();
+ if (updateReq.dhtReplyToNear()) {
+ assert fullSync;
- updateRes.mapping(dhtNodes);
+ boolean needReplyToNear = ret.hasValue() || updateRes.nearVersion() != null;
- if (!F.isEmpty(mappings)) {
- sendDhtRequests(fullSync && !needReplyToNear, dhtNodes, affMapping, ret);
+ sendDhtRequests(true, nearNode, ret);
if (needReplyToNear)
completionCb.apply(updateReq, updateRes);
- else {
- if (fullSync && GridDhtAtomicCache.IGNITE_ATOMIC_SND_MAPPING_TO_NEAR) {
- GridNearAtomicMappingResponse mappingRes = new GridNearAtomicMappingResponse(
- cctx.cacheId(),
- updateReq.partition(),
- updateReq.futureId(),
- dhtNodes,
- affMapping);
-
- try {
- cctx.io().send(updateRes.nodeId(), mappingRes, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- U.error(msgLog, "Failed to send mapping response [futId=" + futId +
- ", writeVer=" + writeVer +
- ", node=" + updateRes.nodeId() + ']');
- }
- }
- }
}
else {
- completionCb.apply(updateReq, updateRes);
+ sendDhtRequests(false, nearNode, ret);
- onDone();
+ if (!fullSync)
+ completionCb.apply(updateReq, updateRes);
}
}
/**
* @param nearReplyInfo {@code True} if need add information for near node response.
- * @param dhtNodes DHT nodes.
* @param ret Return value.
*/
- private void sendDhtRequests(boolean nearReplyInfo, List<UUID> dhtNodes, boolean affMapping, GridCacheReturn ret) {
+ private void sendDhtRequests(boolean nearReplyInfo, ClusterNode nearNode, GridCacheReturn ret) {
for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
try {
if (nearReplyInfo) {
- if (affMapping) {
- assert dhtNodes == null;
-
- req.affinityMapping(true);
- }
- else
- req.dhtNodes(dhtNodes);
+ req.nearReplyInfo(nearNode.id(), updateReq.futureId());
if (!ret.hasValue())
req.setResult(ret.success());
@@ -549,8 +489,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/**
+ * @param updateRes Response.
+ * @param err Error.
+ */
+ protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err);
+
+ /**
* @param nodeId Node ID.
- * @param nearNodeId Near node ID.
* @param futId Future ID.
* @param writeVer Update version.
* @param syncMode Write synchronization mode.
@@ -562,7 +507,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*/
protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
UUID nodeId,
- UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -584,6 +528,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
clsr.apply(suc);
}
+ if (updateReq.writeSynchronizationMode() == FULL_SYNC && !updateReq.dhtReplyToNear()) {
+ if (!suc)
+ addFailedKeys(updateRes, err);
+
+ completionCb.apply(updateReq, updateRes);
+ }
+
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 896c163..e04e381 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -58,12 +58,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
/** */
static final int DHT_ATOMIC_RESULT_SUCCESS_MASK = 0x10;
- /** */
- static final int DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE = 0x20;
-
- /** */
- static final int DHT_ATOMIC_AFF_MAPPING_FLAG_MASK = 0x40;
-
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
@@ -102,10 +96,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
/** Additional flags. */
protected byte flags;
- /** */
- @GridDirectCollection(UUID.class)
- private List<UUID> dhtNodes;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -118,14 +108,10 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param nearNodeId Near node ID.
- * @param nearFutId Future ID on near node.
*/
protected GridDhtAtomicAbstractUpdateRequest(int cacheId,
UUID nodeId,
long futId,
- UUID nearNodeId,
- long nearFutId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -138,8 +124,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futId = futId;
- this.nearNodeId = nearNodeId;
- this.nearFutId = nearFutId;
this.writeVer = writeVer;
this.syncMode = syncMode;
this.topVer = topVer;
@@ -153,11 +137,11 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
setFlag(true, DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
}
- /**
- * @param affMapping
- */
- public void affinityMapping(boolean affMapping) {
- setFlag(affMapping, DHT_ATOMIC_AFF_MAPPING_FLAG_MASK);
+ void nearReplyInfo(UUID nearNodeId, long nearFutId) {
+ assert nearNodeId != null;
+
+ this.nearNodeId = nearNodeId;
+ this.nearFutId = nearFutId;
}
/**
@@ -176,20 +160,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
return nearNodeId;
}
- /**
- * @param dhtNodes DHT nodes.
- */
- void dhtNodes(List<UUID> dhtNodes) {
- this.dhtNodes = dhtNodes;
- }
-
- /**
- * @return DHT nodes.
- */
- List<UUID> dhtNodes() {
- return dhtNodes;
- }
-
/** {@inheritDoc} */
@Override public int lookupIndex() {
return CACHE_MSG_IDX;
@@ -478,12 +448,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
}
switch (writer.state()) {
- case 3:
- if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
case 4:
if (!writer.writeByte("flags", flags))
return false;
@@ -554,14 +518,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
return false;
switch (reader.state()) {
- case 3:
- dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 4:
flags = reader.readByte("flags");
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index a58f1ca..f5d06dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -34,7 +34,6 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -93,14 +92,12 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
@@ -138,14 +135,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
/** */
- static final boolean IGNITE_ATOMIC_SND_MAPPING_TO_NEAR =
- IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_SND_MAPPING_TO_NEAR", false);
-
- /** */
- private static final boolean IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK =
- IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK", true);
-
- /** */
private final ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>> defRes =
new ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>>() {
@Override protected Map<UUID, GridDhtAtomicDeferredUpdateResponse> initialValue() {
@@ -244,12 +233,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void start() throws IgniteCheckedException {
super.start();
- // TODO IGNITE-4705.
- log.info("Atomic cache start [name=" + name() +
- ", mode=" + configuration().getWriteSynchronizationMode() +
- ", IGNITE_ATOMIC_SND_MAPPING_TO_NEAR=" + IGNITE_ATOMIC_SND_MAPPING_TO_NEAR +
- ", IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK=" + IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK + ']');
-
CacheMetricsImpl m = new CacheMetricsImpl(ctx);
if (ctx.dht().near() != null)
@@ -395,19 +378,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(),
- GridNearAtomicMappingResponse.class,
- new CI2<UUID, GridNearAtomicMappingResponse>() {
- @Override public void apply(UUID uuid, GridNearAtomicMappingResponse msg) {
- processDhtAtomicNearMappingResponse(uuid, msg);
- }
-
- @Override public String toString() {
- return "GridNearAtomicMappingResponse handler " +
- "[msgIdx=" + GridNearAtomicMappingResponse.CACHE_MSG_IDX + ']';
- }
- });
-
if (near == null) {
ctx.io().addHandler(
ctx.cacheId(),
@@ -1813,7 +1783,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
- dhtFut = createDhtFuture(ver, req);
+ dhtFut = createDhtFuture(ver, req, res, completionCb);
expiry = expiryPolicy(req.expiry());
@@ -1867,11 +1837,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.returnValue(retVal);
- if (dhtFut != null) {
- dhtFut.init(res, res.returnValue());
-
+ if (dhtFut != null)
ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
- }
}
else {
// Should remap all keys.
@@ -1904,8 +1871,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
catch (GridDhtInvalidPartitionException ignore) {
- assert !req.fastMap() || req.clientRequest() : req;
-
if (log.isDebugEnabled())
log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
@@ -1936,7 +1901,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else {
// If there are backups, map backup update future.
if (dhtFut != null) {
- dhtFut.map(res, completionCb, res.returnValue());
+ dhtFut.map(node, res.returnValue());
// Otherwise, complete the call.
}
else
@@ -2469,7 +2434,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
- dhtFut = createDhtFuture(ver, req);
+ dhtFut = createDhtFuture(ver, req, null, null); // TODO IGNITE-4705.
readersOnly = true;
}
@@ -2488,7 +2453,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (!readersOnly) {
dhtFut.addWriteEntry(
affAssignment,
- nearNode.id(),
entry,
updRes.newValue(),
entryProcessor,
@@ -2502,7 +2466,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(
- nearNode.id(),
filteredReaders,
entry,
updRes.newValue(),
@@ -2644,17 +2607,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheOperation op;
if (putMap != null) {
- // If fast mapping, filter primary keys for write to store.
- Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
- F.view(putMap, new P1<CacheObject>() {
- @Override public boolean apply(CacheObject key) {
- return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
- }
- }) :
- putMap;
-
try {
- ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+ ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
@Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
return F.t(v, ver);
}
@@ -2667,17 +2621,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op = UPDATE;
}
else {
- // If fast mapping, filter primary keys for write to store.
- Collection<KeyCacheObject> storeKeys = req.fastMap() ?
- F.view(rmvKeys, new P1<Object>() {
- @Override public boolean apply(Object key) {
- return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
- }
- }) :
- rmvKeys;
-
try {
- ctx.store().removeAll(null, storeKeys);
+ ctx.store().removeAll(null, rmvKeys);
}
catch (CacheStorePartialUpdateException e) {
storeErr = e;
@@ -2712,10 +2657,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert writeVal != null || op == DELETE : "null write value found.";
- boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(),
- entry.partition(),
- req.topologyVersion());
-
Collection<UUID> readers = null;
Collection<UUID> filteredReaders = null;
@@ -2738,11 +2679,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
/*event*/true,
/*metrics*/true,
- primary,
+ /*primary*/true,
/*verCheck*/false,
topVer,
null,
- replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+ replicate ? DR_PRIMARY : DR_NONE,
CU.TTL_NOT_CHANGED,
CU.EXPIRE_TIME_CALCULATE,
null,
@@ -2777,7 +2718,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
batchRes.addDeleted(entry, updRes, entries);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
- dhtFut = createDhtFuture(ver, req);
+ dhtFut = createDhtFuture(ver, req, null, null); // TODO IGNITE-4705.
batchRes.readersOnly(true);
}
@@ -2789,7 +2730,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (!batchRes.readersOnly()) {
dhtFut.addWriteEntry(
affAssignment,
- nearNode.id(),
entry,
writeVal,
entryProcessor,
@@ -2803,7 +2743,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(
- nearNode.id(),
filteredReaders,
entry,
writeVal,
@@ -2813,30 +2752,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
- if (primary) {
- if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
- int idx = firstEntryIdx + i;
-
- if (req.operation() == TRANSFORM) {
- res.addNearValue(idx,
- writeVal,
- updRes.newTtl(),
- CU.EXPIRE_TIME_CALCULATE);
- }
- else
- res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
-
- if (writeVal != null || entry.hasValue()) {
- IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
+ int idx = firstEntryIdx + i;
- assert f == null : f;
- }
+ if (req.operation() == TRANSFORM) {
+ res.addNearValue(idx,
+ writeVal,
+ updRes.newTtl(),
+ CU.EXPIRE_TIME_CALCULATE);
}
- else if (readers.contains(nearNode.id())) // Reader became primary or backup.
- entry.removeReader(nearNode.id(), req.messageId());
else
- res.addSkippedIndex(firstEntryIdx + i);
+ res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+
+ if (writeVal != null || entry.hasValue()) {
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+
+ assert f == null : f;
+ }
}
+ else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(firstEntryIdx + i);
}
@@ -3083,12 +3018,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
if (updateReq.size() == 1)
- return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
+ return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
else
- return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+ return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
}
/**
@@ -3098,7 +3035,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
if (msgLog.isDebugEnabled()) {
msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
- ", writeVer=" + req.updateVersion() +
", node=" + nodeId + ']');
}
@@ -3143,13 +3079,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion ver = req.writeVersion();
- GridDhtAtomicNearResponse nearRes = ctx.config().getWriteSynchronizationMode() == FULL_SYNC ?
- new GridDhtAtomicNearResponse(ctx.cacheId(),
+ GridDhtAtomicUpdateResponse dhtRes = null;
+ GridDhtAtomicNearResponse nearRes = null;
+
+ if (req.nearNodeId() != null) {
+ nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
req.partition(),
req.nearFutureId(),
nodeId,
- req.dhtNodes(),
- req.flags()) : null;
+ req.flags());
+ }
+ else if (req.writeSynchronizationMode() == FULL_SYNC) {
+ dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+ req.partition(),
+ req.futureId(),
+ ctx.deploymentEnabled());
+ }
boolean replicate = ctx.isDrEnabled();
@@ -3233,43 +3178,45 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Ignore.
}
catch (IgniteCheckedException e) {
+ IgniteCheckedException err =
+ new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+
if (nearRes != null)
- nearRes.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+ nearRes.addFailedKey(key, err);
+ else if (dhtRes != null)
+ dhtRes.addFailedKey(key, err);
U.error(log, "Failed to update key on backup node: " + key, e);
}
}
- GridDhtAtomicUpdateResponse dhtRes = null;
-
if (isNearEnabled(cacheCfg)) {
List<KeyCacheObject> nearEvicted =
((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
if (nearEvicted != null) {
- dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
- req.partition(),
- req.futureId(),
- ctx.deploymentEnabled());
+ if (dhtRes == null) {
+ dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+ req.partition(),
+ req.futureId(),
+ ctx.deploymentEnabled());
+ }
dhtRes.nearEvicted(nearEvicted);
}
}
if (nearRes != null) {
- if (IGNITE_ATOMIC_DHT_REPLY_AFTER_ACK)
- sendDhtNearResponse(nodeId, req, nearRes);
- else {
- sendDhtNearResponse(null, req, nearRes);
+ sendDhtNearResponse(req, nearRes);
+ sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
+ }
+ else {
+ if (dhtRes != null)
+ sendDhtPrimaryResponse(nodeId, req, dhtRes);
+ else
sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
- }
}
- else
- sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
-
- if (dhtRes != null)
- sendDhtPrimaryResponse(nodeId, req, dhtRes);
}
/**
@@ -3306,44 +3253,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
*
*/
- private class DeferredResponseClosure implements IgniteInClosure<IgniteException>, Runnable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private final int part;
-
- /** */
- private final UUID primaryId;
-
- /** */
- private final long futId;
-
- /**
- * @param part Partition ID.
- * @param primaryId Primary ID.
- * @param futId Future ID.
- */
- DeferredResponseClosure(int part, UUID primaryId, long futId) {
- this.part = part;
- this.primaryId = primaryId;
- this.futId = futId;
- }
-
- /** {@inheritDoc} */
- @Override public void run() {
- sendDeferredUpdateResponse(part, primaryId, futId);
- }
-
- /** {@inheritDoc} */
- @Override public void apply(IgniteException e) {
- ctx.kernalContext().getStripedExecutorService().execute(part, this);
- }
- }
-
- /**
- *
- */
private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
/** */
private final int part;
@@ -3483,33 +3392,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
- * @param primaryId Primary node ID.
* @param req Request.
* @param nearRes Response to send.
*/
- private void sendDhtNearResponse(final UUID primaryId,
- final GridDhtAtomicAbstractUpdateRequest req,
- GridDhtAtomicNearResponse nearRes) {
- DeferredResponseClosure c = primaryId != null ?
- new DeferredResponseClosure(req.partition(), primaryId, req.futureId()) : null;
-
+ private void sendDhtNearResponse(final GridDhtAtomicAbstractUpdateRequest req, GridDhtAtomicNearResponse nearRes) {
try {
- ClusterNode node = ctx.discovery().node(req.nearNodeId());
-
- if (node == null)
- throw new ClusterTopologyCheckedException("Node left: " + req.nearNodeId());
-
- if (c != null) {
- ctx.gridIO().send(node,
- TOPIC_CACHE,
- nearRes,
- ctx.ioPolicy(),
- c);
-
- c = null;
- }
- else
- ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy());
+ ctx.gridIO().send(req.nearNodeId(), TOPIC_CACHE, nearRes, ctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
msgLog.debug("Sent DHT near response [futId=" + req.futureId() +
@@ -3531,35 +3419,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
", node=" + req.nearNodeId() +
", res=" + nearRes + ']', e);
}
- finally {
- if (c != null)
- c.apply(null);
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- */
- private void processDhtAtomicNearMappingResponse(UUID nodeId, GridNearAtomicMappingResponse res) {
- GridNearAtomicAbstractUpdateFuture updateFut =
- (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
-
- if (updateFut != null) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received near mapping response [futId=" + res.futureId() +
- ", node=" + nodeId + ']');
- }
-
- updateFut.onMappingReceived(nodeId, res);
- }
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to find future for near mapping response [futId=" + res.futureId() +
- ", node=" + nodeId +
- ", res=" + res + ']');
- }
- }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
index b397f0f..595e41a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -18,25 +18,19 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_AFF_MAPPING_FLAG_MASK;
import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK;
-import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_RESULT_SUCCESS_MASK;
/**
@@ -59,19 +53,11 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
private UUID primaryId;
/** */
- @GridDirectCollection(UUID.class)
- @GridToStringInclude
- private List<UUID> mapping;
-
- /** */
@GridToStringExclude
private byte flags;
/** */
- private UpdateErrors errors;
-
- /** */
- private UUID failedNodeId;
+ private UpdateErrors errs;
/**
*
@@ -85,14 +71,12 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
* @param partId Partition.
* @param futId Future ID.
* @param primaryId Primary node ID.
- * @param mapping Update mapping.
* @param flags Flags.
*/
public GridDhtAtomicNearResponse(int cacheId,
int partId,
long futId,
UUID primaryId,
- List<UUID> mapping,
byte flags)
{
assert primaryId != null;
@@ -101,54 +85,21 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
this.partId = partId;
this.futId = futId;
this.primaryId = primaryId;
- this.mapping = mapping;
this.flags = flags;
}
/**
- * @return {@code True} if update mapping matches affinity function result.
- */
- boolean affinityMapping() {
- return isFlag(DHT_ATOMIC_AFF_MAPPING_FLAG_MASK);
- }
-
- /**
* @return Errors.
*/
@Nullable UpdateErrors errors() {
- return errors;
+ return errs;
}
/**
* @param errors Errors.
*/
void errors(UpdateErrors errors) {
- this.errors = errors;
- }
-
- /**
- * @return Failed node ID.
- */
- UUID failedNodeId() {
- return failedNodeId;
- }
-
- /**
- * @param failedNodeId Failed node ID (used when primary notifies near node).
- */
- void failedNodeId(UUID failedNodeId) {
- assert failedNodeId != null;
-
- this.failedNodeId = failedNodeId;
-
- setFlag(true, DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE);
- }
-
- /**
- * @return {@code True} if message is sent from primary when DHT node fails.
- */
- boolean primaryDhtFailureResponse() {
- return isFlag(DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE);
+ this.errs = errors;
}
/**
@@ -168,10 +119,10 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
* @param e Error.
*/
public void addFailedKey(KeyCacheObject key, Throwable e) {
- if (errors == null)
- errors = new UpdateErrors();
+ if (errs == null)
+ errs = new UpdateErrors();
- errors.addFailedKey(key, e);
+ errs.addFailedKey(key, e);
}
/**
@@ -191,13 +142,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
}
/**
- * @return Update mapping.
- */
- public List<UUID> mapping() {
- return mapping;
- }
-
- /**
* @param flag Set or clear.
* @param mask Mask.
*/
@@ -246,16 +190,16 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- if (errors != null)
- errors.prepareMarshal(this, ctx.cacheContext(cacheId));
+ if (errs != null)
+ errs.prepareMarshal(this, ctx.cacheContext(cacheId));
}
/** {@inheritDoc} */
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- if (errors != null)
- errors.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr);
+ if (errs != null)
+ errs.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr);
}
/** {@inheritDoc} */
@@ -274,13 +218,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
switch (writer.state()) {
case 3:
- if (!writer.writeMessage("errors", errors))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeUuid("failedNodeId", failedNodeId))
+ if (!writer.writeMessage("errs", errs))
return false;
writer.incrementState();
@@ -297,12 +235,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
writer.incrementState();
- case 7:
- if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
case 8:
if (!writer.writeInt("partId", partId))
return false;
@@ -332,15 +264,7 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
switch (reader.state()) {
case 3:
- errors = reader.readMessage("errors");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- failedNodeId = reader.readUuid("failedNodeId");
+ errs = reader.readMessage("errs");
if (!reader.isLastRead())
return false;
@@ -363,14 +287,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
reader.incrementState();
- case 7:
- mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 8:
partId = reader.readInt("partId");
@@ -394,10 +310,9 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtAtomicNearResponse.class, this, "flags",
+ return S.toString(GridDhtAtomicNearResponse.class, this,
+ "flags",
"res=" + isFlag(DHT_ATOMIC_HAS_RESULT_MASK) +
- "|resOk=" + isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK) +
- "|affMap=" + isFlag(DHT_ATOMIC_AFF_MAPPING_FLAG_MASK) +
- "|dhtFail=" + isFlag(DHT_ATOMIC_PRIMARY_DHT_FAIL_RESPONSE));
+ "|resOk=" + isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 2cc370f..e393322 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -47,9 +47,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
GridDhtAtomicSingleUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
- super(cctx, writeVer, updateReq);
+ super(cctx, writeVer, updateReq, updateRes, completionCb);
}
/** {@inheritDoc} */
@@ -67,7 +69,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
/** {@inheritDoc} */
@Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
UUID nodeId,
- UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -81,8 +82,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
cctx.cacheId(),
nodeId,
futId,
- nearNodeId,
- updateReq.futureId(),
writeVer,
syncMode,
topVer,
@@ -97,8 +96,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
cctx.cacheId(),
nodeId,
futId,
- nearNodeId,
- updateReq.futureId(),
writeVer,
syncMode,
topVer,
@@ -112,6 +109,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
}
}
+ /** {@inheritDoc} */
+ @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
+
+ }
+
/**
* @param ttl TTL.
* @param conflictExpireTime Conflict expire time.
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index fa7c445..3d3ce04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -86,8 +86,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
int cacheId,
UUID nodeId,
long futId,
- UUID nearNodeId,
- long nearFutId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -100,8 +98,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
super(cacheId,
nodeId,
futId,
- nearNodeId,
- nearFutId,
writeVer,
syncMode,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 1c12193..ed57cf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -46,9 +46,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
- super(cctx, writeVer, updateReq);
+ super(cctx, writeVer, updateReq, updateRes, completionCb);
mappings = U.newHashMap(updateReq.size());
}
@@ -66,7 +68,6 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
/** {@inheritDoc} */
@Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
UUID nodeId,
- UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -79,8 +80,6 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
cctx.cacheId(),
nodeId,
futId,
- nearNodeId,
- updateReq.futureId(),
writeVer,
syncMode,
topVer,
@@ -94,6 +93,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
}
/** {@inheritDoc} */
+ @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
+
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateFuture.class, this, "super", super.toString());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index ef42af8..029ea42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -147,8 +147,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
int cacheId,
UUID nodeId,
long futId,
- UUID nearNodeId,
- long nearFutId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -163,8 +161,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
super(cacheId,
nodeId,
futId,
- nearNodeId,
- nearFutId,
writeVer,
syncMode,
topVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index b1a46d5..10806b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -53,7 +53,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
private long futId;
/** */
- private UpdateErrors errors;
+ private UpdateErrors errs;
/** Evicted readers. */
@GridToStringInclude
@@ -84,10 +84,21 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
}
/**
+ * @param key Key.
+ * @param e Error.
+ */
+ public void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (errs == null)
+ errs = new UpdateErrors();
+
+ errs.addFailedKey(key, e);
+ }
+
+ /**
* @return Errors.
*/
@Nullable UpdateErrors errors() {
- return errors;
+ return errs;
}
/** {@inheritDoc} */
@@ -108,15 +119,15 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
* @param err Error.
*/
public void onError(IgniteCheckedException err){
- if (errors == null)
- errors = new UpdateErrors();
+ if (errs == null)
+ errs = new UpdateErrors();
- errors.onError(err);
+ errs.onError(err);
}
/** {@inheritDoc} */
@Override public IgniteCheckedException error() {
- return errors != null ? errors.error() : null;
+ return errs != null ? errs.error() : null;
}
/**
@@ -158,8 +169,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
prepareMarshalCacheObjects(nearEvicted, cctx);
- if (errors != null)
- errors.prepareMarshal(this, cctx);
+ if (errs != null)
+ errs.prepareMarshal(this, cctx);
}
/** {@inheritDoc} */
@@ -170,8 +181,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
- if (errors != null)
- errors.finishUnmarshal(this, cctx, ldr);
+ if (errs != null)
+ errs.finishUnmarshal(this, cctx, ldr);
}
/** {@inheritDoc} */
@@ -200,7 +211,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
switch (writer.state()) {
case 3:
- if (!writer.writeMessage("errors", errors))
+ if (!writer.writeMessage("errors", errs))
return false;
writer.incrementState();
@@ -240,7 +251,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
switch (reader.state()) {
case 3:
- errors = reader.readMessage("errors");
+ errs = reader.readMessage("errors");
if (!reader.isLastRead())
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/30bfae0e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 2a17813..3a9055e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -18,19 +18,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
-import java.nio.ByteBuffer;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -44,59 +38,6 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
/** */
private static final CacheEntryPredicate[] NO_FILTER = new CacheEntryPredicate[0];
- /** Fast map flag mask. */
- private static final int FAST_MAP_FLAG_MASK = 0x1;
-
- /** Flag indicating whether request contains primary keys. */
- private static final int HAS_PRIMARY_FLAG_MASK = 0x2;
-
- /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
- private static final int TOP_LOCKED_FLAG_MASK = 0x4;
-
- /** Skip write-through to a persistent storage. */
- private static final int SKIP_STORE_FLAG_MASK = 0x8;
-
- /** */
- private static final int CLIENT_REQ_FLAG_MASK = 0x10;
-
- /** Keep binary flag. */
- private static final int KEEP_BINARY_FLAG_MASK = 0x20;
-
- /** Return value flag. */
- private static final int RET_VAL_FLAG_MASK = 0x40;
-
- /** Target node ID. */
- @GridDirectTransient
- protected UUID nodeId;
-
- /** Future version. */
- protected long futId;
-
- /** Update version. Set to non-null if fastMap is {@code true}. */
- private GridCacheVersion updateVer;
-
- /** Topology version. */
- protected AffinityTopologyVersion topVer;
-
- /** Write synchronization mode. */
- protected CacheWriteSynchronizationMode syncMode;
-
- /** Update operation. */
- protected GridCacheOperation op;
-
- /** Subject ID. */
- protected UUID subjId;
-
- /** Task name hash. */
- protected int taskNameHash;
-
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
- /** Compressed boolean flags. */
- protected byte flags;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -110,8 +51,6 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
* @param cacheId Cache ID.
* @param nodeId Node ID.
* @param futId Future ID.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -121,15 +60,12 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
protected GridNearAtomicAbstractSingleUpdateRequest(
int cacheId,
UUID nodeId,
long futId,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -137,89 +73,25 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean stableTop,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo
) {
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futId = futId;
- this.updateVer = updateVer;
- this.topVer = topVer;
- this.syncMode = syncMode;
- this.op = op;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.addDepInfo = addDepInfo;
-
- fastMap(fastMap);
- topologyLocked(topLocked);
- returnValue(retval);
- skipStore(skipStore);
- keepBinary(keepBinary);
- clientRequest(clientReq);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /**
- * @return Mapped node ID.
- */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @param nodeId Node ID.
- */
- @Override public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Subject ID.
- */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Future version.
- */
- @Override public long futureId() {
- return futId;
- }
-
- /**
- * @return Update version for fast-map request.
- */
- @Override public GridCacheVersion updateVersion() {
- return updateVer;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Cache write synchronization mode.
- */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
+ super(cacheId,
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ subjId,
+ taskNameHash,
+ stableTop,
+ skipStore,
+ keepBinary,
+ addDepInfo);
}
/**
@@ -230,331 +102,14 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
}
/**
- * @return Update operation.
- */
- @Override public GridCacheOperation operation() {
- return op;
- }
-
- /**
* @return Optional arguments for entry processor.
*/
@Override @Nullable public Object[] invokeArguments() {
return null;
}
- /**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
- */
- @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @return Response.
- */
- @Override @Nullable public GridNearAtomicUpdateResponse response() {
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
- return ctx.atomicMessageLogger();
- }
-
- /**
- * @return Flag indicating whether this is fast-map udpate.
- */
- @Override public boolean fastMap() {
- return isFlag(FAST_MAP_FLAG_MASK);
- }
-
- /**
- * Sets fastMap flag value.
- */
- public void fastMap(boolean val) {
- setFlag(val, FAST_MAP_FLAG_MASK);
- }
-
- /**
- * @return Topology locked flag.
- */
- @Override public boolean topologyLocked() {
- return isFlag(TOP_LOCKED_FLAG_MASK);
- }
-
- /**
- * Sets topologyLocked flag value.
- */
- public void topologyLocked(boolean val) {
- setFlag(val, TOP_LOCKED_FLAG_MASK);
- }
-
- /**
- * @return {@code True} if request sent from client node.
- */
- @Override public boolean clientRequest() {
- return isFlag(CLIENT_REQ_FLAG_MASK);
- }
-
- /**
- * Sets clientRequest flag value.
- */
- public void clientRequest(boolean val) {
- setFlag(val, CLIENT_REQ_FLAG_MASK);
- }
-
- /**
- * @return Return value flag.
- */
- @Override public boolean returnValue() {
- return isFlag(RET_VAL_FLAG_MASK);
- }
-
- /**
- * Sets returnValue flag value.
- */
- public void returnValue(boolean val) {
- setFlag(val, RET_VAL_FLAG_MASK);
- }
-
- /**
- * @return Skip write-through to a persistent storage.
- */
- @Override public boolean skipStore() {
- return isFlag(SKIP_STORE_FLAG_MASK);
- }
-
- /**
- * Sets skipStore flag value.
- */
- public void skipStore(boolean val) {
- setFlag(val, SKIP_STORE_FLAG_MASK);
- }
-
- /**
- * @return Keep binary flag.
- */
- @Override public boolean keepBinary() {
- return isFlag(KEEP_BINARY_FLAG_MASK);
- }
-
- /**
- * Sets keepBinary flag value.
- */
- public void keepBinary(boolean val) {
- setFlag(val, KEEP_BINARY_FLAG_MASK);
- }
-
- /**
- * @return Flag indicating whether this request contains primary keys.
- */
- @Override public boolean hasPrimary() {
- return isFlag(HAS_PRIMARY_FLAG_MASK);
- }
-
- /**
- * Sets hasPrimary flag value.
- */
- public void hasPrimary(boolean val) {
- setFlag(val, HAS_PRIMARY_FLAG_MASK);
- }
-
/** {@inheritDoc} */
@Nullable @Override public CacheEntryPredicate[] filter() {
return NO_FILTER;
}
-
- /**
- * Sets flag mask.
- *
- * @param flag Set or clear.
- * @param mask Mask.
- */
- private void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
- }
-
- /**
- * Reads flag mask.
- *
- * @param mask Mask to read.
- * @return Flag value.
- */
- private boolean isFlag(int mask) {
- return (flags & mask) != 0;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeLong("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMessage("updateVer", updateVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- futId = reader.readLong("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = GridCacheOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 6:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 8:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- updateVer = reader.readMessage("updateVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearAtomicAbstractSingleUpdateRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 11;
- }
}