You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/25 13:31:41 UTC
[42/51] [abbrv] ignite git commit: Review.
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
new file mode 100644
index 0000000..60193ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -0,0 +1,989 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
+
+/**
+ * Lite DHT cache update request sent from near node to primary node.
+ */
+public class GridNearAtomicUpdateRequest extends GridCacheMessage
+ implements GridNearAtomicUpdateRequestInterface, GridCacheDeployable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Target node ID. */
+ @GridDirectTransient
+ private UUID nodeId;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Fast map flag. */
+ private boolean fastMap;
+
+ /** Update version. Set to non-null if fastMap is {@code true}. */
+ private GridCacheVersion updateVer;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+ private boolean topLocked;
+
+ /** Write synchronization mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
+ /** Update operation. */
+ private GridCacheOperation op;
+
+ /** Keys to update. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> keys;
+
+ /** Values to update. */
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> vals;
+
+ /** Entry processors. */
+ @GridDirectTransient
+ private List<EntryProcessor<Object, Object, Object>> entryProcessors;
+
+ /** Entry processors bytes. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> entryProcessorsBytes;
+
+ /** Optional arguments for entry processor. */
+ @GridDirectTransient
+ private Object[] invokeArgs;
+
+ /** Entry processor arguments bytes. */
+ private byte[][] invokeArgsBytes;
+
+ /** Conflict versions. */
+ @GridDirectCollection(GridCacheVersion.class)
+ private List<GridCacheVersion> conflictVers;
+
+ /** Conflict TTLs. */
+ private GridLongList conflictTtls;
+
+ /** Conflict expire times. */
+ private GridLongList conflictExpireTimes;
+
+ /** Return value flag. */
+ private boolean retval;
+
+ /** Expiry policy. */
+ @GridDirectTransient
+ private ExpiryPolicy expiryPlc;
+
+ /** Expiry policy bytes. */
+ private byte[] expiryPlcBytes;
+
+ /** Filter. */
+ private CacheEntryPredicate[] filter;
+
+ /** Flag indicating whether request contains primary keys. */
+ private boolean hasPrimary;
+
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name hash. */
+ private int taskNameHash;
+
+ /** Skip write-through to a persistent storage. */
+ private boolean skipStore;
+
+ /** */
+ private boolean clientReq;
+
+ /** Keep binary flag. */
+ private boolean keepBinary;
+
+ /** */
+ @GridDirectTransient
+ private GridNearAtomicUpdateResponseInterface res;
+
+ /** Maximum possible size of inner collections. */
+ @GridDirectTransient
+ private int initSize;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearAtomicUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @param futVer Future version.
+ * @param fastMap Fast map scheme flag.
+ * @param updateVer Update version set if fast map is performed.
+ * @param topVer Topology version.
+ * @param topLocked Topology locked flag.
+ * @param syncMode Synchronization mode.
+ * @param op Cache update operation.
+ * @param retval Return value required flag.
+ * @param expiryPlc Expiry policy.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param filter Optional filter for atomic check.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param skipStore Skip write-through to a persistent storage.
+ * @param keepBinary Keep binary flag.
+ * @param clientReq Client node request flag.
+ * @param addDepInfo Deployment info flag.
+ * @param maxEntryCnt Maximum entries count.
+ */
+ public GridNearAtomicUpdateRequest(
+ int cacheId,
+ UUID nodeId,
+ GridCacheVersion futVer,
+ boolean fastMap,
+ @Nullable GridCacheVersion updateVer,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean topLocked,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ boolean retval,
+ @Nullable ExpiryPolicy expiryPlc,
+ @Nullable Object[] invokeArgs,
+ @Nullable CacheEntryPredicate[] filter,
+ @Nullable UUID subjId,
+ int taskNameHash,
+ boolean skipStore,
+ boolean keepBinary,
+ boolean clientReq,
+ boolean addDepInfo,
+ int maxEntryCnt
+ ) {
+ assert futVer != null;
+
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futVer = futVer;
+ this.fastMap = fastMap;
+ this.updateVer = updateVer;
+
+ this.topVer = topVer;
+ this.topLocked = topLocked;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.retval = retval;
+ this.expiryPlc = expiryPlc;
+ this.invokeArgs = invokeArgs;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
+ this.clientReq = clientReq;
+ this.addDepInfo = addDepInfo;
+
+ // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries
+ // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys
+ // participate in request. As such, we know upper bound of all collections in request. If this bound is lower
+ // than 10, we use it.
+ initSize = Math.min(maxEntryCnt, 10);
+
+ keys = new ArrayList<>(initSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID subjectId() {
+ return subjId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean fastMap() {
+ return fastMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion updateVersion() {
+ return updateVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean topologyLocked() {
+ return topLocked;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean clientRequest() {
+ return clientReq;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncMode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExpiryPolicy expiry() {
+ return expiryPlc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean returnValue() {
+ return retval;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable public CacheEntryPredicate[] filter() {
+ return filter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean skipStore() {
+ return skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean keepBinary() {
+ return keepBinary;
+ }
+
+ /**
+ * @param key Key to add.
+ * @param val Optional update value.
+ * @param conflictTtl Conflict TTL (optional).
+ * @param conflictExpireTime Conflict expire time (optional).
+ * @param conflictVer Conflict version (optional).
+ * @param primary If given key is primary on this mapping.
+ */
+ @SuppressWarnings("unchecked")
+ public void addUpdateEntry(KeyCacheObject key,
+ @Nullable Object val,
+ long conflictTtl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer,
+ boolean primary) {
+ EntryProcessor<Object, Object, Object> entryProcessor = null;
+
+ if (op == TRANSFORM) {
+ assert val instanceof EntryProcessor : val;
+
+ entryProcessor = (EntryProcessor<Object, Object, Object>)val;
+ }
+
+ assert val != null || op == DELETE;
+
+ keys.add(key);
+
+ if (entryProcessor != null) {
+ if (entryProcessors == null)
+ entryProcessors = new ArrayList<>(initSize);
+
+ entryProcessors.add(entryProcessor);
+ }
+ else if (val != null) {
+ assert val instanceof CacheObject : val;
+
+ if (vals == null)
+ vals = new ArrayList<>(initSize);
+
+ vals.add((CacheObject)val);
+ }
+
+ hasPrimary |= primary;
+
+ // In case there is no conflict, do not create the list.
+ if (conflictVer != null) {
+ if (conflictVers == null) {
+ conflictVers = new ArrayList<>(initSize);
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictVers.add(null);
+ }
+
+ conflictVers.add(conflictVer);
+ }
+ else if (conflictVers != null)
+ conflictVers.add(null);
+
+ if (conflictTtl >= 0) {
+ if (conflictTtls == null) {
+ conflictTtls = new GridLongList(keys.size());
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictTtls.add(CU.TTL_NOT_CHANGED);
+ }
+
+ conflictTtls.add(conflictTtl);
+ }
+
+ if (conflictExpireTime >= 0) {
+ if (conflictExpireTimes == null) {
+ conflictExpireTimes = new GridLongList(keys.size());
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
+ }
+
+ conflictExpireTimes.add(conflictExpireTime);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<KeyCacheObject> keys() {
+ return keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<?> values() {
+ return op == TRANSFORM ? entryProcessors : vals;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheOperation operation() {
+ return op;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public Object[] invokeArguments() {
+ return invokeArgs;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public CacheObject value(int idx) {
+ assert op == UPDATE : op;
+
+ return vals.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+ assert op == TRANSFORM : op;
+
+ return entryProcessors.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheObject writeValue(int idx) {
+ if (vals != null)
+ return vals.get(idx);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public List<GridCacheVersion> conflictVersions() {
+ return conflictVers;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+ if (conflictVers != null) {
+ assert idx >= 0 && idx < conflictVers.size();
+
+ return conflictVers.get(idx);
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long conflictTtl(int idx) {
+ if (conflictTtls != null) {
+ assert idx >= 0 && idx < conflictTtls.size();
+
+ return conflictTtls.get(idx);
+ }
+
+ return CU.TTL_NOT_CHANGED;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long conflictExpireTime(int idx) {
+ if (conflictExpireTimes != null) {
+ assert idx >= 0 && idx < conflictExpireTimes.size();
+
+ return conflictExpireTimes.get(idx);
+ }
+
+ return CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasPrimary() {
+ return hasPrimary;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onResponse(GridNearAtomicUpdateResponseInterface res) {
+ if (this.res == null) {
+ this.res = res;
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public GridNearAtomicUpdateResponseInterface response() {
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(keys, cctx);
+
+ if (filter != null) {
+ boolean hasFilter = false;
+
+ for (CacheEntryPredicate p : filter) {
+ if (p != null) {
+ hasFilter = true;
+
+ p.prepareMarshal(cctx);
+ }
+ }
+
+ if (!hasFilter)
+ filter = null;
+ }
+
+ if (expiryPlc != null && expiryPlcBytes == null)
+ expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
+
+ if (op == TRANSFORM) {
+ // force addition of deployment info for entry processors if P2P is enabled globally.
+ if (!addDepInfo && ctx.deploymentEnabled())
+ addDepInfo = true;
+
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+ }
+ else
+ prepareMarshalCacheObjects(vals, cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(keys, cctx, ldr);
+
+ if (op == TRANSFORM) {
+ if (entryProcessors == null)
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+ if (invokeArgs == null)
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+ }
+ else
+ finishUnmarshalCacheObjects(vals, cctx, ldr);
+
+ if (filter != null) {
+ for (CacheEntryPredicate p : filter) {
+ if (p != null)
+ p.finishUnmarshal(cctx, ldr);
+ }
+ }
+
+ if (expiryPlcBytes != null && expiryPlc == null)
+ expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeBoolean("clientReq", clientReq))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("conflictTtls", conflictTtls))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeBoolean("fastMap", fastMap))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeBoolean("hasPrimary", hasPrimary))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
+ if (!writer.writeBoolean("keepBinary", keepBinary))
+ return false;
+
+ writer.incrementState();
+
+ case 15:
+ if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 16:
+ if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 17:
+ if (!writer.writeBoolean("retval", retval))
+ return false;
+
+ writer.incrementState();
+
+ case 18:
+ if (!writer.writeBoolean("skipStore", skipStore))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeUuid("subjId", subjId))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 21:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 22:
+ if (!writer.writeBoolean("topLocked", topLocked))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
+ case 25:
+ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ clientReq = reader.readBoolean("clientReq");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ conflictTtls = reader.readMessage("conflictTtls");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ fastMap = reader.readBoolean("fastMap");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ hasPrimary = reader.readBoolean("hasPrimary");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
+ keepBinary = reader.readBoolean("keepBinary");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
+ keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 16:
+ byte opOrd;
+
+ opOrd = reader.readByte("op");
+
+ if (!reader.isLastRead())
+ return false;
+
+ op = GridCacheOperation.fromOrdinal(opOrd);
+
+ reader.incrementState();
+
+ case 17:
+ retval = reader.readBoolean("retval");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ skipStore = reader.readBoolean("skipStore");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 19:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
+
+ if (!reader.isLastRead())
+ return false;
+
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+ reader.incrementState();
+
+ case 21:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 22:
+ topLocked = reader.readBoolean("topLocked");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 23:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 24:
+ updateVer = reader.readMessage("updateVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 25:
+ vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicUpdateRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cleanup(boolean clearKeys) {
+ vals = null;
+ entryProcessors = null;
+ entryProcessorsBytes = null;
+ invokeArgs = null;
+ invokeArgsBytes = null;
+
+ if (clearKeys)
+ keys = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 40;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 26;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicUpdateRequest.class, this, "filter", Arrays.toString(filter),
+ "parent", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
new file mode 100644
index 0000000..2295854
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * DHT atomic cache near update response.
+ */
+public class GridNearAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponseInterface {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Node ID this reply should be sent to. */
+ @GridDirectTransient
+ private UUID nodeId;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Update error. */
+ @GridDirectTransient
+ private volatile IgniteCheckedException err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
+ /** Return value. */
+ @GridToStringInclude
+ private GridCacheReturn ret;
+
+ /** Failed keys. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private volatile Collection<KeyCacheObject> failedKeys;
+
+ /** Keys that should be remapped. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> remapKeys;
+
+ /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
+ @GridDirectCollection(int.class)
+ private List<Integer> nearValsIdxs;
+
+ /** Indexes of keys for which update was skipped (used if originating node has near cache). */
+ @GridDirectCollection(int.class)
+ private List<Integer> nearSkipIdxs;
+
+ /** Values generated on primary node which should be put to originating node's near cache. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> nearVals;
+
+ /** Version generated on primary node to be used for originating node's near cache update. */
+ private GridCacheVersion nearVer;
+
+ /** Near TTLs. */
+ private GridLongList nearTtls;
+
+ /** Near expire times. */
+ private GridLongList nearExpireTimes;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearAtomicUpdateResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID this reply should be sent to.
+ * @param futVer Future version.
+ * @param addDepInfo Deployment info flag.
+ */
+ public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
+ assert futVer != null;
+
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futVer = futVer;
+ this.addDepInfo = addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /**
+ * @return Node ID this response should be sent to.
+ */
+ @Override public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ @Override public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Future version.
+ */
+ @Override public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * Sets update error.
+ *
+ * @param err Error.
+ */
+ @Override public void error(IgniteCheckedException err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
+ return err;
+ }
+
+ /**
+ * @return Collection of failed keys.
+ */
+ @Override public Collection<KeyCacheObject> failedKeys() {
+ return failedKeys;
+ }
+
+ /**
+ * @return Return value.
+ */
+ @Override public GridCacheReturn returnValue() {
+ return ret;
+ }
+
+ /**
+ * @param ret Return value.
+ */
+ @Override @SuppressWarnings("unchecked")
+ public void returnValue(GridCacheReturn ret) {
+ this.ret = ret;
+ }
+
+ /**
+ * @param remapKeys Remap keys.
+ */
+ @Override public void remapKeys(List<KeyCacheObject> remapKeys) {
+ this.remapKeys = remapKeys;
+ }
+
+ /**
+ * @return Remap keys.
+ */
+ @Override public Collection<KeyCacheObject> remapKeys() {
+ return remapKeys;
+ }
+
+ /**
+ * Adds value to be put in near cache on originating node.
+ *
+ * @param keyIdx Key index.
+ * @param val Value.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ @Override public void addNearValue(int keyIdx,
+ @Nullable CacheObject val,
+ long ttl,
+ long expireTime) {
+ if (nearValsIdxs == null) {
+ nearValsIdxs = new ArrayList<>();
+ nearVals = new ArrayList<>();
+ }
+
+ addNearTtl(keyIdx, ttl, expireTime);
+
+ nearValsIdxs.add(keyIdx);
+ nearVals.add(val);
+ }
+
+ /**
+ * @param keyIdx Key index.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ @Override @SuppressWarnings("ForLoopReplaceableByForEach")
+ public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+ if (ttl >= 0) {
+ if (nearTtls == null) {
+ nearTtls = new GridLongList(16);
+
+ for (int i = 0; i < keyIdx; i++)
+ nearTtls.add(-1L);
+ }
+ }
+
+ if (nearTtls != null)
+ nearTtls.add(ttl);
+
+ if (expireTime >= 0) {
+ if (nearExpireTimes == null) {
+ nearExpireTimes = new GridLongList(16);
+
+ for (int i = 0; i < keyIdx; i++)
+ nearExpireTimes.add(-1);
+ }
+ }
+
+ if (nearExpireTimes != null)
+ nearExpireTimes.add(expireTime);
+ }
+
+ /**
+ * @param idx Index.
+ * @return Expire time for near cache update.
+ */
+ @Override public long nearExpireTime(int idx) {
+ if (nearExpireTimes != null) {
+ assert idx >= 0 && idx < nearExpireTimes.size();
+
+ return nearExpireTimes.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @param idx Index.
+ * @return TTL for near cache update.
+ */
+ @Override public long nearTtl(int idx) {
+ if (nearTtls != null) {
+ assert idx >= 0 && idx < nearTtls.size();
+
+ return nearTtls.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @param nearVer Version generated on primary node to be used for originating node's near cache update.
+ */
+ @Override public void nearVersion(GridCacheVersion nearVer) {
+ this.nearVer = nearVer;
+ }
+
+ /**
+ * @return Version generated on primary node to be used for originating node's near cache update.
+ */
+ @Override public GridCacheVersion nearVersion() {
+ return nearVer;
+ }
+
+ /**
+ * @param keyIdx Index of key for which update was skipped
+ */
+ @Override public void addSkippedIndex(int keyIdx) {
+ if (nearSkipIdxs == null)
+ nearSkipIdxs = new ArrayList<>();
+
+ nearSkipIdxs.add(keyIdx);
+
+ addNearTtl(keyIdx, -1L, -1L);
+ }
+
+ /**
+ * @return Indexes of keys for which update was skipped
+ */
+ @Override @Nullable public List<Integer> skippedIndexes() {
+ return nearSkipIdxs;
+ }
+
+ /**
+ * @return Indexes of keys for which values were generated on primary node.
+ */
+ @Override @Nullable public List<Integer> nearValuesIndexes() {
+ return nearValsIdxs;
+ }
+
+ /**
+ * @param idx Index.
+ * @return Value generated on primary node which should be put to originating node's near cache.
+ */
+ @Override @Nullable public CacheObject nearValue(int idx) {
+ return nearVals.get(idx);
+ }
+
+ /**
+ * Adds key to collection of failed keys.
+ *
+ * @param key Key to add.
+ * @param e Error cause.
+ */
+ @Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ConcurrentLinkedQueue<>();
+
+ failedKeys.add(key);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+ * Adds keys to collection of failed keys.
+ *
+ * @param keys Key to add.
+ * @param e Error cause.
+ */
+ @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+ if (keys != null) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>(keys.size());
+
+ failedKeys.addAll(keys);
+ }
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+ * Adds keys to collection of failed keys.
+ *
+ * @param keys Key to add.
+ * @param e Error cause.
+ * @param ctx Context.
+ */
+ @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e,
+ GridCacheContext ctx) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>(keys.size());
+
+ failedKeys.addAll(keys);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /** {@inheritDoc}
+ * @param ctx*/
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (err != null && errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(failedKeys, cctx);
+
+ prepareMarshalCacheObjects(remapKeys, cctx);
+
+ prepareMarshalCacheObjects(nearVals, cctx);
+
+ if (ret != null)
+ ret.prepareMarshal(cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+
+ if (ret != null)
+ ret.finishUnmarshal(cctx, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeMessage("nearTtls", nearTtls))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("nearVer", nearVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeMessage("ret", ret))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ nearExpireTimes = reader.readMessage("nearExpireTimes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ nearTtls = reader.readMessage("nearTtls");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ nearVer = reader.readMessage("nearVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ ret = reader.readMessage("ret");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 41;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 14;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicUpdateResponse.class, this, "parent");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index ea6f889..5d5344e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -29,7 +29,6 @@ import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.Cache;
import javax.cache.CacheException;
import junit.framework.AssertionFailedError;
import org.apache.ignite.Ignite;
@@ -51,8 +50,7 @@ import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
@@ -818,7 +816,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
if (ccfg.getAtomicityMode() == ATOMIC)
checkOperationInProgressFails(client, ccfg, F.<Class<?>>asList(
- GridNearAtomicMultipleUpdateResponse.class, GridNearAtomicSingleUpdateResponse.class),
+ GridNearAtomicUpdateResponse.class, GridNearAtomicSingleUpdateResponse.class),
putOp);
else
checkOperationInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, putOp);
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
index 4c011db..22d56b0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java
@@ -26,7 +26,7 @@ import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.configuration.CollectionConfiguration;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponseInterface;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.testframework.GridTestUtils;
@@ -316,7 +316,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA
BlockTpcCommunicationSpi commSpi = commSpi(srv);
if (colCfg.getAtomicityMode() == ATOMIC)
- commSpi.blockMessage(GridNearAtomicMultipleUpdateResponse.class);
+ commSpi.blockMessage(GridNearAtomicUpdateResponse.class);
else
commSpi.blockMessage(GridNearTxPrepareResponse.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index b56a8a6..358cc58 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -27,10 +27,10 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
@@ -139,9 +139,9 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi();
- commSpi.registerMessage(GridNearAtomicMultipleUpdateRequest.class);
+ commSpi.registerMessage(GridNearAtomicUpdateRequest.class);
commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class);
- commSpi.registerMessage(GridDhtAtomicMultipleUpdateRequest.class);
+ commSpi.registerMessage(GridDhtAtomicUpdateRequest.class);
commSpi.registerMessage(GridDhtAtomicSingleUpdateRequest.class);
int putCnt = 15;
@@ -201,7 +201,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
* @return Count.
*/
private int nearRequestsCount(TestCommunicationSpi commSpi) {
- return commSpi.messageCount(GridNearAtomicMultipleUpdateRequest.class) +
+ return commSpi.messageCount(GridNearAtomicUpdateRequest.class) +
commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class);
}
@@ -212,7 +212,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
* @return Count.
*/
private int dhtRequestsCount(TestCommunicationSpi commSpi) {
- return commSpi.messageCount(GridDhtAtomicMultipleUpdateRequest.class) +
+ return commSpi.messageCount(GridDhtAtomicUpdateRequest.class) +
commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
index 43a111a..024ff2f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
/**
* Stopped node when client operations are executing.
@@ -32,7 +32,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testPut() throws Exception {
- bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicUpdateRequest.class);
bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
@@ -40,7 +40,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testPutBatch() throws Exception {
- bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicUpdateRequest.class);
bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
@@ -48,7 +48,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testPutAsync() throws Exception {
- bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicUpdateRequest.class);
bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
@@ -56,7 +56,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu
/** {@inheritDoc} */
@Override public void testRemove() throws Exception {
- bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class);
+ bannedMsg.set(GridNearAtomicUpdateRequest.class);
bannedMsg.set(GridNearAtomicSingleUpdateRequest.class);
super.testPut();
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
index 4733e19..8ecef5d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java
@@ -62,7 +62,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequestInterface;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
@@ -233,10 +233,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id());
final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
@@ -278,7 +278,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
map.put(i, i + 1);
// Block messages requests for single node.
- spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id());
putFut = GridTestUtils.runAsync(new Callable<Object>() {
@@ -366,16 +366,16 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id());
- spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite2.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite2.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite2.localNode().id());
- spi.record(GridNearAtomicMultipleUpdateRequest.class, GridNearAtomicSingleUpdateRequest.class);
+ spi.record(GridNearAtomicUpdateRequest.class, GridNearAtomicSingleUpdateRequest.class);
final IgniteCache<Integer, Integer> cache = ignite3.cache(null);
@@ -469,10 +469,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac
TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi();
// Block messages requests for both nodes.
- spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id());
- spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id());
+ spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id());
spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id());
final IgniteCache<Integer, Integer> cache = ignite2.cache(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/28c20c30/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
index 0786b49..b6c89c7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java
@@ -478,9 +478,9 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA
Object origMsg = msg.message();
return delay && (
- (origMsg instanceof GridNearAtomicMultipleUpdateRequest) ||
+ (origMsg instanceof GridNearAtomicUpdateRequest) ||
(origMsg instanceof GridNearAtomicSingleUpdateRequest) ||
- (origMsg instanceof GridDhtAtomicMultipleUpdateRequest) ||
+ (origMsg instanceof GridDhtAtomicUpdateRequest) ||
(origMsg instanceof GridDhtAtomicSingleUpdateRequest)
);
}