You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 15:09:39 UTC
[5/8] ignite git commit: ignite-4705 Atomic cache protocol change:
notify client node from backups
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index c92e0f5..39abb73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -17,12 +17,22 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
@@ -31,16 +41,19 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
/**
* Base for near atomic update futures.
@@ -108,28 +121,24 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
protected boolean topLocked;
/** Remap count. */
+ @GridToStringInclude
protected int remapCnt;
/** Current topology version. */
+ @GridToStringInclude
protected AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
/** */
- protected GridCacheVersion updVer;
-
- /** Topology version when got mapping error. */
- protected AffinityTopologyVersion mapErrTopVer;
-
- /** */
- protected int resCnt;
+ @GridToStringInclude
+ protected AffinityTopologyVersion remapTopVer;
/** Error. */
+ @GridToStringInclude
protected CachePartialUpdateCheckedException err;
/** Future ID. */
- protected GridCacheVersion futVer;
-
- /** Completion future for a particular topology version. */
- protected GridFutureAdapter<Void> topCompleteFut;
+ @GridToStringInclude
+ protected Long futId;
/** Operation result. */
protected GridCacheReturn opRes;
@@ -198,10 +207,30 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
this.remapCnt = remapCnt;
}
+ /** {@inheritDoc} */
+ @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ return null;
+ }
+
+ /**
+ * @param req Request.
+ */
+ void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
+ try {
+ cctx.io().send(req.updateRequest().nodeId(), req, cctx.ioPolicy());
+ }
+ catch (ClusterTopologyCheckedException e) {
+ onSendError(req, e);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+
/**
* Performs future mapping.
*/
- public void map() {
+ public final void map() {
AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
if (topVer == null)
@@ -212,18 +241,14 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
// Cannot remap.
remapCnt = 1;
- GridCacheVersion futVer = addAtomicFuture(topVer);
-
- if (futVer != null)
- map(topVer, futVer);
+ map(topVer);
}
}
/**
* @param topVer Topology version.
- * @param futVer Future version
*/
- protected abstract void map(AffinityTopologyVersion topVer, GridCacheVersion futVer);
+ protected abstract void map(AffinityTopologyVersion topVer);
/**
* Maps future on ready topology.
@@ -248,8 +273,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/**
* @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
*/
- protected boolean storeFuture() {
- return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+ final boolean storeFuture() {
+ return syncMode != FULL_ASYNC;
}
/**
@@ -258,12 +283,15 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @param nodeId Node ID.
* @param req Request.
*/
- protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
+ final void sendSingleRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+ new GridDhtAtomicCache.UpdateReplyClosure() {
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
- onResult(res.nodeId(), res, false);
+ if (syncMode != FULL_ASYNC)
+ onPrimaryResponse(res.nodeId(), res, false);
+ else if (res.remapTopologyVersion() != null)
+ ((GridDhtAtomicCache)cctx.cache()).remapToNewPrimary(req);
}
});
}
@@ -272,18 +300,13 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() +
- ", writeVer=" + req.updateVersion() +
+ msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
", node=" + req.nodeId() + ']');
}
-
- if (syncMode == FULL_ASYNC)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
}
catch (IgniteCheckedException e) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() +
- ", writeVer=" + req.updateVersion() +
+ msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
", node=" + req.nodeId() +
", err=" + e + ']');
}
@@ -300,46 +323,377 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @param res Update response.
* @param nodeErr {@code True} if response was created on node failure.
*/
- public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
+ public abstract void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr);
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
+ public abstract void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res);
/**
* @param req Request.
- * @param e Error.
+ * @param res Response.
*/
- protected final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
- synchronized (mux) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureVersion(),
- cctx.deploymentEnabled());
+ final void onPrimaryError(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ assert res.error() != null;
+
+ if (err == null)
+ err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+ Collection<KeyCacheObject> keys0 = res.failedKeys() != null ? res.failedKeys() : req.keys();
+
+ Collection<Object> keys = new ArrayList<>(keys0.size());
- res.addFailedKeys(req.keys(), e);
+ for (KeyCacheObject key : keys0)
+ keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
- onResult(req.nodeId(), res, true);
+ err.add(keys, res.error(), req.topologyVersion());
+ }
+
+ /**
+ * @param req Request.
+ * @return Response to notify about primary failure.
+ */
+ final GridNearAtomicUpdateResponse primaryFailedResponse(GridNearAtomicAbstractUpdateRequest req) {
+ assert req.response() == null : req;
+ assert req.nodeId() != null : req;
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
+ ", node=" + req.nodeId() + ']');
}
+
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureId(),
+ req.partition(),
+ true,
+ cctx.deploymentEnabled());
+
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+ "before response is received: " + req.nodeId());
+
+ e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+ res.addFailedKeys(req.keys(), e);
+
+ return res;
+ }
+
+ /**
+ * @param req Request.
+ * @param e Error.
+ */
+ final void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureId(),
+ req.partition(),
+ e instanceof ClusterTopologyCheckedException,
+ cctx.deploymentEnabled());
+
+ res.addFailedKeys(req.keys(), e);
+
+ onPrimaryResponse(req.nodeId(), res, true);
+ }
+
+ /**
+ * @param req Request.
+ * @param e Error.
+ */
+ private void onSendError(GridNearAtomicCheckUpdateRequest req, IgniteCheckedException e) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.updateRequest().nodeId(),
+ req.futureId(),
+ req.partition(),
+ e instanceof ClusterTopologyCheckedException,
+ cctx.deploymentEnabled());
+
+ res.addFailedKeys(req.updateRequest().keys(), e);
+
+ onPrimaryResponse(req.updateRequest().nodeId(), res, true);
}
/**
- * Adds future prevents topology change before operation complete.
- * Should be invoked before topology lock released.
*
- * @param topVer Topology version.
- * @return Future version in case future added.
*/
- protected final GridCacheVersion addAtomicFuture(AffinityTopologyVersion topVer) {
- GridCacheVersion futVer = cctx.versions().next(topVer);
+ static class PrimaryRequestState {
+ /** Update request sent to the primary node. */
+ final GridNearAtomicAbstractUpdateRequest req;
+
+ /** */
+ @GridToStringInclude
+ Set<UUID> dhtNodes;
+
+ /** */
+ @GridToStringInclude
+ private Set<UUID> rcvd;
+
+ /** Set once operation result is received (with primary response or with a DHT response). */
+ private boolean hasRes;
+
+ /**
+ * @param req Request.
+ * @param nodes Affinity nodes.
+ * @param single {@code True} if created for single-key operation.
+ */
+ PrimaryRequestState(GridNearAtomicAbstractUpdateRequest req, List<ClusterNode> nodes, boolean single) {
+ assert req != null && req.nodeId() != null : req;
+
+ this.req = req;
+
+ if (req.initMappingLocally()) {
+ if (single) {
+ if (nodes.size() > 1) {
+ dhtNodes = U.newHashSet(nodes.size() - 1);
+
+ for (int i = 1; i < nodes.size(); i++)
+ dhtNodes.add(nodes.get(i).id());
+ }
+ else
+ dhtNodes = Collections.emptySet();
+ }
+ else {
+ dhtNodes = new HashSet<>();
+
+ for (int i = 1; i < nodes.size(); i++)
+ dhtNodes.add(nodes.get(i).id());
+ }
+ }
+ }
+
+ /**
+ * @return Primary node ID.
+ */
+ UUID primaryId() {
+ return req.nodeId();
+ }
+
+ /**
+ * @param nodes Nodes.
+ */
+ void addMapping(List<ClusterNode> nodes) {
+ assert req.initMappingLocally();
+
+ for (int i = 1; i < nodes.size(); i++)
+ dhtNodes.add(nodes.get(i).id());
+ }
+
+ /**
+ * @param cctx Context.
+ * @return Check result.
+ */
+ DhtLeftResult checkDhtNodes(GridCacheContext cctx) {
+ assert req.initMappingLocally() : req;
- synchronized (mux) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ if (finished())
+ return DhtLeftResult.NOT_DONE;
- this.topVer = topVer;
- this.futVer = futVer;
+ boolean finished = false;
+
+ for (Iterator<UUID> it = dhtNodes.iterator(); it.hasNext();) {
+ UUID nodeId = it.next();
+
+ if (!cctx.discovery().alive(nodeId)) {
+ it.remove();
+
+ if (finished()) {
+ finished = true;
+
+ break;
+ }
+ }
+ }
+
+ if (finished)
+ return DhtLeftResult.DONE;
+
+ if (dhtNodes.isEmpty())
+ return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
+
+ return DhtLeftResult.NOT_DONE;
+ }
+
+ /**
+ * @return {@code True} if all expected responses are received.
+ */
+ private boolean finished() {
+ if (req.writeSynchronizationMode() == PRIMARY_SYNC)
+ return hasRes;
+
+ return (dhtNodes != null && dhtNodes.isEmpty()) && hasRes;
+ }
+
+ /**
+ * @return Request if need process primary fail response, {@code null} otherwise.
+ */
+ @Nullable GridNearAtomicAbstractUpdateRequest onPrimaryFail() {
+ if (finished())
+ return null;
+
+ /*
+ * When primary fails, even if the primary response was received, it is possible that primary failed
+ * to send the request to backup(s), so the operation needs to be remapped.
+ */
+ if (req.fullSync() && !req.nodeFailedResponse()) {
+ req.resetResponse();
+
+ return req;
+ }
+
+ return req.response() == null ? req : null;
}
- if (storeFuture() && !cctx.mvcc().addAtomicFuture(futVer, this))
- return null;
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @return Request if need process primary response, {@code null} otherwise.
+ */
+ @Nullable GridNearAtomicAbstractUpdateRequest processPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ assert req.nodeId().equals(nodeId);
- return futVer;
+ if (res.nodeLeftResponse())
+ return onPrimaryFail();
+
+ if (finished())
+ return null;
+
+ return req.response() == null ? req : null;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return Check result (see {@link DhtLeftResult}).
+ */
+ DhtLeftResult onDhtNodeLeft(UUID nodeId) {
+ if (req.writeSynchronizationMode() != FULL_SYNC || dhtNodes == null || finished())
+ return DhtLeftResult.NOT_DONE;
+
+ if (dhtNodes.remove(nodeId) && dhtNodes.isEmpty()) {
+ if (hasRes)
+ return DhtLeftResult.DONE;
+ else
+ return !req.needPrimaryResponse() ? DhtLeftResult.ALL_RCVD_CHECK_PRIMARY : DhtLeftResult.NOT_DONE;
+ }
+
+ return DhtLeftResult.NOT_DONE;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @return {@code True} if request processing finished.
+ */
+ boolean onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+ assert req.writeSynchronizationMode() == FULL_SYNC : req;
+
+ if (finished())
+ return false;
+
+ if (res.hasResult())
+ hasRes = true;
+
+ if (dhtNodes == null) {
+ if (rcvd == null)
+ rcvd = new HashSet<>();
+
+ rcvd.add(nodeId);
+
+ return false;
+ }
+
+ return dhtNodes.remove(nodeId) && finished();
+ }
+
+ /**
+ * @param res Response.
+ * @param cctx Cache context.
+ * @return {@code True} if request processing finished.
+ */
+ boolean onPrimaryResponse(GridNearAtomicUpdateResponse res, GridCacheContext cctx) {
+ assert !finished() : this;
+
+ hasRes = true;
+
+ boolean onRes = req.onResponse(res);
+
+ assert onRes;
+
+ if (res.error() != null || res.remapTopologyVersion() != null) {
+ dhtNodes = Collections.emptySet(); // Mark as finished.
+
+ return true;
+ }
+
+ assert res.returnValue() != null : res;
+
+ if (res.dhtNodes() != null)
+ initDhtNodes(res.dhtNodes(), cctx);
+
+ return finished();
+ }
+
+ /**
+ * @param nodeIds Node IDs.
+ * @param cctx Context.
+ */
+ private void initDhtNodes(List<UUID> nodeIds, GridCacheContext cctx) {
+ assert dhtNodes == null || req.initMappingLocally();
+
+ Set<UUID> dhtNodes0 = dhtNodes;
+
+ dhtNodes = null;
+
+ for (UUID dhtNodeId : nodeIds) {
+ if (F.contains(rcvd, dhtNodeId))
+ continue;
+
+ if (req.initMappingLocally() && !F.contains(dhtNodes0, dhtNodeId))
+ continue;
+
+ if (cctx.discovery().node(dhtNodeId) != null) {
+ if (dhtNodes == null)
+ dhtNodes = U.newHashSet(nodeIds.size());
+
+ dhtNodes.add(dhtNodeId);
+ }
+ }
+
+ if (dhtNodes == null)
+ dhtNodes = Collections.emptySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(PrimaryRequestState.class, this,
+ "primary", primaryId(),
+ "needPrimaryRes", req.needPrimaryResponse(),
+ "primaryRes", req.response() != null,
+ "done", finished());
+ }
+ }
+
+ /**
+ * Result of checking DHT nodes for a {@link PrimaryRequestState}.
+ */
+ enum DhtLeftResult {
+ /** All responses and operation result are received. */
+ DONE,
+
+ /** Not all responses are received. */
+ NOT_DONE,
+
+ /**
+ * All backups failed and response from primary is not required;
+ * in this case in FULL_SYNC mode an additional request must be sent
+ * to primary to ensure FULL_SYNC guarantee.
+ */
+ ALL_RCVD_CHECK_PRIMARY
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicAbstractUpdateFuture.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index bee2ecd..a43bfb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -17,18 +17,28 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -38,106 +48,331 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
+ /** Flag set if near node waits for primary response. */
+ private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
+
+ /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+ private static final int TOP_LOCKED_FLAG_MASK = 0x02;
+
+ /** Skip write-through to a persistent storage. */
+ private static final int SKIP_STORE_FLAG_MASK = 0x04;
+
+ /** Keep binary flag. */
+ private static final int KEEP_BINARY_FLAG_MASK = 0x08;
+
+ /** Return value flag. */
+ private static final int RET_VAL_FLAG_MASK = 0x10;
+
+ /** Target node ID. */
+ @GridDirectTransient
+ protected UUID nodeId;
+
+ /** Near node future ID. */
+ protected long futId;
+
+ /** Topology version. */
+ protected AffinityTopologyVersion topVer;
+
+ /** Write synchronization mode. */
+ protected CacheWriteSynchronizationMode syncMode;
+
+ /** Update operation. */
+ protected GridCacheOperation op;
+
+ /** Subject ID. */
+ protected UUID subjId;
+
+ /** Task name hash. */
+ protected int taskNameHash;
+
+ /** Compressed boolean flags. Make sure 'toString' is updated when adding a new flag. */
+ @GridToStringExclude
+ protected byte flags;
+
+ /** Response stored by {@link #onResponse(GridNearAtomicUpdateResponse)}. */
+ @GridDirectTransient
+ private GridNearAtomicUpdateResponse res;
+
/**
- * @return Mapped node ID.
+ *
*/
- public abstract UUID nodeId();
+ public GridNearAtomicAbstractUpdateRequest() {
+ // No-op.
+ }
/**
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
* @param nodeId Node ID.
+ * @param futId Future ID.
+ * @param topVer Topology version.
+ * @param topLocked Topology locked flag.
+ * @param syncMode Synchronization mode.
+ * @param op Cache update operation.
+ * @param retval Return value required flag.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param needPrimaryRes {@code True} if near node waits for primary response.
+ * @param skipStore Skip write-through to a persistent storage.
+ * @param keepBinary Keep binary flag.
+ * @param addDepInfo Deployment info flag.
+ */
+ protected GridNearAtomicAbstractUpdateRequest(
+ int cacheId,
+ UUID nodeId,
+ long futId,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean topLocked,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ boolean retval,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ boolean needPrimaryRes,
+ boolean skipStore,
+ boolean keepBinary,
+ boolean addDepInfo
+ ) {
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futId = futId;
+ this.topVer = topVer;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.addDepInfo = addDepInfo;
+
+ if (needPrimaryRes)
+ needPrimaryResponse(true);
+ if (topLocked)
+ topologyLocked(true);
+ if (retval)
+ returnValue(true);
+ if (skipStore)
+ skipStore(true);
+ if (keepBinary)
+ keepBinary(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+ return ctx.atomicMessageLogger();
+ }
+
+ /**
+ * @return {@code True} if near node is able to initialize update mapping locally.
+ */
+ boolean initMappingLocally() {
+ return !needPrimaryResponse() && fullSync();
+ }
+
+ /**
+ * @return {@code True} if near node waits for primary response.
+ */
+ boolean needPrimaryResponse() {
+ return isFlag(NEED_PRIMARY_RES_FLAG_MASK);
+ }
+
+ /**
+ * @param needRes {@code True} if near node waits for primary response.
+ */
+ void needPrimaryResponse(boolean needRes) {
+ setFlag(needRes, NEED_PRIMARY_RES_FLAG_MASK);
+ }
+
+ /**
+ * @return {@code True} if update is processed in {@link CacheWriteSynchronizationMode#FULL_SYNC} mode.
+ */
+ boolean fullSync() {
+ assert syncMode != null;
+
+ return syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+ }
+
+ /**
+ * @return Task name hash code.
+ */
+ public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Update operation.
*/
- public abstract void nodeId(UUID nodeId);
+ public GridCacheOperation operation() {
+ return op;
+ }
/**
* @return Subject ID.
*/
- public abstract UUID subjectId();
+ public UUID subjectId() {
+ return subjId;
+ }
/**
- * @return Task name hash.
+ * @return Target node ID.
*/
- public abstract int taskNameHash();
+ public UUID nodeId() {
+ return nodeId;
+ }
/**
- * @return Future version.
+ * @return Near node future ID.
*/
- public abstract GridCacheVersion futureVersion();
+ public long futureId() {
+ return futId;
+ }
/**
- * @return Flag indicating whether this is fast-map udpate.
+ * @return Write synchronization mode.
*/
- public abstract boolean fastMap();
+ public final CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncMode;
+ }
/**
- * @return Update version for fast-map request.
+ * @param res Response.
+ * @return {@code True} if current response was {@code null}.
*/
- public abstract GridCacheVersion updateVersion();
+ public boolean onResponse(GridNearAtomicUpdateResponse res) {
+ if (this.res == null) {
+ this.res = res;
+
+ return true;
+ }
+
+ return false;
+ }
/**
- * @return Topology locked flag.
+ * Clears the stored response.
*/
- public abstract boolean topologyLocked();
+ void resetResponse() {
+ this.res = null;
+ }
/**
- * @return {@code True} if request sent from client node.
+ * @return Response.
*/
- public abstract boolean clientRequest();
+ @Nullable public GridNearAtomicUpdateResponse response() {
+ return res;
+ }
/**
- * @return Cache write synchronization mode.
+ * @return {@code True} if received notification about primary fail.
*/
- public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+ boolean nodeFailedResponse() {
+ return res != null && res.nodeLeftResponse();
+ }
/**
- * @return Expiry policy.
+ * @return Topology locked flag.
*/
- public abstract ExpiryPolicy expiry();
+ final boolean topologyLocked() {
+ return isFlag(TOP_LOCKED_FLAG_MASK);
+ }
+
+ /**
+ * @param val {@code True} if topology is locked on near node.
+ */
+ private void topologyLocked(boolean val) {
+ setFlag(val, TOP_LOCKED_FLAG_MASK);
+ }
/**
* @return Return value flag.
*/
- public abstract boolean returnValue();
+ public final boolean returnValue() {
+ return isFlag(RET_VAL_FLAG_MASK);
+ }
/**
- * @return Filter.
+ * @param val Return value flag.
*/
- @Nullable public abstract CacheEntryPredicate[] filter();
+ public final void returnValue(boolean val) {
+ setFlag(val, RET_VAL_FLAG_MASK);
+ }
/**
* @return Skip write-through to a persistent storage.
*/
- public abstract boolean skipStore();
+ public final boolean skipStore() {
+ return isFlag(SKIP_STORE_FLAG_MASK);
+ }
+
+ /**
+ * @param val Skip store flag.
+ */
+ public void skipStore(boolean val) {
+ setFlag(val, SKIP_STORE_FLAG_MASK);
+ }
/**
* @return Keep binary flag.
*/
- public abstract boolean keepBinary();
+ public final boolean keepBinary() {
+ return isFlag(KEEP_BINARY_FLAG_MASK);
+ }
/**
- * @return Update operation.
+ * @param val Keep binary flag.
*/
- public abstract GridCacheOperation operation();
+ public void keepBinary(boolean val) {
+ setFlag(val, KEEP_BINARY_FLAG_MASK);
+ }
/**
- * @return Optional arguments for entry processor.
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
*/
- @Nullable public abstract Object[] invokeArguments();
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
/**
- * @return Flag indicating whether this request contains primary keys.
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
*/
- public abstract boolean hasPrimary();
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
/**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
+ * @return Expiry policy.
*/
- public abstract boolean onResponse(GridNearAtomicUpdateResponse res);
+ public abstract ExpiryPolicy expiry();
/**
- * @return Response.
+ * @return Filter.
*/
- @Nullable public abstract GridNearAtomicUpdateResponse response();
+ @Nullable public abstract CacheEntryPredicate[] filter();
+
+ /**
+ * @return Optional arguments for entry processor.
+ */
+ @Nullable public abstract Object[] invokeArguments();
/**
* @param key Key to add.
@@ -145,14 +380,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
*/
- public abstract void addUpdateEntry(KeyCacheObject key,
+ abstract void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary);
+ @Nullable GridCacheVersion conflictVer);
/**
* @return Keys for this update request.
@@ -182,7 +415,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
*/
public abstract CacheObject writeValue(int idx);
-
/**
* @return Conflict versions.
*/
@@ -223,4 +455,170 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
* @return Key.
*/
public abstract KeyCacheObject key(int idx);
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 10;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeUuid("subjId", subjId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ byte opOrd;
+
+ opOrd = reader.readByte("op");
+
+ if (!reader.isLastRead())
+ return false;
+
+ op = GridCacheOperation.fromOrdinal(opOrd);
+
+ reader.incrementState();
+
+ case 6:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
+
+ if (!reader.isLastRead())
+ return false;
+
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+ reader.incrementState();
+
+ case 8:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (needPrimaryResponse())
+ appendFlag(flags, "needRes");
+ if (topologyLocked())
+ appendFlag(flags, "topLock");
+ if (skipStore())
+ appendFlag(flags, "skipStore");
+ if (keepBinary())
+ appendFlag(flags, "keepBinary");
+ if (returnValue())
+ appendFlag(flags, "retVal");
+
+ return S.toString(GridNearAtomicAbstractUpdateRequest.class, this,
+ "flags", flags.toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
new file mode 100644
index 0000000..4d0726a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Cache message carrying the near-node future ID and partition of a related atomic update request.
+ */
+public class GridNearAtomicCheckUpdateRequest extends GridCacheMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Related update request; annotated {@code @GridDirectTransient} and not written by {@code writeTo}. */
+ @GridDirectTransient
+ private GridNearAtomicAbstractUpdateRequest updateReq;
+
+ /** Partition of the related update request. */
+ private int partId;
+
+ /** Future ID on near node. */
+ private long futId;
+
+ /**
+ * No-arg constructor; sets no fields (presumably used when the message is read via {@code readFrom} - confirm).
+ */
+ public GridNearAtomicCheckUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param updateReq Related update request; asserted to be non-null, full-sync and to have a non-negative partition.
+ */
+ GridNearAtomicCheckUpdateRequest(GridNearAtomicAbstractUpdateRequest updateReq) {
+ assert updateReq != null && updateReq.fullSync() : updateReq;
+
+ this.updateReq = updateReq;
+ this.cacheId = updateReq.cacheId();
+ this.partId = updateReq.partition();
+ this.futId = updateReq.futureId();
+
+ assert partId >= 0;
+ }
+
+ /**
+ * @return Future ID on near node.
+ */
+ public final long futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Related update request, or {@code null} if this instance was not built from one.
+ */
+ GridNearAtomicAbstractUpdateRequest updateRequest() {
+ return updateReq;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return partId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -47;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5; // futId and partId use states 3 and 4 below; states 0-2 presumably belong to the superclass.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) { // No breaks: each case falls through to the next field.
+ case 3:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeInt("partId", partId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) { // No breaks: each case falls through to the next field.
+ case 3:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ partId = reader.readInt("partId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicCheckUpdateRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicCheckUpdateRequest.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 08c2474..c381333 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -26,7 +26,6 @@ import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
@@ -41,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.distributed.IgniteExternaliza
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,56 +61,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** */
private static final long serialVersionUID = 0L;
- /** Target node ID. */
- @GridDirectTransient
- private UUID nodeId;
-
- /** Future version. */
- private GridCacheVersion futVer;
-
- /** Update version. Set to non-null if fastMap is {@code true}. */
- private GridCacheVersion updateVer;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
- /** Update operation. */
- private GridCacheOperation op;
-
- /** Subject ID. */
- protected UUID subjId;
-
- /** Task name hash. */
- protected int taskNameHash;
-
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
- /** Fast map flag. */
- protected boolean fastMap;
-
- /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
- protected boolean topLocked;
-
- /** Flag indicating whether request contains primary keys. */
- protected boolean hasPrimary;
-
- /** Skip write-through to a persistent storage. */
- protected boolean skipStore;
-
- /** */
- protected boolean clientReq;
-
- /** Keep binary flag. */
- protected boolean keepBinary;
-
- /** Return value flag. */
- protected boolean retval;
-
/** Keys to update. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -120,10 +70,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@GridDirectCollection(CacheObject.class)
private List<CacheObject> vals;
- /** Partitions of keys. */
- @GridDirectCollection(int.class)
- private List<Integer> partIds;
-
/** Entry processors. */
@GridDirectTransient
private List<EntryProcessor<Object, Object, Object>> entryProcessors;
@@ -175,9 +121,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
+ * @param futId Future ID.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -190,16 +134,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
* @param maxEntryCnt Maximum entries count.
*/
GridNearAtomicFullUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
+ long futId,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -210,34 +151,29 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
+ boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo,
int maxEntryCnt
) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.fastMap = fastMap;
- this.updateVer = updateVer;
-
- this.topVer = topVer;
- this.topLocked = topLocked;
- this.syncMode = syncMode;
- this.op = op;
- this.retval = retval;
+ super(cacheId,
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ subjId,
+ taskNameHash,
+ needPrimaryRes,
+ skipStore,
+ keepBinary,
+ addDepInfo);
this.expiryPlc = expiryPlc;
this.invokeArgs = invokeArgs;
this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.clientReq = clientReq;
- this.addDepInfo = addDepInfo;
// By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
// will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
@@ -246,84 +182,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
initSize = Math.min(maxEntryCnt, 10);
keys = new ArrayList<>(initSize);
-
- partIds = new ArrayList<>(initSize);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /** {@inheritDoc} */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /** {@inheritDoc} */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion updateVersion() {
- return updateVer;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheOperation operation() {
- return op;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public GridNearAtomicUpdateResponse response() {
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
- return ctx.atomicMessageLogger();
}
/** {@inheritDoc} */
@@ -331,8 +189,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
+ @Nullable GridCacheVersion conflictVer) {
EntryProcessor<Object, Object, Object> entryProcessor = null;
if (op == TRANSFORM) {
@@ -344,7 +201,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
assert val != null || op == DELETE;
keys.add(key);
- partIds.add(key.partition());
if (entryProcessor != null) {
if (entryProcessors == null)
@@ -361,8 +217,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
vals.add((CacheObject)val);
}
- hasPrimary |= primary;
-
// In case there is no conflict, do not create the list.
if (conflictVer != null) {
if (conflictVers == null) {
@@ -407,6 +261,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public int size() {
+ assert keys != null;
+
return keys != null ? keys.size() : 0;
}
@@ -488,41 +344,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public boolean fastMap() {
- return fastMap;
- }
-
- /** {@inheritDoc} */
- @Override public boolean topologyLocked() {
- return topLocked;
- }
-
- /** {@inheritDoc} */
- @Override public boolean clientRequest() {
- return clientReq;
- }
-
- /** {@inheritDoc} */
- @Override public boolean returnValue() {
- return retval;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipStore() {
- return skipStore;
- }
-
- /** {@inheritDoc} */
- @Override public boolean keepBinary() {
- return keepBinary;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasPrimary() {
- return hasPrimary;
- }
-
- /** {@inheritDoc} */
@Override @Nullable public CacheEntryPredicate[] filter() {
return filter;
}
@@ -600,18 +421,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
else
finishUnmarshalCacheObjects(vals, cctx, ldr);
-
- if (partIds != null && !partIds.isEmpty()) {
- assert partIds.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++)
- keys.get(i).partition(partIds.get(i));
- }
}
/** {@inheritDoc} */
@Override public int partition() {
- return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ assert !F.isEmpty(keys);
+
+ return keys.get(0).partition();
}
/** {@inheritDoc} */
@@ -629,145 +445,55 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
switch (writer.state()) {
- case 3:
- if (!writer.writeBoolean("clientReq", clientReq))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeBoolean("fastMap", fastMap))
- return false;
-
- writer.incrementState();
-
case 10:
- if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
return false;
writer.incrementState();
case 11:
- if (!writer.writeMessage("futVer", futVer))
+ if (!writer.writeMessage("conflictTtls", conflictTtls))
return false;
writer.incrementState();
case 12:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
+ if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 13:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+ if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 14:
- if (!writer.writeBoolean("keepBinary", keepBinary))
+ if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
return false;
writer.incrementState();
case 15:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+ if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 16:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+ if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
case 17:
- if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT))
+ if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 18:
- if (!writer.writeBoolean("retval", retval))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeBoolean("skipStore", skipStore))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeBoolean("topLocked", topLocked))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 25:
- if (!writer.writeMessage("updateVer", updateVer))
- return false;
-
- writer.incrementState();
-
- case 26:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -789,64 +515,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
return false;
switch (reader.state()) {
- case 3:
- clientReq = reader.readBoolean("clientReq");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- conflictTtls = reader.readMessage("conflictTtls");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- fastMap = reader.readBoolean("fastMap");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
case 10:
- filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+ conflictExpireTimes = reader.readMessage("conflictExpireTimes");
if (!reader.isLastRead())
return false;
@@ -854,7 +524,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 11:
- futVer = reader.readMessage("futVer");
+ conflictTtls = reader.readMessage("conflictTtls");
if (!reader.isLastRead())
return false;
@@ -862,7 +532,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 12:
- hasPrimary = reader.readBoolean("hasPrimary");
+ conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -870,7 +540,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 13:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+ entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
return false;
@@ -878,7 +548,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 14:
- keepBinary = reader.readBoolean("keepBinary");
+ expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
if (!reader.isLastRead())
return false;
@@ -886,7 +556,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 15:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+ filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
return false;
@@ -894,19 +564,15 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 16:
- byte opOrd;
-
- opOrd = reader.readByte("op");
+ invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
return false;
- op = GridCacheOperation.fromOrdinal(opOrd);
-
reader.incrementState();
case 17:
- partIds = reader.readCollection("partIds", MessageCollectionItemType.INT);
+ keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -914,74 +580,6 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
case 18:
- retval = reader.readBoolean("retval");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- skipStore = reader.readBoolean("skipStore");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 21:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 22:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- topLocked = reader.readBoolean("topLocked");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 25:
- updateVer = reader.readMessage("updateVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 26:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -1013,12 +611,13 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 27;
+ return 19;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridNearAtomicFullUpdateRequest.class, this, "filter", Arrays.toString(filter),
+ return S.toString(GridNearAtomicFullUpdateRequest.class, this,
+ "filter", Arrays.toString(filter),
"parent", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index e0c24b2..78582b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -58,9 +58,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
+ * @param futId Future ID.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -71,15 +69,12 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
GridNearAtomicSingleUpdateFilterRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
+ long futId,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -88,17 +83,15 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
+ boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo
) {
super(
cacheId,
nodeId,
- futVer,
- fastMap,
- updateVer,
+ futId,
topVer,
topLocked,
syncMode,
@@ -106,9 +99,9 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
retval,
subjId,
taskNameHash,
+ needPrimaryRes,
skipStore,
keepBinary,
- clientReq,
addDepInfo
);
@@ -173,7 +166,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
}
switch (writer.state()) {
- case 14:
+ case 12:
if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
@@ -195,7 +188,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
return false;
switch (reader.state()) {
- case 14:
+ case 12:
filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
@@ -215,7 +208,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 13;
}
/** {@inheritDoc} */