You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/05 08:04:13 UTC
[1/2] ignite git commit: IGNITE-2523: Renamings.
Repository: ignite
Updated Branches:
refs/heads/ignite-2523 e83408060 -> 1491c1f49
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 08b29be..960add7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -17,960 +17,183 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import java.util.List;
+import java.util.UUID;
/**
- * Lite DHT cache update request sent from near node to primary node.
+ * Base interface for near atomic update requests.
*/
-public class GridNearAtomicUpdateRequest extends GridCacheMessage
- implements GridNearAtomicUpdateRequestBase, GridCacheDeployable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** Target node ID. */
- @GridDirectTransient
- private UUID nodeId;
-
- /** Future version. */
- private GridCacheVersion futVer;
-
- /** Fast map flag. */
- private boolean fastMap;
-
- /** Update version. Set to non-null if fastMap is {@code true}. */
- private GridCacheVersion updateVer;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
- private boolean topLocked;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
- /** Update operation. */
- private GridCacheOperation op;
-
- /** Keys to update. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> keys;
-
- /** Values to update. */
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> vals;
-
- /** Entry processors. */
- @GridDirectTransient
- private List<EntryProcessor<Object, Object, Object>> entryProcessors;
-
- /** Entry processors bytes. */
- @GridDirectCollection(byte[].class)
- private List<byte[]> entryProcessorsBytes;
-
- /** Optional arguments for entry processor. */
- @GridDirectTransient
- private Object[] invokeArgs;
-
- /** Entry processor arguments bytes. */
- private byte[][] invokeArgsBytes;
-
- /** Conflict versions. */
- @GridDirectCollection(GridCacheVersion.class)
- private List<GridCacheVersion> conflictVers;
-
- /** Conflict TTLs. */
- private GridLongList conflictTtls;
-
- /** Conflict expire times. */
- private GridLongList conflictExpireTimes;
-
- /** Return value flag. */
- private boolean retval;
-
- /** Expiry policy. */
- @GridDirectTransient
- private ExpiryPolicy expiryPlc;
-
- /** Expiry policy bytes. */
- private byte[] expiryPlcBytes;
-
- /** Filter. */
- private CacheEntryPredicate[] filter;
-
- /** Flag indicating whether request contains primary keys. */
- private boolean hasPrimary;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Skip write-through to a persistent storage. */
- private boolean skipStore;
-
- /** */
- private boolean clientReq;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /** */
- @GridDirectTransient
- private GridNearAtomicUpdateResponse res;
-
- /** Maximum possible size of inner collections. */
- @GridDirectTransient
- private int initSize;
+public interface GridNearAtomicUpdateRequest {
+ /**
+ * @return Message ID.
+ */
+ public long messageId();
/**
- * Empty constructor required by {@link Externalizable}.
+ * @return Mapped node ID.
*/
- public GridNearAtomicUpdateRequest() {
- // No-op.
- }
+ public UUID nodeId();
/**
- * Constructor.
- *
- * @param cacheId Cache ID.
* @param nodeId Node ID.
- * @param futVer Future version.
- * @param fastMap Fast map scheme flag.
- * @param updateVer Update version set if fast map is performed.
- * @param topVer Topology version.
- * @param topLocked Topology locked flag.
- * @param syncMode Synchronization mode.
- * @param op Cache update operation.
- * @param retval Return value required flag.
- * @param expiryPlc Expiry policy.
- * @param invokeArgs Optional arguments for entry processor.
- * @param filter Optional filter for atomic check.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param skipStore Skip write-through to a persistent storage.
- * @param keepBinary Keep binary flag.
- * @param clientReq Client node request flag.
- * @param addDepInfo Deployment info flag.
- * @param maxEntryCnt Maximum entries count.
*/
- public GridNearAtomicUpdateRequest(
- int cacheId,
- UUID nodeId,
- GridCacheVersion futVer,
- boolean fastMap,
- @Nullable GridCacheVersion updateVer,
- @NotNull AffinityTopologyVersion topVer,
- boolean topLocked,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- boolean retval,
- @Nullable ExpiryPolicy expiryPlc,
- @Nullable Object[] invokeArgs,
- @Nullable CacheEntryPredicate[] filter,
- @Nullable UUID subjId,
- int taskNameHash,
- boolean skipStore,
- boolean keepBinary,
- boolean clientReq,
- boolean addDepInfo,
- int maxEntryCnt
- ) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.fastMap = fastMap;
- this.updateVer = updateVer;
-
- this.topVer = topVer;
- this.topLocked = topLocked;
- this.syncMode = syncMode;
- this.op = op;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.invokeArgs = invokeArgs;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.clientReq = clientReq;
- this.addDepInfo = addDepInfo;
-
- // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
- // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
- // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
- // than 10, we use it.
- initSize = Math.min(maxEntryCnt, 10);
-
- keys = new ArrayList<>(initSize);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /** {@inheritDoc} */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /** {@inheritDoc} */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /** {@inheritDoc} */
- @Override public boolean fastMap() {
- return fastMap;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion updateVersion() {
- return updateVer;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /** {@inheritDoc} */
- @Override public boolean topologyLocked() {
- return topLocked;
- }
-
- /** {@inheritDoc} */
- @Override public boolean clientRequest() {
- return clientReq;
- }
-
- /** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
- }
-
- /** {@inheritDoc} */
- @Override public ExpiryPolicy expiry() {
- return expiryPlc;
- }
-
- /** {@inheritDoc} */
- @Override public boolean returnValue() {
- return retval;
- }
-
- /** {@inheritDoc} */
- @Nullable public CacheEntryPredicate[] filter() {
- return filter;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipStore() {
- return skipStore;
- }
-
- /** {@inheritDoc} */
- @Override public boolean keepBinary() {
- return keepBinary;
- }
+ public void nodeId(UUID nodeId);
/**
- * @param key Key to add.
- * @param val Optional update value.
- * @param conflictTtl Conflict TTL (optional).
- * @param conflictExpireTime Conflict expire time (optional).
- * @param conflictVer Conflict version (optional).
- * @param primary If given key is primary on this mapping.
+ * @return Subject ID.
*/
- @SuppressWarnings("unchecked")
- public void addUpdateEntry(KeyCacheObject key,
- @Nullable Object val,
- long conflictTtl,
- long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean primary) {
- EntryProcessor<Object, Object, Object> entryProcessor = null;
-
- if (op == TRANSFORM) {
- assert val instanceof EntryProcessor : val;
-
- entryProcessor = (EntryProcessor<Object, Object, Object>)val;
- }
-
- assert val != null || op == DELETE;
-
- keys.add(key);
-
- if (entryProcessor != null) {
- if (entryProcessors == null)
- entryProcessors = new ArrayList<>(initSize);
-
- entryProcessors.add(entryProcessor);
- }
- else if (val != null) {
- assert val instanceof CacheObject : val;
-
- if (vals == null)
- vals = new ArrayList<>(initSize);
-
- vals.add((CacheObject)val);
- }
-
- hasPrimary |= primary;
-
- // In case there is no conflict, do not create the list.
- if (conflictVer != null) {
- if (conflictVers == null) {
- conflictVers = new ArrayList<>(initSize);
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictVers.add(null);
- }
-
- conflictVers.add(conflictVer);
- }
- else if (conflictVers != null)
- conflictVers.add(null);
-
- if (conflictTtl >= 0) {
- if (conflictTtls == null) {
- conflictTtls = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictTtls.add(CU.TTL_NOT_CHANGED);
- }
-
- conflictTtls.add(conflictTtl);
- }
-
- if (conflictExpireTime >= 0) {
- if (conflictExpireTimes == null) {
- conflictExpireTimes = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
- }
-
- conflictExpireTimes.add(conflictExpireTime);
- }
- }
-
- /** {@inheritDoc} */
- @Override public List<KeyCacheObject> keys() {
- return keys;
- }
-
- /** {@inheritDoc} */
- @Override public List<?> values() {
- return op == TRANSFORM ? entryProcessors : vals;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheOperation operation() {
- return op;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public Object[] invokeArguments() {
- return invokeArgs;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public CacheObject value(int idx) {
- assert op == UPDATE : op;
-
- return vals.get(idx);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
- assert op == TRANSFORM : op;
-
- return entryProcessors.get(idx);
- }
-
- /** {@inheritDoc} */
- @Override public CacheObject writeValue(int idx) {
- if (vals != null)
- return vals.get(idx);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public List<GridCacheVersion> conflictVersions() {
- return conflictVers;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
- if (conflictVers != null) {
- assert idx >= 0 && idx < conflictVers.size();
-
- return conflictVers.get(idx);
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long conflictTtl(int idx) {
- if (conflictTtls != null) {
- assert idx >= 0 && idx < conflictTtls.size();
-
- return conflictTtls.get(idx);
- }
-
- return CU.TTL_NOT_CHANGED;
- }
-
- /** {@inheritDoc} */
- @Override public long conflictExpireTime(int idx) {
- if (conflictExpireTimes != null) {
- assert idx >= 0 && idx < conflictExpireTimes.size();
-
- return conflictExpireTimes.get(idx);
- }
-
- return CU.EXPIRE_TIME_CALCULATE;
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasPrimary() {
- return hasPrimary;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
- if (this.res == null) {
- this.res = res;
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public GridNearAtomicUpdateResponse response() {
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- prepareMarshalCacheObjects(keys, cctx);
-
- if (filter != null) {
- boolean hasFilter = false;
-
- for (CacheEntryPredicate p : filter) {
- if (p != null) {
- hasFilter = true;
-
- p.prepareMarshal(cctx);
- }
- }
-
- if (!hasFilter)
- filter = null;
- }
-
- if (expiryPlc != null && expiryPlcBytes == null)
- expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
-
- if (op == TRANSFORM) {
- // force addition of deployment info for entry processors if P2P is enabled globally.
- if (!addDepInfo && ctx.deploymentEnabled())
- addDepInfo = true;
-
- if (entryProcessorsBytes == null)
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
-
- if (invokeArgsBytes == null)
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
- }
- else
- prepareMarshalCacheObjects(vals, cctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- finishUnmarshalCacheObjects(keys, cctx, ldr);
-
- if (op == TRANSFORM) {
- if (entryProcessors == null)
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
-
- if (invokeArgs == null)
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
- }
- else
- finishUnmarshalCacheObjects(vals, cctx, ldr);
-
- if (filter != null) {
- for (CacheEntryPredicate p : filter) {
- if (p != null)
- p.finishUnmarshal(cctx, ldr);
- }
- }
-
- if (expiryPlcBytes != null && expiryPlc == null)
- expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeBoolean("clientReq", clientReq))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("conflictTtls", conflictTtls))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeBoolean("fastMap", fastMap))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeBoolean("hasPrimary", hasPrimary))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
- return false;
+ public UUID subjectId();
- writer.incrementState();
-
- case 17:
- if (!writer.writeBoolean("retval", retval))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeBoolean("skipStore", skipStore))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeBoolean("topLocked", topLocked))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("updateVer", updateVer))
- return false;
-
- writer.incrementState();
-
- case 25:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- clientReq = reader.readBoolean("clientReq");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- conflictTtls = reader.readMessage("conflictTtls");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- fastMap = reader.readBoolean("fastMap");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- hasPrimary = reader.readBoolean("hasPrimary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- byte opOrd;
-
- opOrd = reader.readByte("op");
-
- if (!reader.isLastRead())
- return false;
-
- op = GridCacheOperation.fromOrdinal(opOrd);
-
- reader.incrementState();
-
- case 17:
- retval = reader.readBoolean("retval");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- skipStore = reader.readBoolean("skipStore");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 19:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
+ /**
+ * @return Task name hash.
+ */
+ public int taskNameHash();
- reader.incrementState();
+ /**
+ * @return Future version.
+ */
+ public GridCacheVersion futureVersion();
- case 20:
- byte syncModeOrd;
+ /**
+ * @return Flag indicating whether this is a fast-map update.
+ */
+ public boolean fastMap();
- syncModeOrd = reader.readByte("syncMode");
+ /**
+ * @return Update version for fast-map request.
+ */
+ public GridCacheVersion updateVersion();
- if (!reader.isLastRead())
- return false;
+ /**
+ * @return Topology version.
+ */
+ public AffinityTopologyVersion topologyVersion();
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+ /**
+ * @return Topology locked flag.
+ */
+ public boolean topologyLocked();
- reader.incrementState();
+ /**
+ * @return {@code True} if request sent from client node.
+ */
+ public boolean clientRequest();
- case 21:
- taskNameHash = reader.readInt("taskNameHash");
+ /**
+ * @return Cache write synchronization mode.
+ */
+ public CacheWriteSynchronizationMode writeSynchronizationMode();
- if (!reader.isLastRead())
- return false;
+ /**
+ * @return Expiry policy.
+ */
+ public ExpiryPolicy expiry();
- reader.incrementState();
+ /**
+ * @return Return value flag.
+ */
+ public boolean returnValue();
- case 22:
- topLocked = reader.readBoolean("topLocked");
+ /**
+ * @return Filter.
+ */
+ @Nullable public CacheEntryPredicate[] filter();
- if (!reader.isLastRead())
- return false;
+ /**
+ * @return Skip write-through to a persistent storage.
+ */
+ public boolean skipStore();
- reader.incrementState();
+ /**
+ * @return Keep binary flag.
+ */
+ public boolean keepBinary();
- case 23:
- topVer = reader.readMessage("topVer");
+ /**
+ * @return Keys for this update request.
+ */
+ public List<KeyCacheObject> keys();
- if (!reader.isLastRead())
- return false;
+ /**
+ * @return Values for this update request.
+ */
+ public List<?> values();
- reader.incrementState();
+ /**
+ * @return Update operation.
+ */
+ public GridCacheOperation operation();
- case 24:
- updateVer = reader.readMessage("updateVer");
+ /**
+ * @return Optional arguments for entry processor.
+ */
+ @Nullable public Object[] invokeArguments();
- if (!reader.isLastRead())
- return false;
+ /**
+ * @param idx Key index.
+ * @return Value.
+ */
+ public CacheObject value(int idx);
- reader.incrementState();
+ /**
+ * @param idx Key index.
+ * @return Entry processor.
+ */
+ public EntryProcessor<Object, Object, Object> entryProcessor(int idx);
- case 25:
- vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+ /**
+ * @param idx Index to get.
+ * @return Write value - either value, or transform closure.
+ */
+ public CacheObject writeValue(int idx);
- if (!reader.isLastRead())
- return false;
+ /**
+ * @return Conflict versions.
+ */
+ @Nullable public List<GridCacheVersion> conflictVersions();
- reader.incrementState();
+ /**
+ * @param idx Index.
+ * @return Conflict version.
+ */
+ @Nullable public GridCacheVersion conflictVersion(int idx);
- }
+ /**
+ * @param idx Index.
+ * @return Conflict TTL.
+ */
+ public long conflictTtl(int idx);
- return reader.afterMessageRead(GridNearAtomicUpdateRequest.class);
- }
+ /**
+ * @param idx Index.
+ * @return Conflict expire time.
+ */
+ public long conflictExpireTime(int idx);
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 40;
- }
+ /**
+ * @return Flag indicating whether this request contains primary keys.
+ */
+ public boolean hasPrimary();
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 26;
- }
+ /**
+ * @param res Response.
+ * @return {@code True} if current response was {@code null}.
+ */
+ public boolean onResponse(GridNearAtomicUpdateResponse res);
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearAtomicUpdateRequest.class, this, "filter", Arrays.toString(filter),
- "parent", super.toString());
- }
+ /**
+ * @return Response.
+ */
+ @Nullable public GridNearAtomicUpdateResponse response();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestBase.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestBase.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestBase.java
deleted file mode 100644
index 8ddb181..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequestBase.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.expiry.ExpiryPolicy;
-import javax.cache.processor.EntryProcessor;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Base interface for near atomic update requests.
- */
-public interface GridNearAtomicUpdateRequestBase {
- /**
- * @return Message ID.
- */
- public long messageId();
-
- /**
- * @return Mapped node ID.
- */
- public UUID nodeId();
-
- /**
- * @param nodeId Node ID.
- */
- public void nodeId(UUID nodeId);
-
- /**
- * @return Subject ID.
- */
- public UUID subjectId();
-
- /**
- * @return Task name hash.
- */
- public int taskNameHash();
-
- /**
- * @return Future version.
- */
- public GridCacheVersion futureVersion();
-
- /**
- * @return Flag indicating whether this is fast-map udpate.
- */
- public boolean fastMap();
-
- /**
- * @return Update version for fast-map request.
- */
- public GridCacheVersion updateVersion();
-
- /**
- * @return Topology version.
- */
- public AffinityTopologyVersion topologyVersion();
-
- /**
- * @return Topology locked flag.
- */
- public boolean topologyLocked();
-
- /**
- * @return {@code True} if request sent from client node.
- */
- public boolean clientRequest();
-
- /**
- * @return Cache write synchronization mode.
- */
- public CacheWriteSynchronizationMode writeSynchronizationMode();
-
- /**
- * @return Expiry policy.
- */
- public ExpiryPolicy expiry();
-
- /**
- * @return Return value flag.
- */
- public boolean returnValue();
-
- /**
- * @return Filter.
- */
- @Nullable public CacheEntryPredicate[] filter();
-
- /**
- * @return Skip write-through to a persistent storage.
- */
- public boolean skipStore();
-
- /**
- * @return Keep binary flag.
- */
- public boolean keepBinary();
-
- /**
- * @return Keys for this update request.
- */
- public List<KeyCacheObject> keys();
-
- /**
- * @return Values for this update request.
- */
- public List<?> values();
-
- /**
- * @return Update operation.
- */
- public GridCacheOperation operation();
-
- /**
- * @return Optional arguments for entry processor.
- */
- @Nullable public Object[] invokeArguments();
-
- /**
- * @param idx Key index.
- * @return Value.
- */
- public CacheObject value(int idx);
-
- /**
- * @param idx Key index.
- * @return Entry processor.
- */
- public EntryProcessor<Object, Object, Object> entryProcessor(int idx);
-
- /**
- * @param idx Index to get.
- * @return Write value - either value, or transform closure.
- */
- public CacheObject writeValue(int idx);
-
- /**
- * @return Conflict versions.
- */
- @Nullable public List<GridCacheVersion> conflictVersions();
-
- /**
- * @param idx Index.
- * @return Conflict version.
- */
- @Nullable public GridCacheVersion conflictVersion(int idx);
-
- /**
- * @param idx Index.
- * @return Conflict TTL.
- */
- public long conflictTtl(int idx);
-
- /**
- * @param idx Index.
- * @return Conflict expire time.
- */
- public long conflictExpireTime(int idx);
-
- /**
- * @return Flag indicating whether this request contains primary keys.
- */
- public boolean hasPrimary();
-
- /**
- * @param res Response.
- * @return {@code True} if current response was {@code null}.
- */
- public boolean onResponse(GridNearAtomicUpdateResponse res);
-
- /**
- * @return Response.
- */
- @Nullable public GridNearAtomicUpdateResponse response();
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 22b9504..63c073d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -45,7 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequestBase;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
@@ -127,7 +127,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
* @param res Update response.
*/
public void processNearAtomicUpdateResponse(
- GridNearAtomicUpdateRequestBase req,
+ GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res
) {
if (F.size(res.failedKeys()) == req.keys().size())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index cda86ba..50a6114 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -29,7 +29,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
@@ -138,7 +138,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
- commSpi.registerMessage(GridNearAtomicUpdateRequest.class);
+ commSpi.registerMessage(GridNearAtomicMultipleUpdateRequest.class);
commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
commSpi.registerMessage(GridDhtAtomicUpdateRequest.class);
@@ -199,7 +199,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
* @return Count.
*/
private int nearRequestsCount(TestCommunicationSpi commSpi) {
- return commSpi.messageCount(GridNearAtomicUpdateRequest.class) +
+ return commSpi.messageCount(GridNearAtomicMultipleUpdateRequest.class) +
commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
index 024ff2f..43a111a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
/**
* Stopped node when client operations are executing.
@@ -32,7 +32,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testPut() throws Exception {
- bannedMsg.set(GridNearAtomicUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class);
bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
@@ -40,7 +40,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testPutBatch() throws Exception {
- bannedMsg.set(GridNearAtomicUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class);
bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
@@ -48,7 +48,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testPutAsync() throws Exception {
- bannedMsg.set(GridNearAtomicUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class);
bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
@@ -56,7 +56,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testRemove() throws Exception {
- bannedMsg.set(GridNearAtomicUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class);
bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 8182b33..49686fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -62,8 +62,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequestBase;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest;
@@ -233,10 +233,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id());
final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
@@ -278,7 +278,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
map.put(i, i + 1);
// Block messages requests for single node.
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id());
putFut = GridTestUtils.runAsync(new Callable<Object>() {
@@ -366,16 +366,16 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id());
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite2.localNode().id());
+ spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite2.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite2.localNode().id());
- spi.record(GridNearAtomicUpdateRequest.class, GridNearAtomicSingleUpdateRequest.class);
+ spi.record(GridNearAtomicMultipleUpdateRequest.class, GridNearAtomicSingleUpdateRequest.class);
final IgniteCache<Integer, Integer> cache = ignite3.cache(null);
@@ -412,7 +412,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
assertEquals(3, msgs.size());
for (Object msg : msgs)
- assertTrue(((GridNearAtomicUpdateRequestBase)msg).clientRequest());
+ assertTrue(((GridNearAtomicUpdateRequest)msg).clientRequest());
map.put(primaryKey(ignite0.cache(null)), 3);
map.put(primaryKey(ignite1.cache(null)), 4);
@@ -469,10 +469,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id());
final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index c3bd369..0e7755b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -478,7 +478,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
Object origMsg = msg.message();
return delay && (
- (origMsg instanceof GridNearAtomicUpdateRequest) ||
+ (origMsg instanceof GridNearAtomicMultipleUpdateRequest) ||
(origMsg instanceof GridNearAtomicSingleUpdateRequest) ||
(origMsg instanceof GridDhtAtomicUpdateRequest)
);
[2/2] ignite git commit: IGNITE-2523: Renamings.
Posted by vo...@apache.org.
IGNITE-2523: Renamings.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1491c1f4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1491c1f4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1491c1f4
Branch: refs/heads/ignite-2523
Commit: 1491c1f494e47618191ae7e4f79c67a5fdd9a326
Parents: e834080
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Feb 5 10:04:04 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Feb 5 10:04:04 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 4 +-
.../dht/atomic/GridDhtAtomicCache.java | 40 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 +-
.../GridNearAtomicMultipleUpdateRequest.java | 976 +++++++++++++++++
.../GridNearAtomicSingleUpdateRequest.java | 2 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 54 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 1037 +++---------------
.../atomic/GridNearAtomicUpdateRequestBase.java | 199 ----
.../distributed/near/GridNearAtomicCache.java | 4 +-
.../GridCacheAtomicMessageCountSelfTest.java | 6 +-
.../IgniteCacheAtomicStopBusySelfTest.java | 10 +-
...niteCacheClientNodeChangingTopologyTest.java | 22 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 2 +-
14 files changed, 1184 insertions(+), 1184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 25e07b8..88e34c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -69,7 +69,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -363,7 +363,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 40:
- msg = new GridNearAtomicUpdateRequest();
+ msg = new GridNearAtomicMultipleUpdateRequest();
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 57545af..8a8f161 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -45,7 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartition
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -410,7 +410,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
case 40: {
- GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg;
+ GridNearAtomicMultipleUpdateRequest req = (GridNearAtomicMultipleUpdateRequest)msg;
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 40494c1..55db70a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
/** Update reply closure. */
- private CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> updateReplyClos;
+ private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
/** Pending */
private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -192,9 +192,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- updateReplyClos = new CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse>() {
+ updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override public void apply(GridNearAtomicUpdateRequestBase req, GridNearAtomicUpdateResponse res) {
+ @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
assert req.writeSynchronizationMode() != FULL_ASYNC : req;
@@ -250,8 +250,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicMultipleUpdateRequest.class, new CI2<UUID, GridNearAtomicMultipleUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicMultipleUpdateRequest req) {
processNearAtomicUpdateRequest(nodeId, req);
}
});
@@ -1310,8 +1310,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
public void updateAllAsyncInternal(
final UUID nodeId,
- final GridNearAtomicUpdateRequestBase req,
- final CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> completionCb
+ final GridNearAtomicUpdateRequest req,
+ final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
@@ -1335,8 +1335,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
public void updateAllAsyncInternal0(
UUID nodeId,
- GridNearAtomicUpdateRequestBase req,
- CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> completionCb
+ GridNearAtomicUpdateRequest req,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
@@ -1558,12 +1558,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private UpdateBatchResult updateWithBatch(
ClusterNode node,
boolean hasNear,
- GridNearAtomicUpdateRequestBase req,
+ GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -1974,12 +1974,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private UpdateSingleResult updateSingle(
ClusterNode node,
boolean hasNear,
- GridNearAtomicUpdateRequestBase req,
+ GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -2214,8 +2214,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable Collection<KeyCacheObject> rmvKeys,
@Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> completionCb,
- final GridNearAtomicUpdateRequestBase req,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ final GridNearAtomicUpdateRequest req,
final GridNearAtomicUpdateResponse res,
boolean replicate,
UpdateBatchResult batchRes,
@@ -2593,7 +2593,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* will return false.
* @return {@code True} if filter evaluation succeeded.
*/
- private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequestBase req,
+ private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res) {
try {
return ctx.isAllLocked(entry, req.filter());
@@ -2608,7 +2608,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param req Request to remap.
*/
- private void remapToNewPrimary(GridNearAtomicUpdateRequestBase req) {
+ private void remapToNewPrimary(GridNearAtomicUpdateRequest req) {
assert req.writeSynchronizationMode() == FULL_ASYNC : req;
if (log.isDebugEnabled())
@@ -2687,9 +2687,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicUpdateRequestBase updateReq,
+ GridNearAtomicUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes,
- CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean force
) {
if (!force) {
@@ -2720,7 +2720,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Sender node ID.
* @param req Near atomic update request.
*/
- private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequestBase req) {
+ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) {
if (log.isDebugEnabled())
log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 7820832..3a31700 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -77,7 +77,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
/** Completion callback. */
@GridToStringExclude
- private final CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> completionCb;
+ private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
/** Mappings. */
@GridToStringInclude
@@ -87,7 +87,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
/** Update request. */
- private final GridNearAtomicUpdateRequestBase updateReq;
+ private final GridNearAtomicUpdateRequest updateReq;
/** Update response. */
private final GridNearAtomicUpdateResponse updateRes;
@@ -110,9 +110,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
*/
public GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
GridCacheVersion writeVer,
- GridNearAtomicUpdateRequestBase updateReq,
+ GridNearAtomicUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes
) {
this.cctx = cctx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
new file mode 100644
index 0000000..650d350
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
@@ -0,0 +1,987 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+
+/**
+ * Lite DHT cache update request sent from near node to primary node. Carries a batch of keys together with
+ * their values or entry processors; entries are appended one at a time through {@code addUpdateEntry}.
+ */
+public class GridNearAtomicMultipleUpdateRequest extends GridCacheMessage
+    implements GridNearAtomicUpdateRequest, GridCacheDeployable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Message index. */
+    public static final int CACHE_MSG_IDX = nextIndexId();
+
+    /** Target node ID. */
+    @GridDirectTransient
+    private UUID nodeId;
+
+    /** Future version. */
+    private GridCacheVersion futVer;
+
+    /** Fast map flag. */
+    private boolean fastMap;
+
+    /** Update version. Set to non-null if fastMap is {@code true}. */
+    private GridCacheVersion updateVer;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+    private boolean topLocked;
+
+    /** Write synchronization mode. */
+    private CacheWriteSynchronizationMode syncMode;
+
+    /** Update operation. */
+    private GridCacheOperation op;
+
+    /** Keys to update. */
+    @GridToStringInclude
+    @GridDirectCollection(KeyCacheObject.class)
+    private List<KeyCacheObject> keys;
+
+    /** Values to update. */
+    @GridDirectCollection(CacheObject.class)
+    private List<CacheObject> vals;
+
+    /** Entry processors. */
+    @GridDirectTransient
+    private List<EntryProcessor<Object, Object, Object>> entryProcessors;
+
+    /** Entry processors bytes. */
+    @GridDirectCollection(byte[].class)
+    private List<byte[]> entryProcessorsBytes;
+
+    /** Optional arguments for entry processor. */
+    @GridDirectTransient
+    private Object[] invokeArgs;
+
+    /** Entry processor arguments bytes. */
+    private byte[][] invokeArgsBytes;
+
+    /** Conflict versions. */
+    @GridDirectCollection(GridCacheVersion.class)
+    private List<GridCacheVersion> conflictVers;
+
+    /** Conflict TTLs. */
+    private GridLongList conflictTtls;
+
+    /** Conflict expire times. */
+    private GridLongList conflictExpireTimes;
+
+    /** Return value flag. */
+    private boolean retval;
+
+    /** Expiry policy. */
+    @GridDirectTransient
+    private ExpiryPolicy expiryPlc;
+
+    /** Expiry policy bytes. */
+    private byte[] expiryPlcBytes;
+
+    /** Filter. */
+    private CacheEntryPredicate[] filter;
+
+    /** Flag indicating whether request contains primary keys. */
+    private boolean hasPrimary;
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash. */
+    private int taskNameHash;
+
+    /** Skip write-through to a persistent storage. */
+    private boolean skipStore;
+
+    /** Client node request flag. */
+    private boolean clientReq;
+
+    /** Keep binary flag. */
+    private boolean keepBinary;
+
+    /** Response received for this request, set once by {@code onResponse}. */
+    @GridDirectTransient
+    private GridNearAtomicUpdateResponse res;
+
+    /** Maximum possible size of inner collections. */
+    @GridDirectTransient
+    private int initSize;
+
+    /**
+     * Empty constructor required by {@link Externalizable}.
+     */
+    public GridNearAtomicMultipleUpdateRequest() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param cacheId Cache ID.
+     * @param nodeId Node ID.
+     * @param futVer Future version.
+     * @param fastMap Fast map scheme flag.
+     * @param updateVer Update version set if fast map is performed.
+     * @param topVer Topology version.
+     * @param topLocked Topology locked flag.
+     * @param syncMode Synchronization mode.
+     * @param op Cache update operation.
+     * @param retval Return value required flag.
+     * @param expiryPlc Expiry policy.
+     * @param invokeArgs Optional arguments for entry processor.
+     * @param filter Optional filter for atomic check.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param skipStore Skip write-through to a persistent storage.
+     * @param keepBinary Keep binary flag.
+     * @param clientReq Client node request flag.
+     * @param addDepInfo Deployment info flag.
+     * @param maxEntryCnt Maximum entries count.
+     */
+    public GridNearAtomicMultipleUpdateRequest(
+        int cacheId,
+        UUID nodeId,
+        GridCacheVersion futVer,
+        boolean fastMap,
+        @Nullable GridCacheVersion updateVer,
+        @NotNull AffinityTopologyVersion topVer,
+        boolean topLocked,
+        CacheWriteSynchronizationMode syncMode,
+        GridCacheOperation op,
+        boolean retval,
+        @Nullable ExpiryPolicy expiryPlc,
+        @Nullable Object[] invokeArgs,
+        @Nullable CacheEntryPredicate[] filter,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        boolean skipStore,
+        boolean keepBinary,
+        boolean clientReq,
+        boolean addDepInfo,
+        int maxEntryCnt
+    ) {
+        assert futVer != null;
+
+        this.cacheId = cacheId;
+        this.nodeId = nodeId;
+        this.futVer = futVer;
+        this.fastMap = fastMap;
+        this.updateVer = updateVer;
+
+        this.topVer = topVer;
+        this.topLocked = topLocked;
+        this.syncMode = syncMode;
+        this.op = op;
+        this.retval = retval;
+        this.expiryPlc = expiryPlc;
+        this.invokeArgs = invokeArgs;
+        this.filter = filter;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.skipStore = skipStore;
+        this.keepBinary = keepBinary;
+        this.clientReq = clientReq;
+        this.addDepInfo = addDepInfo;
+
+        // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
+        // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
+        // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
+        // than 10, we use it.
+        initSize = Math.min(maxEntryCnt, 10);
+
+        keys = new ArrayList<>(initSize);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int lookupIndex() {
+        return CACHE_MSG_IDX;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID nodeId() {
+        return nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void nodeId(UUID nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public UUID subjectId() {
+        return subjId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int taskNameHash() {
+        return taskNameHash;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion futureVersion() {
+        return futVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean fastMap() {
+        return fastMap;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheVersion updateVersion() {
+        return updateVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean topologyLocked() {
+        return topLocked;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean clientRequest() {
+        return clientReq;
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+        return syncMode;
+    }
+
+    /** {@inheritDoc} */
+    @Override public ExpiryPolicy expiry() {
+        return expiryPlc;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean returnValue() {
+        return retval;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public CacheEntryPredicate[] filter() {
+        return filter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean skipStore() {
+        return skipStore;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return keepBinary;
+    }
+
+    /**
+     * Appends one entry to this request. Per-entry conflict collections are created lazily and, once created,
+     * receive exactly one element per key so that they stay index-aligned with {@link #keys()}.
+     *
+     * @param key Key to add.
+     * @param val Optional update value.
+     * @param conflictTtl Conflict TTL (optional).
+     * @param conflictExpireTime Conflict expire time (optional).
+     * @param conflictVer Conflict version (optional).
+     * @param primary If given key is primary on this mapping.
+     */
+    @SuppressWarnings("unchecked")
+    public void addUpdateEntry(KeyCacheObject key,
+        @Nullable Object val,
+        long conflictTtl,
+        long conflictExpireTime,
+        @Nullable GridCacheVersion conflictVer,
+        boolean primary) {
+        EntryProcessor<Object, Object, Object> entryProcessor = null;
+
+        if (op == TRANSFORM) {
+            assert val instanceof EntryProcessor : val;
+
+            entryProcessor = (EntryProcessor<Object, Object, Object>)val;
+        }
+
+        assert val != null || op == DELETE;
+
+        keys.add(key);
+
+        if (entryProcessor != null) {
+            if (entryProcessors == null)
+                entryProcessors = new ArrayList<>(initSize);
+
+            entryProcessors.add(entryProcessor);
+        }
+        else if (val != null) {
+            assert val instanceof CacheObject : val;
+
+            if (vals == null)
+                vals = new ArrayList<>(initSize);
+
+            vals.add((CacheObject)val);
+        }
+
+        hasPrimary |= primary;
+
+        // In case there is no conflict, do not create the list.
+        if (conflictVer != null) {
+            if (conflictVers == null) {
+                conflictVers = new ArrayList<>(initSize);
+
+                for (int i = 0; i < keys.size() - 1; i++)
+                    conflictVers.add(null);
+            }
+
+            conflictVers.add(conflictVer);
+        }
+        else if (conflictVers != null)
+            conflictVers.add(null);
+
+        // A negative TTL is treated as absent. If the list already exists the entry must still occupy its slot,
+        // otherwise conflictTtl(idx) would return a value that belongs to a different key.
+        if (conflictTtl >= 0) {
+            if (conflictTtls == null) {
+                conflictTtls = new GridLongList(keys.size());
+
+                for (int i = 0; i < keys.size() - 1; i++)
+                    conflictTtls.add(CU.TTL_NOT_CHANGED);
+            }
+
+            conflictTtls.add(conflictTtl);
+        }
+        else if (conflictTtls != null)
+            conflictTtls.add(CU.TTL_NOT_CHANGED);
+
+        // Same alignment rule for conflict expire times.
+        if (conflictExpireTime >= 0) {
+            if (conflictExpireTimes == null) {
+                conflictExpireTimes = new GridLongList(keys.size());
+
+                for (int i = 0; i < keys.size() - 1; i++)
+                    conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
+            }
+
+            conflictExpireTimes.add(conflictExpireTime);
+        }
+        else if (conflictExpireTimes != null)
+            conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<KeyCacheObject> keys() {
+        return keys;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<?> values() {
+        return op == TRANSFORM ? entryProcessors : vals;
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheOperation operation() {
+        return op;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public Object[] invokeArguments() {
+        return invokeArgs;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public CacheObject value(int idx) {
+        assert op == UPDATE : op;
+
+        return vals.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+        assert op == TRANSFORM : op;
+
+        return entryProcessors.get(idx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheObject writeValue(int idx) {
+        if (vals != null)
+            return vals.get(idx);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public List<GridCacheVersion> conflictVersions() {
+        return conflictVers;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+        if (conflictVers != null) {
+            assert idx >= 0 && idx < conflictVers.size();
+
+            return conflictVers.get(idx);
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictTtl(int idx) {
+        if (conflictTtls != null) {
+            assert idx >= 0 && idx < conflictTtls.size();
+
+            return conflictTtls.get(idx);
+        }
+
+        return CU.TTL_NOT_CHANGED;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long conflictExpireTime(int idx) {
+        if (conflictExpireTimes != null) {
+            assert idx >= 0 && idx < conflictExpireTimes.size();
+
+            return conflictExpireTimes.get(idx);
+        }
+
+        return CU.EXPIRE_TIME_CALCULATE;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasPrimary() {
+        return hasPrimary;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+        if (this.res == null) {
+            this.res = res;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override @Nullable public GridNearAtomicUpdateResponse response() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+        super.prepareMarshal(ctx);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        prepareMarshalCacheObjects(keys, cctx);
+
+        if (filter != null) {
+            boolean hasFilter = false;
+
+            for (CacheEntryPredicate p : filter) {
+                if (p != null) {
+                    hasFilter = true;
+
+                    p.prepareMarshal(cctx);
+                }
+            }
+
+            if (!hasFilter)
+                filter = null;
+        }
+
+        if (expiryPlc != null && expiryPlcBytes == null)
+            expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+
+        if (op == TRANSFORM) {
+            // force addition of deployment info for entry processors if P2P is enabled globally.
+            if (!addDepInfo && ctx.deploymentEnabled())
+                addDepInfo = true;
+
+            if (entryProcessorsBytes == null)
+                entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+
+            if (invokeArgsBytes == null)
+                invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+        }
+        else
+            prepareMarshalCacheObjects(vals, cctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+        super.finishUnmarshal(ctx, ldr);
+
+        GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+        finishUnmarshalCacheObjects(keys, cctx, ldr);
+
+        if (op == TRANSFORM) {
+            if (entryProcessors == null)
+                entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+            if (invokeArgs == null)
+                invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+        }
+        else
+            finishUnmarshalCacheObjects(vals, cctx, ldr);
+
+        if (filter != null) {
+            for (CacheEntryPredicate p : filter) {
+                if (p != null)
+                    p.finishUnmarshal(cctx, ldr);
+            }
+        }
+
+        if (expiryPlcBytes != null && expiryPlc == null)
+            expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, ldr);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean addDeploymentInfo() {
+        return addDepInfo;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 3:
+                if (!writer.writeBoolean("clientReq", clientReq))
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeMessage("conflictTtls", conflictTtls))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeBoolean("fastMap", fastMap))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeMessage("futVer", futVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeBoolean("hasPrimary", hasPrimary))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                    return false;
+
+                writer.incrementState();
+
+            case 14:
+                if (!writer.writeBoolean("keepBinary", keepBinary))
+                    return false;
+
+                writer.incrementState();
+
+            case 15:
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 16:
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 17:
+                if (!writer.writeBoolean("retval", retval))
+                    return false;
+
+                writer.incrementState();
+
+            case 18:
+                if (!writer.writeBoolean("skipStore", skipStore))
+                    return false;
+
+                writer.incrementState();
+
+            case 19:
+                if (!writer.writeUuid("subjId", subjId))
+                    return false;
+
+                writer.incrementState();
+
+            case 20:
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                    return false;
+
+                writer.incrementState();
+
+            case 21:
+                if (!writer.writeInt("taskNameHash", taskNameHash))
+                    return false;
+
+                writer.incrementState();
+
+            case 22:
+                if (!writer.writeBoolean("topLocked", topLocked))
+                    return false;
+
+                writer.incrementState();
+
+            case 23:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 24:
+                if (!writer.writeMessage("updateVer", updateVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 25:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 3:
+                clientReq = reader.readBoolean("clientReq");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 4:
+                conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                conflictTtls = reader.readMessage("conflictTtls");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                fastMap = reader.readBoolean("fastMap");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                futVer = reader.readMessage("futVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                hasPrimary = reader.readBoolean("hasPrimary");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 14:
+                keepBinary = reader.readBoolean("keepBinary");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 15:
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 16:
+                byte opOrd;
+
+                opOrd = reader.readByte("op");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                op = GridCacheOperation.fromOrdinal(opOrd);
+
+                reader.incrementState();
+
+            case 17:
+                retval = reader.readBoolean("retval");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 18:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 20:
+                byte syncModeOrd;
+
+                syncModeOrd = reader.readByte("syncMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+                reader.incrementState();
+
+            case 21:
+                taskNameHash = reader.readInt("taskNameHash");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 22:
+                topLocked = reader.readBoolean("topLocked");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 23:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 24:
+                updateVer = reader.readMessage("updateVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 25:
+                vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridNearAtomicMultipleUpdateRequest.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 40;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 26;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridNearAtomicMultipleUpdateRequest.class, this, "filter", Arrays.toString(filter),
+            "parent", super.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 1c30482..1e981af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -57,7 +57,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPD
* Lite DHT cache update request sent from near node to primary node.
*/
public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
- implements GridNearAtomicUpdateRequestBase, GridCacheDeployable {
+ implements GridNearAtomicUpdateRequest, GridCacheDeployable {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1491c1f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index f894551..879f9f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -348,7 +348,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @param req Update request.
* @param res Update response.
*/
- private void updateNear(GridNearAtomicUpdateRequestBase req, GridNearAtomicUpdateResponse res) {
+ private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
assert nearEnabled;
if (res.remapKeys() != null || !req.hasPrimary())
@@ -450,11 +450,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @param nodeId Node ID.
* @param req Request.
*/
- private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequestBase req) {
+ private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicUpdateRequestBase, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequestBase req,
+ new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res);
}
@@ -465,8 +465,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- if (req instanceof GridNearAtomicUpdateRequest)
- cctx.io().send(req.nodeId(), (GridNearAtomicUpdateRequest)req, cctx.ioPolicy());
+ if (req instanceof GridNearAtomicMultipleUpdateRequest)
+ cctx.io().send(req.nodeId(), (GridNearAtomicMultipleUpdateRequest)req, cctx.ioPolicy());
else {
assert req instanceof GridNearAtomicSingleUpdateRequest;
@@ -487,13 +487,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
*
* @param mappings Mappings to send.
*/
- private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+ private void doUpdate(Map<UUID, GridNearAtomicMultipleUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicUpdateRequest locUpdate = null;
+ GridNearAtomicMultipleUpdateRequest locUpdate = null;
// Send messages to remote nodes first, then run local update.
- for (GridNearAtomicUpdateRequest req : mappings.values()) {
+ for (GridNearAtomicMultipleUpdateRequest req : mappings.values()) {
if (locNodeId.equals(req.nodeId())) {
assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
", req=" + req + ']';
@@ -515,8 +515,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (locUpdate != null) {
cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ new CI2<GridNearAtomicMultipleUpdateRequest, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicMultipleUpdateRequest req, GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res);
}
});
@@ -541,7 +541,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
/** Mappings if operations is mapped to more than one node. */
@GridToStringInclude
- private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+ private Map<UUID, GridNearAtomicMultipleUpdateRequest> mappings;
/** */
private int resCnt;
@@ -559,7 +559,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
private Collection<KeyCacheObject> remapKeys;
/** Not null is operation is mapped to single node. */
- private GridNearAtomicUpdateRequestBase singleReq;
+ private GridNearAtomicUpdateRequest singleReq;
/** Operation result. */
private GridCacheReturn opRes;
@@ -578,7 +578,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
GridNearAtomicUpdateResponse res = null;
synchronized (this) {
- GridNearAtomicUpdateRequestBase req;
+ GridNearAtomicUpdateRequest req;
if (singleReq != null)
req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
@@ -611,7 +611,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
*/
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequestBase req;
+ GridNearAtomicUpdateRequest req;
AffinityTopologyVersion remapTopVer = null;
@@ -741,7 +741,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (rcvAll && nearEnabled) {
if (mappings != null) {
- for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ for (GridNearAtomicMultipleUpdateRequest req0 : mappings.values()) {
GridNearAtomicUpdateResponse res0 = req0.response();
assert res0 != null : req0;
@@ -815,7 +815,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @param req Request.
* @param e Error.
*/
- void onSendError(GridNearAtomicUpdateRequestBase req, IgniteCheckedException e) {
+ void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
synchronized (this) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
req.nodeId(),
@@ -843,8 +843,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
}
Exception err = null;
- GridNearAtomicUpdateRequestBase singleReq0 = null;
- Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+ GridNearAtomicUpdateRequest singleReq0 = null;
+ Map<UUID, GridNearAtomicMultipleUpdateRequest> mappings0 = null;
int size = keys.size();
@@ -873,7 +873,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
}
else {
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ Map<UUID, GridNearAtomicMultipleUpdateRequest> pendingMappings = mapUpdate(topNodes,
topVer,
futVer,
updVer,
@@ -885,7 +885,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (syncMode == PRIMARY_SYNC) {
mappings0 = U.newHashMap(pendingMappings.size());
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ for (GridNearAtomicMultipleUpdateRequest req : pendingMappings.values()) {
if (req.hasPrimary())
mappings0.put(req.nodeId(), req);
}
@@ -996,7 +996,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @throws Exception If failed.
*/
@SuppressWarnings("ConstantConditions")
- private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ private Map<UUID, GridNearAtomicMultipleUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
AffinityTopologyVersion topVer,
GridCacheVersion futVer,
@Nullable GridCacheVersion updVer,
@@ -1016,7 +1016,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (conflictRmvVals != null)
conflictRmvValsIt = conflictRmvVals.iterator();
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+ Map<UUID, GridNearAtomicMultipleUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
// Create mappings first, then send messages.
for (Object key : keys) {
@@ -1084,10 +1084,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
UUID nodeId = affNode.id();
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+ GridNearAtomicMultipleUpdateRequest mapped = pendingMappings.get(nodeId);
if (mapped == null) {
- mapped = new GridNearAtomicUpdateRequest(
+ mapped = new GridNearAtomicMultipleUpdateRequest(
cctx.cacheId(),
nodeId,
futVer,
@@ -1128,7 +1128,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @return Request.
* @throws Exception If failed.
*/
- private GridNearAtomicUpdateRequestBase mapSingleUpdate(AffinityTopologyVersion topVer,
+ private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
Collection<ClusterNode> topNodes, GridCacheVersion futVer, @Nullable GridCacheVersion updVer)
throws Exception {
Object key = F.first(keys);
@@ -1226,7 +1226,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
cctx.deploymentEnabled());
}
else {
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ GridNearAtomicMultipleUpdateRequest req = new GridNearAtomicMultipleUpdateRequest(
cctx.cacheId(),
primary.id(),
futVer,