You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/13 16:35:33 UTC
[41/50] [abbrv] ignite git commit: ignite-4705 Atomic cache protocol
change: notify client node from backups
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 923b220..9fe183f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -19,17 +19,18 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
import java.nio.ByteBuffer;
-import java.util.Collection;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
/**
* Deferred dht atomic update response.
@@ -42,13 +43,12 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
public static final int CACHE_MSG_IDX = nextIndexId();
/** ACK future versions. */
- @GridDirectCollection(GridCacheVersion.class)
- private Collection<GridCacheVersion> futVers;
+ private GridLongList futIds;
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
+ /** Callback sending response on timeout. */
+ @GridDirectTransient
+ @GridToStringExclude
+ private GridTimeoutObject timeoutSnd;
/**
* Empty constructor required by {@link Externalizable}
@@ -61,27 +61,42 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
* Constructor.
*
* @param cacheId Cache ID.
- * @param futVers Future versions.
- * @param addDepInfo Deployment info.
+ * @param futIds Future IDs.
*/
- public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers, boolean addDepInfo) {
- assert !F.isEmpty(futVers);
-
+ public GridDhtAtomicDeferredUpdateResponse(int cacheId, GridLongList futIds) {
this.cacheId = cacheId;
- this.futVers = futVers;
- this.addDepInfo = addDepInfo;
+ this.futIds = futIds;
+ }
+
+ /**
+ * @param timeoutSnd Callback sending response on timeout.
+ */
+ void timeoutSender(@Nullable GridTimeoutObject timeoutSnd) {
+ this.timeoutSnd = timeoutSnd;
+ }
+
+ /**
+ * @return Callback sending response on timeout.
+ */
+ @Nullable GridTimeoutObject timeoutSender() {
+ return timeoutSnd;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
}
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
- return addDepInfo;
+ return false;
}
/**
- * @return List of ACKed future versions.
+ * @return List of ACKed future IDs.
*/
- public Collection<GridCacheVersion> futureVersions() {
- return futVers;
+ GridLongList futureIds() {
+ return futIds;
}
/** {@inheritDoc} */
@@ -105,7 +120,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
switch (writer.state()) {
case 3:
- if (!writer.writeCollection("futVers", futVers, MessageCollectionItemType.MSG))
+ if (!writer.writeMessage("futIds", futIds))
return false;
writer.incrementState();
@@ -127,7 +142,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
switch (reader.state()) {
case 3:
- futVers = reader.readCollection("futVers", MessageCollectionItemType.MSG);
+ futIds = reader.readMessage("futIds");
if (!reader.isLastRead())
return false;
@@ -148,4 +163,9 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
@Override public byte fieldsCount() {
return 4;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicDeferredUpdateResponse.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
new file mode 100644
index 0000000..08a7e28
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK;
+
+/**
+ * Message sent from DHT nodes to near node in FULL_SYNC mode.
+ */
+public class GridDhtAtomicNearResponse extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Partition ID. */
+ private int partId;
+
+ /** Future ID. */
+ private long futId;
+
+ /** Primary node ID. */
+ private UUID primaryId;
+
+ /** Flags (bit mask, see {@link #hasResult()}). */
+ @GridToStringExclude
+ private byte flags;
+
+ /** Update errors, {@code null} if no key failed. */
+ @GridToStringInclude
+ private UpdateErrors errs;
+
+ /**
+ * No-arg constructor.
+ */
+ public GridDhtAtomicNearResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param partId Partition.
+ * @param futId Future ID.
+ * @param primaryId Primary node ID.
+ * @param flags Flags.
+ */
+ public GridDhtAtomicNearResponse(int cacheId,
+ int partId,
+ long futId,
+ UUID primaryId,
+ byte flags)
+ {
+ assert primaryId != null;
+
+ this.cacheId = cacheId;
+ this.partId = partId;
+ this.futId = futId;
+ this.primaryId = primaryId;
+ this.flags = flags;
+ }
+
+ /**
+ * @return Errors.
+ */
+ @Nullable UpdateErrors errors() {
+ return errs;
+ }
+
+ /**
+ * @param errs Errors.
+ */
+ public void errors(UpdateErrors errs) {
+ this.errs = errs;
+ }
+
+ /**
+ * @return Primary node ID.
+ */
+ UUID primaryId() {
+ return primaryId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return partId;
+ }
+
+ /**
+ * @param key Key.
+ * @param e Error.
+ */
+ public void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (errs == null)
+ errs = new UpdateErrors();
+
+ errs.addFailedKey(key, e);
+ }
+
+ /**
+ * @return Operation result.
+ */
+ public GridCacheReturn result() {
+ assert hasResult() : this;
+
+ return new GridCacheReturn(true, true);
+ }
+
+ /**
+ * @return {@code True} if response contains operation result.
+ */
+ boolean hasResult() {
+ return isFlag(DHT_ATOMIC_HAS_RESULT_MASK);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -45;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 8;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (errs != null)
+ errs.prepareMarshal(this, ctx.cacheContext(cacheId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (errs != null)
+ errs.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeMessage("errs", errs))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeInt("partId", partId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeUuid("primaryId", primaryId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ errs = reader.readMessage("errs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ partId = reader.readInt("partId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ primaryId = reader.readUuid("primaryId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicNearResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (hasResult())
+ appendFlag(flags, "hasRes");
+
+ return S.toString(GridDhtAtomicNearResponse.class, this,
+ "flags", flags.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 0dc2754..8ebe9c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -24,16 +24,11 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -45,64 +40,45 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
private static final long serialVersionUID = 0L;
/** */
- private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
-
- /** Future keys. */
- private KeyCacheObject key;
-
- /** Entries with readers. */
- private GridDhtCacheEntry nearReaderEntry;
+ private boolean allUpdated;
/**
* @param cctx Cache context.
- * @param completionCb Callback to invoke when future is completed.
* @param writeVer Write version.
* @param updateReq Update request.
- * @param updateRes Update response.
*/
GridDhtAtomicSingleUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq,
- GridNearAtomicUpdateResponse updateRes
+ GridNearAtomicAbstractUpdateRequest updateReq
) {
- super(cctx,
- completionCb,
- writeVer,
- updateReq,
- updateRes);
+ super(cctx, writeVer, updateReq);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean sendAllToDht() {
+ return allUpdated;
}
/** {@inheritDoc} */
@Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
- assert this.key == null || this.key.equals(key) : this.key;
+ if (mappings == null) {
+ allUpdated = true;
- if (mappings == null)
mappings = U.newHashMap(dhtNodes.size());
-
- this.key = key;
+ }
}
/** {@inheritDoc} */
@Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) {
- assert this.key == null || this.key.equals(key) : this.key;
-
if (mappings == null)
mappings = U.newHashMap(readers.size());
-
- this.key = key;
- }
-
- /** {@inheritDoc} */
- @Override protected void addNearReaderEntry(GridDhtCacheEntry entry) {
- nearReaderEntry = entry;
}
/** {@inheritDoc} */
@Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
- ClusterNode node,
- GridCacheVersion futVer,
+ UUID nodeId,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -110,11 +86,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer
) {
- if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) {
+ if (canUseSingleRequest(ttl, conflictExpireTime, conflictVer)) {
return new GridDhtAtomicSingleUpdateRequest(
cctx.cacheId(),
- node.id(),
- futVer,
+ nodeId,
+ futId,
writeVer,
syncMode,
topVer,
@@ -127,68 +103,37 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
else {
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
- node.id(),
- futVer,
+ nodeId,
+ futId,
writeVer,
syncMode,
topVer,
- false,
updateReq.subjectId(),
updateReq.taskNameHash(),
null,
cctx.deploymentEnabled(),
updateReq.keepBinary(),
- updateReq.skipStore());
+ updateReq.skipStore(),
+ false);
}
}
- /** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
- if (log.isDebugEnabled())
- log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
-
- if (updateRes.error() != null)
- this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error());
-
- if (!F.isEmpty(updateRes.nearEvicted())) {
- try {
- assert nearReaderEntry != null;
-
- nearReaderEntry.removeReader(nodeId, updateRes.messageId());
- }
- catch (GridCacheEntryRemovedException e) {
- if (log.isDebugEnabled())
- log.debug("Entry with evicted reader was removed [entry=" + nearReaderEntry + ", err=" + e + ']');
- }
- }
-
- registerResponse(nodeId);
- }
-
- /** {@inheritDoc} */
- @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
- updateRes.addFailedKey(key, err);
- }
-
/**
- * @param node Target node
* @param ttl TTL.
* @param conflictExpireTime Conflict expire time.
* @param conflictVer Conflict version.
* @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}.
*/
- private boolean canUseSingleRequest(ClusterNode node,
- long ttl,
+ private boolean canUseSingleRequest(long ttl,
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer) {
- return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 &&
- (ttl == CU.TTL_NOT_CHANGED) &&
+ return (ttl == CU.TTL_NOT_CHANGED) &&
(conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) &&
conflictVer == null;
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtAtomicSingleUpdateFuture.class, this);
+ return S.toString(GridDhtAtomicSingleUpdateFuture.class, this, "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index a7e6c24..6b92c02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -38,9 +38,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
-
/**
*
*/
@@ -48,30 +45,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** */
private static final long serialVersionUID = 0L;
- /** Near cache key flag. */
- private static final int NEAR_FLAG_MASK = 0x80;
-
- /** Future version. */
- protected GridCacheVersion futVer;
-
- /** Write version. */
- protected GridCacheVersion writeVer;
-
- /** Write synchronization mode. */
- protected CacheWriteSynchronizationMode syncMode;
-
- /** Topology version. */
- protected AffinityTopologyVersion topVer;
-
- /** Subject ID. */
- protected UUID subjId;
-
- /** Task name hash. */
- protected int taskNameHash;
-
- /** Additional flags. */
- protected byte flags;
-
/** Key to update. */
@GridToStringInclude
protected KeyCacheObject key;
@@ -87,9 +60,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** Partition. */
protected long updateCntr;
- /** */
- protected int partId;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -102,7 +72,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param writeVer Write version for cache values.
* @param syncMode Cache write synchronization mode.
* @param topVer Topology version.
@@ -115,7 +85,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
GridDhtAtomicSingleUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -125,19 +95,17 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
boolean keepBinary,
boolean skipStore
) {
- super(cacheId, nodeId);
- this.futVer = futVer;
- this.writeVer = writeVer;
- this.syncMode = syncMode;
- this.topVer = topVer;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.addDepInfo = addDepInfo;
-
- if (skipStore)
- setFlag(true, SKIP_STORE_FLAG_MASK);
- if (keepBinary)
- setFlag(true, KEEP_BINARY_FLAG_MASK);
+ super(cacheId,
+ nodeId,
+ futId,
+ writeVer,
+ syncMode,
+ topVer,
+ subjId,
+ taskNameHash,
+ addDepInfo,
+ keepBinary,
+ skipStore);
}
/**
@@ -148,7 +116,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
* @param addPrevVal If {@code true} adds previous value.
- * @param partId Partition.
* @param prevVal Previous value.
* @param updateCntr Update counter.
*/
@@ -159,7 +126,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer,
boolean addPrevVal,
- int partId,
@Nullable CacheObject prevVal,
long updateCntr
) {
@@ -167,11 +133,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
assert ttl <= 0 : ttl;
assert conflictExpireTime <= 0 : conflictExpireTime;
assert conflictVer == null : conflictVer;
+ assert key.partition() >= 0 : key;
near(false);
this.key = key;
- this.partId = partId;
this.val = val;
if (addPrevVal)
@@ -194,6 +160,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
long expireTime) {
assert entryProcessor == null;
assert ttl <= 0 : ttl;
+ assert key.partition() >= 0 : key;
near(true);
@@ -222,11 +189,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public boolean skipStore() {
- return isFlag(SKIP_STORE_FLAG_MASK);
- }
-
- /** {@inheritDoc} */
@Override public KeyCacheObject key(int idx) {
assert idx == 0 : idx;
@@ -235,14 +197,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public int partition() {
- return partId;
- }
+ int p = key.partition();
- /** {@inheritDoc} */
- @Override public int partitionId(int idx) {
- assert idx == 0 : idx;
+ assert p >= 0;
- return partId;
+ return p;
}
/** {@inheritDoc} */
@@ -267,31 +226,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion writeVersion() {
- return writeVer;
- }
-
- /** {@inheritDoc} */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /** {@inheritDoc} */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
- }
-
- /** {@inheritDoc} */
@Override @Nullable public CacheObject previousValue(int idx) {
assert idx == 0 : idx;
@@ -360,25 +294,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public boolean keepBinary() {
- return isFlag(KEEP_BINARY_FLAG_MASK);
- }
-
- /**
- *
- */
- private boolean near() {
- return isFlag(NEAR_FLAG_MASK);
- }
-
- /**
- *
- */
- private void near(boolean near) {
- setFlag(near, NEAR_FLAG_MASK);
- }
-
- /** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -403,8 +318,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
finishUnmarshalObject(val, cctx, ldr);
finishUnmarshalObject(prevVal, cctx, ldr);
-
- key.partition(partId);
}
/** {@inheritDoc} */
@@ -422,78 +335,30 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
switch (writer.state()) {
- case 3:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 5:
+ case 12:
if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
- case 6:
- if (!writer.writeInt("partId", partId))
- return false;
-
- writer.incrementState();
-
- case 7:
+ case 13:
if (!writer.writeMessage("prevVal", prevVal))
return false;
writer.incrementState();
- case 8:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 12:
+ case 14:
if (!writer.writeLong("updateCntr", updateCntr))
return false;
writer.incrementState();
- case 13:
+ case 15:
if (!writer.writeMessage("val", val))
return false;
writer.incrementState();
- case 14:
- if (!writer.writeMessage("writeVer", writeVer))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -510,23 +375,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
return false;
switch (reader.state()) {
- case 3:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
+ case 12:
key = reader.readMessage("key");
if (!reader.isLastRead())
@@ -534,15 +383,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 6:
- partId = reader.readInt("partId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
+ case 13:
prevVal = reader.readMessage("prevVal");
if (!reader.isLastRead())
@@ -550,43 +391,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 8:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 10:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
+ case 14:
updateCntr = reader.readLong("updateCntr");
if (!reader.isLastRead())
@@ -594,7 +399,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 13:
+ case 15:
val = reader.readMessage("val");
if (!reader.isLastRead())
@@ -602,14 +407,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 14:
- writeVer = reader.readMessage("writeVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridDhtAtomicSingleUpdateRequest.class);
@@ -652,27 +449,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
- }
-
- /**
- * Sets flag mask.
- *
- * @param flag Set or clear.
- * @param mask Mask.
- */
- private void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
- }
-
- /**
- * Reags flag mask.
- *
- * @param mask Mask to read.
- * @return Flag value.
- */
- private boolean isFlag(int mask) {
- return (flags & mask) != 0;
+ return 16;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5429adc..5d5ddf0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -17,22 +17,15 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
@@ -45,89 +38,45 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
/** */
private static final long serialVersionUID = 0L;
- /** Future keys. */
- private final Collection<KeyCacheObject> keys;
-
- /** Entries with readers. */
- private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
-
+ /** */
+ private int updateCntr;
/**
* @param cctx Cache context.
- * @param completionCb Callback to invoke when future is completed.
* @param writeVer Write version.
* @param updateReq Update request.
- * @param updateRes Update response.
*/
GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq,
- GridNearAtomicUpdateResponse updateRes
+ GridNearAtomicAbstractUpdateRequest updateReq
) {
- super(cctx,
- completionCb,
- writeVer,
- updateReq,
- updateRes);
+ super(cctx, writeVer, updateReq);
- keys = new ArrayList<>(updateReq.size());
mappings = U.newHashMap(updateReq.size());
}
/** {@inheritDoc} */
- @Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
- keys.add(key);
- }
-
- /** {@inheritDoc} */
- @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) {
- keys.add(key);
- }
-
- /** {@inheritDoc} */
- @Override protected void addNearReaderEntry(GridDhtCacheEntry entry) {
- if (nearReadersEntries == null)
- nearReadersEntries = new HashMap<>();
-
- nearReadersEntries.put(entry.key(), entry);
+ @Override protected boolean sendAllToDht() {
+ return updateCntr == updateReq.size();
}
/** {@inheritDoc} */
- @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
- if (log.isDebugEnabled())
- log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
-
- if (updateRes.error() != null)
- this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error());
-
- if (!F.isEmpty(updateRes.nearEvicted())) {
- for (KeyCacheObject key : updateRes.nearEvicted()) {
- GridDhtCacheEntry entry = nearReadersEntries.get(key);
-
- try {
- entry.removeReader(nodeId, updateRes.messageId());
- }
- catch (GridCacheEntryRemovedException e) {
- if (log.isDebugEnabled())
- log.debug("Entry with evicted reader was removed [entry=" + entry + ", err=" + e + ']');
- }
- }
- }
+ @Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
+ assert updateCntr < updateReq.size();
- registerResponse(nodeId);
+ updateCntr++;
}
/** {@inheritDoc} */
- @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
- for (KeyCacheObject key : keys)
- updateRes.addFailedKey(key, err);
+ @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) {
+ // No-op.
}
/** {@inheritDoc} */
- @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node,
- GridCacheVersion futVer,
+ @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
+ UUID nodeId,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -137,22 +86,22 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
) {
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
- node.id(),
- futVer,
+ nodeId,
+ futId,
writeVer,
syncMode,
topVer,
- false,
updateReq.subjectId(),
updateReq.taskNameHash(),
null,
cctx.deploymentEnabled(),
updateReq.keepBinary(),
- updateReq.skipStore());
+ updateReq.skipStore(),
+ false);
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(GridDhtAtomicUpdateFuture.class, this);
+ return S.toString(GridDhtAtomicUpdateFuture.class, this, "super", super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 7144963..6b8af8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -44,8 +44,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
-
/**
* Lite dht cache backup update request.
*/
@@ -53,15 +51,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** */
private static final long serialVersionUID = 0L;
- /** Future version. */
- private GridCacheVersion futVer;
-
- /** Write version. */
- private GridCacheVersion writeVer;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
/** Keys to update. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -93,9 +82,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** Near expire times. */
private GridLongList nearExpireTimes;
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
/** Near cache keys to update. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -132,25 +118,9 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** Entry processor arguments bytes. */
private byte[][] invokeArgsBytes;
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
/** Partition. */
private GridLongList updateCntrs;
- /** */
- @GridDirectTransient
- private List<Integer> partIds;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /** Additional flags. */
- private byte flags;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -163,7 +133,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
+ * @param futId Future ID.
* @param writeVer Write version for cache values.
* @param invokeArgs Optional arguments for entry processor.
* @param syncMode Cache write synchronization mode.
@@ -176,38 +146,36 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
public GridDhtAtomicUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
+ long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
- boolean forceTransformBackups,
UUID subjId,
int taskNameHash,
Object[] invokeArgs,
boolean addDepInfo,
boolean keepBinary,
- boolean skipStore
+ boolean skipStore,
+ boolean forceTransformBackups
) {
- super(cacheId, nodeId);
-
- this.futVer = futVer;
- this.writeVer = writeVer;
- this.syncMode = syncMode;
- this.topVer = topVer;
- this.forceTransformBackups = forceTransformBackups;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
+ super(cacheId,
+ nodeId,
+ futId,
+ writeVer,
+ syncMode,
+ topVer,
+ subjId,
+ taskNameHash,
+ addDepInfo,
+ keepBinary,
+ skipStore);
assert invokeArgs == null || forceTransformBackups;
+ this.forceTransformBackups = forceTransformBackups;
this.invokeArgs = invokeArgs;
- this.addDepInfo = addDepInfo;
- this.keepBinary = keepBinary;
-
- setFlag(skipStore, SKIP_STORE_FLAG_MASK);
keys = new ArrayList<>();
- partIds = new ArrayList<>();
if (forceTransformBackups) {
entryProcessors = new ArrayList<>();
@@ -225,13 +193,12 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer,
boolean addPrevVal,
- int partId,
@Nullable CacheObject prevVal,
long updateCntr
) {
- keys.add(key);
+ assert key.partition() >= 0 : key;
- partIds.add(partId);
+ keys.add(key);
if (forceTransformBackups) {
assert entryProcessor != null;
@@ -298,6 +265,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
long expireTime) {
+ assert key.partition() >= 0 : key;
+
if (nearKeys == null) {
nearKeys = new ArrayList<>();
@@ -350,31 +319,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
}
/** {@inheritDoc} */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /** {@inheritDoc} */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion writeVersion() {
- return writeVer;
- }
-
- /** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
- }
-
- /** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
return topVer;
}
@@ -400,11 +344,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
}
/** {@inheritDoc} */
- @Override public int partitionId(int idx) {
- return partIds.get(idx);
- }
-
- /** {@inheritDoc} */
@Override public Long updateCounter(int updCntr) {
if (updateCntrs != null && updCntr < updateCntrs.size())
return updateCntrs.get(updCntr);
@@ -486,7 +425,13 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** {@inheritDoc} */
@Override public int partition() {
- return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ assert !F.isEmpty(keys) || !F.isEmpty(nearKeys);
+
+ int p = keys.size() > 0 ? keys.get(0).partition() : nearKeys.get(0).partition();
+
+ assert p >= 0;
+
+ return p;
}
/** {@inheritDoc} */
@@ -512,16 +457,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
}
/** {@inheritDoc} */
- @Override public boolean keepBinary() {
- return keepBinary;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipStore() {
- return isFlag(SKIP_STORE_FLAG_MASK);
- }
-
- /** {@inheritDoc} */
@Override @Nullable public Object[] invokeArguments() {
return invokeArgs;
}
@@ -584,13 +519,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
if (nearEntryProcessors == null)
nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
}
-
- if (partIds != null && !partIds.isEmpty()) {
- assert partIds.size() == keys.size();
-
- for (int i = 0; i < keys.size(); i++)
- keys.get(i).partition(partIds.get(i));
- }
}
/** {@inheritDoc} */
@@ -608,144 +536,96 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
}
switch (writer.state()) {
- case 3:
+ case 12:
if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
return false;
writer.incrementState();
- case 4:
+ case 13:
if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 5:
+ case 14:
if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 6:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 7:
+ case 15:
if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
return false;
writer.incrementState();
- case 8:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 9:
+ case 16:
if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 10:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 11:
+ case 17:
if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 12:
+ case 18:
if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 13:
+ case 19:
if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
return false;
writer.incrementState();
- case 14:
+ case 20:
if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 15:
+ case 21:
if (!writer.writeMessage("nearTtls", nearTtls))
return false;
writer.incrementState();
- case 16:
+ case 22:
if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 17:
+ case 23:
if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 18:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 22:
+ case 24:
if (!writer.writeMessage("ttls", ttls))
return false;
writer.incrementState();
- case 23:
+ case 25:
if (!writer.writeMessage("updateCntrs", updateCntrs))
return false;
writer.incrementState();
- case 24:
+ case 26:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 25:
- if (!writer.writeMessage("writeVer", writeVer))
- return false;
-
- writer.incrementState();
-
}
return true;
@@ -762,7 +642,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
return false;
switch (reader.state()) {
- case 3:
+ case 12:
conflictExpireTimes = reader.readMessage("conflictExpireTimes");
if (!reader.isLastRead())
@@ -770,7 +650,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 4:
+ case 13:
conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -778,7 +658,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 5:
+ case 14:
entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -786,15 +666,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 6:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
+ case 15:
forceTransformBackups = reader.readBoolean("forceTransformBackups");
if (!reader.isLastRead())
@@ -802,15 +674,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 8:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
+ case 16:
invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
@@ -818,15 +682,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 10:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
+ case 17:
keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -834,7 +690,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 12:
+ case 18:
nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -842,7 +698,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 13:
+ case 19:
nearExpireTimes = reader.readMessage("nearExpireTimes");
if (!reader.isLastRead())
@@ -850,7 +706,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 14:
+ case 20:
nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -858,7 +714,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 15:
+ case 21:
nearTtls = reader.readMessage("nearTtls");
if (!reader.isLastRead())
@@ -866,7 +722,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 16:
+ case 22:
nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -874,7 +730,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 17:
+ case 23:
prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -882,43 +738,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 18:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 20:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 21:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
+ case 24:
ttls = reader.readMessage("ttls");
if (!reader.isLastRead())
@@ -926,7 +746,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 23:
+ case 25:
updateCntrs = reader.readMessage("updateCntrs");
if (!reader.isLastRead())
@@ -934,7 +754,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 24:
+ case 26:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -942,14 +762,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 25:
- writeVer = reader.readMessage("writeVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class);
@@ -968,30 +780,9 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 26;
- }
-
- /**
- * Sets flag mask.
- *
- * @param flag Set or clear.
- * @param mask Mask.
- */
- private void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ return 27;
}
- /**
- * Reags flag mask.
- *
- * @param mask Mask to read.
- * @return Flag value.
- */
- private boolean isFlag(int mask) {
- return (flags & mask) != 0;
- }
-
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString());
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index c3d3ca9..ab7aa6c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -25,16 +25,13 @@ import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -50,19 +47,10 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
public static final int CACHE_MSG_IDX = nextIndexId();
/** Future version. */
- private GridCacheVersion futVer;
+ private long futId;
- /** Failed keys. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> failedKeys;
-
- /** Update error. */
- @GridDirectTransient
- private IgniteCheckedException err;
-
- /** Serialized update error. */
- private byte[] errBytes;
+ /** */
+ private UpdateErrors errs;
/** Evicted readers. */
@GridToStringInclude
@@ -70,7 +58,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
private List<KeyCacheObject> nearEvicted;
/** */
- private int partId = -1;
+ private int partId;
/**
* Empty constructor required by {@link Externalizable}.
@@ -81,12 +69,14 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* @param cacheId Cache ID.
- * @param futVer Future version.
+ * @param partId Partition.
+ * @param futId Future ID.
* @param addDepInfo Deployment info.
*/
- public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) {
+ public GridDhtAtomicUpdateResponse(int cacheId, int partId, long futId, boolean addDepInfo) {
this.cacheId = cacheId;
- this.futVer = futVer;
+ this.partId = partId;
+ this.futId = futId;
this.addDepInfo = addDepInfo;
}
@@ -98,8 +88,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* @return Future version.
*/
- public GridCacheVersion futureVersion() {
- return futVer;
+ public long futureId() {
+ return futId;
}
/**
@@ -108,63 +98,29 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
* @param err Error.
*/
public void onError(IgniteCheckedException err){
- this.err = err;
+ if (errs == null)
+ errs = new UpdateErrors();
+
+ errs.onError(err);
}
/** {@inheritDoc} */
@Override public IgniteCheckedException error() {
- return err;
- }
-
- /**
- * @return Failed keys.
- */
- public Collection<KeyCacheObject> failedKeys() {
- return failedKeys;
- }
-
- /**
- * Adds key to collection of failed keys.
- *
- * @param key Key to add.
- * @param e Error cause.
- */
- public void addFailedKey(KeyCacheObject key, Throwable e) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>();
-
- failedKeys.add(key);
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
+ return errs != null ? errs.error() : null;
}
/**
* @return Evicted readers.
*/
- public Collection<KeyCacheObject> nearEvicted() {
+ Collection<KeyCacheObject> nearEvicted() {
return nearEvicted;
}
/**
- * Adds near evicted key..
- *
- * @param key Evicted key.
- */
- public void addNearEvicted(KeyCacheObject key) {
- if (nearEvicted == null)
- nearEvicted = new ArrayList<>();
-
- nearEvicted.add(key);
- }
-
- /**
- * @param partId Partition ID to set.
+ * @param nearEvicted Evicted near cache keys.
*/
- public void partition(int partId) {
- this.partId = partId;
+ void nearEvicted(List<KeyCacheObject> nearEvicted) {
+ this.nearEvicted = nearEvicted;
}
/** {@inheritDoc} */
@@ -178,12 +134,10 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
GridCacheContext cctx = ctx.cacheContext(cacheId);
- prepareMarshalCacheObjects(failedKeys, cctx);
-
prepareMarshalCacheObjects(nearEvicted, cctx);
- if (errBytes == null)
- errBytes = U.marshal(ctx, err);
+ if (errs != null)
+ errs.prepareMarshal(this, cctx);
}
/** {@inheritDoc} */
@@ -192,12 +146,10 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
GridCacheContext cctx = ctx.cacheContext(cacheId);
- finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
- if (errBytes != null && err == null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ if (errs != null)
+ errs.finishUnmarshal(this, cctx, ldr);
}
/** {@inheritDoc} */
@@ -226,30 +178,24 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
switch (writer.state()) {
case 3:
- if (!writer.writeByteArray("errBytes", errBytes))
+ if (!writer.writeMessage("errs", errs))
return false;
writer.incrementState();
case 4:
- if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
case 5:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 6:
if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 7:
+ case 6:
if (!writer.writeInt("partId", partId))
return false;
@@ -272,7 +218,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
switch (reader.state()) {
case 3:
- errBytes = reader.readByteArray("errBytes");
+ errs = reader.readMessage("errs");
if (!reader.isLastRead())
return false;
@@ -280,7 +226,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
reader.incrementState();
case 4:
- failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+ futId = reader.readLong("futId");
if (!reader.isLastRead())
return false;
@@ -288,14 +234,6 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
reader.incrementState();
case 5:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -303,7 +241,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
reader.incrementState();
- case 7:
+ case 6:
partId = reader.readInt("partId");
if (!reader.isLastRead())
@@ -323,7 +261,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 61deeee..6811236 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -18,19 +18,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
-import java.nio.ByteBuffer;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -44,59 +38,6 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
/** */
private static final CacheEntryPredicate[] NO_FILTER = new CacheEntryPredicate[0];
- /** Fast map flag mask. */
- private static final int FAST_MAP_FLAG_MASK = 0x1;
-
- /** Flag indicating whether request contains primary keys. */
- private static final int HAS_PRIMARY_FLAG_MASK = 0x2;
-
- /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
- private static final int TOP_LOCKED_FLAG_MASK = 0x4;
-
- /** Skip write-through to a persistent storage. */
- private static final int SKIP_STORE_FLAG_MASK = 0x8;
-
- /** */
- private static final int CLIENT_REQ_FLAG_MASK = 0x10;
-
- /** Keep binary flag. */
- private static final int KEEP_BINARY_FLAG_MASK = 0x20;
-
- /** Return value flag. */
- private static final int RET_VAL_FLAG_MASK = 0x40;
-
- /** Target node ID. */
- @GridDirectTransient
- protected UUID nodeId;
-
- /** Future version. */
- protected GridCacheVersion futVer;
-
- /** Update version. Set to non-null if fastMap is {@code true}. */
- private GridCacheVersion updateVer;
-
- /** Topology version. */
- protected AffinityTopologyVersion topVer;
-
- /** Write synchronization mode. */
- protected CacheWriteSynchronizationMode syncMode;
-
- /** Update operation. */
- protected GridCacheOperation op;
-
- /** Subject ID. */
- protected UUID subjId;
-
- /** Task name hash. */
- protected int taskNameHash;
-
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
- /** Compressed boolean flags. */
- protected byte flags;
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -109,9 +50,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
+ * @param futId Future ID.
* @param topVer Topology version.
* @param topLocked Topology locked flag.
* @param syncMode Synchronization mode.
@@ -121,15 +60,12 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
* @param addDepInfo Deployment info flag.
*/
protected GridNearAtomicAbstractSingleUpdateRequest(
int cacheId,
UUID nodeId,
- GridCacheVersion futVer,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
+ long futId,
@NotNull AffinityTopologyVersion topVer,
boolean topLocked,
CacheWriteSynchronizationMode syncMode,
@@ -137,91 +73,25 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean mappingKnown,
boolean skipStore,
boolean keepBinary,
- boolean clientReq,
boolean addDepInfo
) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.updateVer = updateVer;
- this.topVer = topVer;
- this.syncMode = syncMode;
- this.op = op;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.addDepInfo = addDepInfo;
-
- fastMap(fastMap);
- topologyLocked(topLocked);
- returnValue(retval);
- skipStore(skipStore);
- keepBinary(keepBinary);
- clientRequest(clientReq);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /**
- * @return Mapped node ID.
- */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @param nodeId Node ID.
- */
- @Override public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Subject ID.
- */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /**
- * @return Task name hash.
- */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /**
- * @return Future version.
- */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * @return Update version for fast-map request.
- */
- @Override public GridCacheVersion updateVersion() {
- return updateVer;
- }
-
- /**
- * @return Topology version.
- */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Cache write synchronization mode.
- */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
+ super(cacheId,
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ subjId,
+ taskNameHash,
+ mappingKnown,
+ skipStore,
+ keepBinary,
+ addDepInfo);
}
/**
@@ -232,331 +102,14 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
}
/**
- * @return Update operation.
- */
- @Override public GridCacheOperation operation() {
- return op;
- }
-
- /**
* @return Optional arguments for entry processor.
*/
@Override @Nullable public Object[] invokeArguments() {
return null;
}
- /**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
- */
- @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /**
- * @return Response.
- */
- @Override @Nullable public GridNearAtomicUpdateResponse response() {
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
- return ctx.atomicMessageLogger();
- }
-
- /**
- * @return Flag indicating whether this is fast-map update.
- */
- @Override public boolean fastMap() {
- return isFlag(FAST_MAP_FLAG_MASK);
- }
-
- /**
- * Sets fastMap flag value.
- */
- public void fastMap(boolean val) {
- setFlag(val, FAST_MAP_FLAG_MASK);
- }
-
- /**
- * @return Topology locked flag.
- */
- @Override public boolean topologyLocked() {
- return isFlag(TOP_LOCKED_FLAG_MASK);
- }
-
- /**
- * Sets topologyLocked flag value.
- */
- public void topologyLocked(boolean val) {
- setFlag(val, TOP_LOCKED_FLAG_MASK);
- }
-
- /**
- * @return {@code True} if request sent from client node.
- */
- @Override public boolean clientRequest() {
- return isFlag(CLIENT_REQ_FLAG_MASK);
- }
-
- /**
- * Sets clientRequest flag value.
- */
- public void clientRequest(boolean val) {
- setFlag(val, CLIENT_REQ_FLAG_MASK);
- }
-
- /**
- * @return Return value flag.
- */
- @Override public boolean returnValue() {
- return isFlag(RET_VAL_FLAG_MASK);
- }
-
- /**
- * Sets returnValue flag value.
- */
- public void returnValue(boolean val) {
- setFlag(val, RET_VAL_FLAG_MASK);
- }
-
- /**
- * @return Skip write-through to a persistent storage.
- */
- @Override public boolean skipStore() {
- return isFlag(SKIP_STORE_FLAG_MASK);
- }
-
- /**
- * Sets skipStore flag value.
- */
- public void skipStore(boolean val) {
- setFlag(val, SKIP_STORE_FLAG_MASK);
- }
-
- /**
- * @return Keep binary flag.
- */
- @Override public boolean keepBinary() {
- return isFlag(KEEP_BINARY_FLAG_MASK);
- }
-
- /**
- * Sets keepBinary flag value.
- */
- public void keepBinary(boolean val) {
- setFlag(val, KEEP_BINARY_FLAG_MASK);
- }
-
- /**
- * @return Flag indicating whether this request contains primary keys.
- */
- @Override public boolean hasPrimary() {
- return isFlag(HAS_PRIMARY_FLAG_MASK);
- }
-
- /**
- * Sets hasPrimary flag value.
- */
- public void hasPrimary(boolean val) {
- setFlag(val, HAS_PRIMARY_FLAG_MASK);
- }
-
/** {@inheritDoc} */
@Nullable @Override public CacheEntryPredicate[] filter() {
return NO_FILTER;
}
-
- /**
- * Sets flag mask.
- *
- * @param flag Set or clear.
- * @param mask Mask.
- */
- private void setFlag(boolean flag, int mask) {
- flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
- }
-
- /**
- * Reads flag mask.
- *
- * @param mask Mask to read.
- * @return Flag value.
- */
- private boolean isFlag(int mask) {
- return (flags & mask) != 0;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMessage("updateVer", updateVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = GridCacheOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 6:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 8:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- updateVer = reader.readMessage("updateVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridNearAtomicAbstractSingleUpdateRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 11;
- }
}