You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/25 13:31:47 UTC
[48/51] [abbrv] ignite git commit: Review.
Review.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1829f441
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1829f441
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1829f441
Branch: refs/heads/ignite-2523
Commit: 1829f44188582be33a1d0d74538cc8b1bc265531
Parents: f83d909
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Feb 25 15:20:12 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Feb 25 15:20:12 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 16 +-
.../processors/cache/GridCacheIoManager.java | 16 +-
.../dht/atomic/GridDhtAtomicCache.java | 20 +-
.../GridDhtAtomicMultipleUpdateRequest.java | 1004 ++++++++++++++++++
.../GridDhtAtomicMultipleUpdateResponse.java | 279 +++++
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 4 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 1004 ------------------
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 279 -----
.../GridNearAtomicMultipleUpdateRequest.java | 989 +++++++++++++++++
.../GridNearAtomicMultipleUpdateResponse.java | 575 ++++++++++
.../dht/atomic/GridNearAtomicUpdateFuture.java | 30 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 989 -----------------
.../atomic/GridNearAtomicUpdateResponse.java | 575 ----------
.../IgniteClientReconnectCacheTest.java | 4 +-
.../IgniteClientReconnectCollectionsTest.java | 4 +-
.../GridCacheAtomicMessageCountSelfTest.java | 12 +-
.../IgniteCacheAtomicStopBusySelfTest.java | 10 +-
...niteCacheClientNodeChangingTopologyTest.java | 20 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 4 +-
19 files changed, 2917 insertions(+), 2917 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index e44ad00..2366104 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -66,13 +66,13 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
@@ -356,22 +356,22 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 38:
- msg = new GridDhtAtomicUpdateRequest();
+ msg = new GridDhtAtomicMultipleUpdateRequest();
break;
case 39:
- msg = new GridDhtAtomicUpdateResponse();
+ msg = new GridDhtAtomicMultipleUpdateResponse();
break;
case 40:
- msg = new GridNearAtomicUpdateRequest();
+ msg = new GridNearAtomicMultipleUpdateRequest();
break;
case 41:
- msg = new GridNearAtomicUpdateResponse();
+ msg = new GridNearAtomicMultipleUpdateResponse();
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 48a0b9f..2ec8dd9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -34,12 +34,12 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockRe
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -398,9 +398,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
case 38: {
- GridDhtAtomicUpdateRequest req = (GridDhtAtomicUpdateRequest)msg;
+ GridDhtAtomicMultipleUpdateRequest req = (GridDhtAtomicMultipleUpdateRequest)msg;
- GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
+ GridDhtAtomicMultipleUpdateResponse res = new GridDhtAtomicMultipleUpdateResponse(
ctx.cacheId(),
req.futureVersion(),
ctx.deploymentEnabled());
@@ -413,9 +413,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
case 40: {
- GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg;
+ GridNearAtomicMultipleUpdateRequest req = (GridNearAtomicMultipleUpdateRequest)msg;
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+ GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(
ctx.cacheId(),
nodeId,
req.futureVersion(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 4c07bf2..6965a9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -252,8 +252,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicMultipleUpdateRequest.class, new CI2<UUID, GridNearAtomicMultipleUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicMultipleUpdateRequest req) {
processNearAtomicUpdateRequest(nodeId, req);
}
});
@@ -264,8 +264,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicMultipleUpdateResponse.class, new CI2<UUID, GridNearAtomicMultipleUpdateResponse>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
processNearAtomicUpdateResponse(nodeId, res);
}
});
@@ -276,8 +276,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
+ ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicMultipleUpdateRequest.class, new CI2<UUID, GridDhtAtomicMultipleUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridDhtAtomicMultipleUpdateRequest req) {
processDhtAtomicUpdateRequest(nodeId, req);
}
});
@@ -288,8 +288,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() {
- @Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) {
+ ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicMultipleUpdateResponse.class, new CI2<UUID, GridDhtAtomicMultipleUpdateResponse>() {
+ @Override public void apply(UUID nodeId, GridDhtAtomicMultipleUpdateResponse res) {
processDhtAtomicUpdateResponse(nodeId, res);
}
});
@@ -1365,7 +1365,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res = new GridNearAtomicSingleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
else
- res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+ res = new GridNearAtomicMultipleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
List<KeyCacheObject> keys = req.keys();
@@ -2831,7 +2831,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res = new GridDhtAtomicSingleUpdateResponse(ctx.cacheId(), req.futureVersion(),
ctx.deploymentEnabled());
else
- res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(),
+ res = new GridDhtAtomicMultipleUpdateResponse(ctx.cacheId(), req.futureVersion(),
ctx.deploymentEnabled());
Boolean replicate = ctx.isDrEnabled();
http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicMultipleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicMultipleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicMultipleUpdateRequest.java
new file mode 100644
index 0000000..de89b47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicMultipleUpdateRequest.java
@@ -0,0 +1,1004 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Lite dht cache backup update request.
+ */
+public class GridDhtAtomicMultipleUpdateRequest extends GridCacheMessage implements GridCacheDeployable, GridDhtAtomicUpdateRequestInterface {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Node ID. */
+ private UUID nodeId;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Write version. */
+ private GridCacheVersion writeVer;
+
+ /** Topology version. */
+ private AffinityTopologyVersion topVer;
+
+ /** Keys to update. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> keys;
+
+ /** Values to update. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> vals;
+
+ /** Previous values. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> prevVals;
+
+ /** Conflict versions. */
+ @GridDirectCollection(GridCacheVersion.class)
+ private List<GridCacheVersion> conflictVers;
+
+ /** TTLs. */
+ private GridLongList ttls;
+
+ /** Conflict expire time. */
+ private GridLongList conflictExpireTimes;
+
+ /** Near TTLs. */
+ private GridLongList nearTtls;
+
+ /** Near expire times. */
+ private GridLongList nearExpireTimes;
+
+ /** Write synchronization mode. */
+ private CacheWriteSynchronizationMode syncMode;
+
+ /** Near cache keys to update. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> nearKeys;
+
+ /** Values to update. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> nearVals;
+
+ /** Force transform backups flag. */
+ private boolean forceTransformBackups;
+
+ /** Entry processors. */
+ @GridDirectTransient
+ private List<EntryProcessor<Object, Object, Object>> entryProcessors;
+
+ /** Entry processors bytes. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> entryProcessorsBytes;
+
+ /** Near entry processors. */
+ @GridDirectTransient
+ private List<EntryProcessor<Object, Object, Object>> nearEntryProcessors;
+
+ /** Near entry processors bytes. */
+ @GridDirectCollection(byte[].class)
+ private List<byte[]> nearEntryProcessorsBytes;
+
+ /** Optional arguments for entry processor. */
+ @GridDirectTransient
+ private Object[] invokeArgs;
+
+ /** Entry processor arguments bytes. */
+ private byte[][] invokeArgsBytes;
+
+ /** Subject ID. */
+ private UUID subjId;
+
+ /** Task name hash. */
+ private int taskNameHash;
+
+ /** Partition. */
+ private GridLongList updateCntrs;
+
+ /** On response flag. Access should be synced on future. */
+ @GridDirectTransient
+ private boolean onRes;
+
+ /** */
+ @GridDirectTransient
+ private List<Integer> partIds;
+
+ /** */
+ @GridDirectTransient
+ private List<CacheObject> locPrevVals;
+
+ /** Keep binary flag. */
+ private boolean keepBinary;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridDhtAtomicMultipleUpdateRequest() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID.
+ * @param futVer Future version.
+ * @param writeVer Write version for cache values.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param syncMode Cache write synchronization mode.
+ * @param topVer Topology version.
+ * @param forceTransformBackups Force transform backups flag.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param addDepInfo Deployment info.
+ */
+ public GridDhtAtomicMultipleUpdateRequest(
+ int cacheId,
+ UUID nodeId,
+ GridCacheVersion futVer,
+ GridCacheVersion writeVer,
+ CacheWriteSynchronizationMode syncMode,
+ @NotNull AffinityTopologyVersion topVer,
+ boolean forceTransformBackups,
+ UUID subjId,
+ int taskNameHash,
+ Object[] invokeArgs,
+ boolean addDepInfo,
+ boolean keepBinary
+ ) {
+ assert invokeArgs == null || forceTransformBackups;
+
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futVer = futVer;
+ this.writeVer = writeVer;
+ this.syncMode = syncMode;
+ this.topVer = topVer;
+ this.forceTransformBackups = forceTransformBackups;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.invokeArgs = invokeArgs;
+ this.addDepInfo = addDepInfo;
+ this.keepBinary = keepBinary;
+
+ keys = new ArrayList<>();
+ partIds = new ArrayList<>();
+
+ if (forceTransformBackups) {
+ entryProcessors = new ArrayList<>();
+ entryProcessorsBytes = new ArrayList<>();
+ }
+ else
+ vals = new ArrayList<>();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean forceTransformBackups() {
+ return forceTransformBackups;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addWriteValue(KeyCacheObject key,
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ int partId,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateCntr,
+ boolean storeLocPrevVal) {
+ keys.add(key);
+
+ partIds.add(partId);
+
+ if (storeLocPrevVal) {
+ if (locPrevVals == null)
+ locPrevVals = new ArrayList<>();
+
+ locPrevVals.add(prevVal);
+ }
+
+ if (forceTransformBackups) {
+ assert entryProcessor != null;
+
+ entryProcessors.add(entryProcessor);
+ }
+ else
+ vals.add(val);
+
+ if (addPrevVal) {
+ if (prevVals == null)
+ prevVals = new ArrayList<>();
+
+ prevVals.add(prevVal);
+ }
+
+ if (updateCntr != null) {
+ if (updateCntrs == null)
+ updateCntrs = new GridLongList();
+
+ updateCntrs.add(updateCntr);
+ }
+
+ // In case there is no conflict, do not create the list.
+ if (conflictVer != null) {
+ if (conflictVers == null) {
+ conflictVers = new ArrayList<>();
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictVers.add(null);
+ }
+
+ conflictVers.add(conflictVer);
+ }
+ else if (conflictVers != null)
+ conflictVers.add(null);
+
+ if (ttl >= 0) {
+ if (ttls == null) {
+ ttls = new GridLongList(keys.size());
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ ttls.add(CU.TTL_NOT_CHANGED);
+ }
+ }
+
+ if (ttls != null)
+ ttls.add(ttl);
+
+ if (conflictExpireTime >= 0) {
+ if (conflictExpireTimes == null) {
+ conflictExpireTimes = new GridLongList(keys.size());
+
+ for (int i = 0; i < keys.size() - 1; i++)
+ conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
+ }
+ }
+
+ if (conflictExpireTimes != null)
+ conflictExpireTimes.add(conflictExpireTime);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addNearWriteValue(KeyCacheObject key,
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long expireTime) {
+ if (nearKeys == null) {
+ nearKeys = new ArrayList<>();
+
+ if (forceTransformBackups) {
+ nearEntryProcessors = new ArrayList<>();
+ nearEntryProcessorsBytes = new ArrayList<>();
+ }
+ else
+ nearVals = new ArrayList<>();
+ }
+
+ nearKeys.add(key);
+
+ if (forceTransformBackups) {
+ assert entryProcessor != null;
+
+ nearEntryProcessors.add(entryProcessor);
+ }
+ else
+ nearVals.add(val);
+
+ if (ttl >= 0) {
+ if (nearTtls == null) {
+ nearTtls = new GridLongList(nearKeys.size());
+
+ for (int i = 0; i < nearKeys.size() - 1; i++)
+ nearTtls.add(CU.TTL_NOT_CHANGED);
+ }
+ }
+
+ if (nearTtls != null)
+ nearTtls.add(ttl);
+
+ if (expireTime >= 0) {
+ if (nearExpireTimes == null) {
+ nearExpireTimes = new GridLongList(nearKeys.size());
+
+ for (int i = 0; i < nearKeys.size() - 1; i++)
+ nearExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
+ }
+ }
+
+ if (nearExpireTimes != null)
+ nearExpireTimes.add(expireTime);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID nodeId() {
+ return nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID subjectId() {
+ return subjId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return keys.size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int nearSize() {
+ return nearKeys != null ? nearKeys.size() : 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion writeVersion() {
+ return writeVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncMode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<KeyCacheObject> keys() {
+ return keys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject key(int idx) {
+ return keys.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partitionId(int idx) {
+ return partIds.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long updateCounter(int updCntr) {
+ if (updateCntrs != null && updCntr < updateCntrs.size())
+ return updateCntrs.get(updCntr);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject nearKey(int idx) {
+ return nearKeys.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean keepBinary() {
+ return keepBinary;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject value(int idx) {
+ if (vals != null)
+ return vals.get(idx);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject previousValue(int idx) {
+ if (prevVals != null)
+ return prevVals.get(idx);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject localPreviousValue(int idx) {
+ assert locPrevVals != null;
+
+ return locPrevVals.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
+ return entryProcessors == null ? null : entryProcessors.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public CacheObject nearValue(int idx) {
+ if (nearVals != null)
+ return nearVals.get(idx);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
+ return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
+ if (conflictVers != null) {
+ assert idx >= 0 && idx < conflictVers.size();
+
+ return conflictVers.get(idx);
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long ttl(int idx) {
+ if (ttls != null) {
+ assert idx >= 0 && idx < ttls.size();
+
+ return ttls.get(idx);
+ }
+
+ return CU.TTL_NOT_CHANGED;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long nearTtl(int idx) {
+ if (nearTtls != null) {
+ assert idx >= 0 && idx < nearTtls.size();
+
+ return nearTtls.get(idx);
+ }
+
+ return CU.TTL_NOT_CHANGED;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long conflictExpireTime(int idx) {
+ if (conflictExpireTimes != null) {
+ assert idx >= 0 && idx < conflictExpireTimes.size();
+
+ return conflictExpireTimes.get(idx);
+ }
+
+ return CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long nearExpireTime(int idx) {
+ if (nearExpireTimes != null) {
+ assert idx >= 0 && idx < nearExpireTimes.size();
+
+ return nearExpireTimes.get(idx);
+ }
+
+ return CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onResponse() {
+ return !onRes && (onRes = true);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public Object[] invokeArguments() {
+ return invokeArgs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(keys, cctx);
+
+ prepareMarshalCacheObjects(vals, cctx);
+
+ prepareMarshalCacheObjects(nearKeys, cctx);
+
+ prepareMarshalCacheObjects(nearVals, cctx);
+
+ prepareMarshalCacheObjects(prevVals, cctx);
+
+ if (forceTransformBackups) {
+ // force addition of deployment info for entry processors if P2P is enabled globally.
+ if (!addDepInfo && ctx.deploymentEnabled())
+ addDepInfo = true;
+
+ if (invokeArgsBytes == null)
+ invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
+
+ if (entryProcessorsBytes == null)
+ entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
+
+ if (nearEntryProcessorsBytes == null)
+ nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(keys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(vals, cctx, ldr);
+
+ finishUnmarshalCacheObjects(nearKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+
+ finishUnmarshalCacheObjects(prevVals, cctx, ldr);
+
+ if (forceTransformBackups) {
+ if (entryProcessors == null)
+ entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
+
+ if (invokeArgs == null)
+ invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
+
+ if (nearEntryProcessors == null)
+ nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeBoolean("keepBinary", keepBinary))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
+ if (!writer.writeMessage("nearTtls", nearTtls))
+ return false;
+
+ writer.incrementState();
+
+ case 15:
+ if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 16:
+ if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 17:
+ if (!writer.writeUuid("subjId", subjId))
+ return false;
+
+ writer.incrementState();
+
+ case 18:
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 19:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 20:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 21:
+ if (!writer.writeMessage("ttls", ttls))
+ return false;
+
+ writer.incrementState();
+
+ case 22:
+ if (!writer.writeMessage("updateCntrs", updateCntrs))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
+ if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 24:
+ if (!writer.writeMessage("writeVer", writeVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ conflictExpireTimes = reader.readMessage("conflictExpireTimes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ forceTransformBackups = reader.readBoolean("forceTransformBackups");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ keepBinary = reader.readBoolean("keepBinary");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ nearExpireTimes = reader.readMessage("nearExpireTimes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
+ nearTtls = reader.readMessage("nearTtls");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 15:
+ nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 16:
+ prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 17:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 18:
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
+
+ if (!reader.isLastRead())
+ return false;
+
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+ reader.incrementState();
+
+ case 19:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 21:
+ ttls = reader.readMessage("ttls");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 22:
+ updateCntrs = reader.readMessage("updateCntrs");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 23:
+ vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 24:
+ writeVer = reader.readMessage("writeVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicMultipleUpdateRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ cleanup();
+ }
+
+ /**
+ * Cleanup values not needed after message was sent.
+ */
+ private void cleanup() {
+ nearVals = null;
+ prevVals = null;
+
+ // Do not keep values if they are not needed for continuous query notification.
+ if (locPrevVals == null) {
+ keys = null;
+ vals = null;
+ locPrevVals = null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 38;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 25;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicMultipleUpdateRequest.class, this, "super", super.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicMultipleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicMultipleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicMultipleUpdateResponse.java
new file mode 100644
index 0000000..4853ef5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicMultipleUpdateResponse.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * DHT atomic cache backup update response.
+ */
+public class GridDhtAtomicMultipleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridDhtAtomicUpdateResponseInterface {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Failed keys. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> failedKeys;
+
+ /** Update error. */
+ @GridDirectTransient
+ private IgniteCheckedException err;
+
+ /** Serialized update error. */
+ private byte[] errBytes;
+
+ /** Evicted readers. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> nearEvicted;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridDhtAtomicMultipleUpdateResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param futVer Future version.
+ * @param addDepInfo Deployment info.
+ */
+ public GridDhtAtomicMultipleUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) {
+ this.cacheId = cacheId;
+ this.futVer = futVer;
+ this.addDepInfo = addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onError(IgniteCheckedException err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
+ return err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<KeyCacheObject> failedKeys() {
+ return failedKeys;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>();
+
+ failedKeys.add(key);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<KeyCacheObject> nearEvicted() {
+ return nearEvicted;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addNearEvicted(KeyCacheObject key) {
+ if (nearEvicted == null)
+ nearEvicted = new ArrayList<>();
+
+ nearEvicted.add(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(failedKeys, cctx);
+
+ prepareMarshalCacheObjects(nearEvicted, cctx);
+
+ if (errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(nearEvicted, cctx, ldr);
+
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicMultipleUpdateResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 39;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 7;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicMultipleUpdateResponse.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 82cae3c..6e2ed31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -266,7 +266,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
cctx.deploymentEnabled(),
this.updateReq.keepBinary());
else
- updateReq = new GridDhtAtomicUpdateRequest(
+ updateReq = new GridDhtAtomicMultipleUpdateRequest(
cctx.cacheId(),
nodeId,
futVer,
@@ -362,7 +362,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
cctx.deploymentEnabled(),
this.updateReq.keepBinary());
else
- updateReq = new GridDhtAtomicUpdateRequest(
+ updateReq = new GridDhtAtomicMultipleUpdateRequest(
cctx.cacheId(),
nodeId,
futVer,
http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
deleted file mode 100644
index 4ceac74..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ /dev/null
@@ -1,1004 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Lite dht cache backup update request.
- */
-public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable, GridDhtAtomicUpdateRequestInterface {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** Node ID. */
- private UUID nodeId;
-
- /** Future version. */
- private GridCacheVersion futVer;
-
- /** Write version. */
- private GridCacheVersion writeVer;
-
- /** Topology version. */
- private AffinityTopologyVersion topVer;
-
- /** Keys to update. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> keys;
-
- /** Values to update. */
- @GridToStringInclude
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> vals;
-
- /** Previous values. */
- @GridToStringInclude
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> prevVals;
-
- /** Conflict versions. */
- @GridDirectCollection(GridCacheVersion.class)
- private List<GridCacheVersion> conflictVers;
-
- /** TTLs. */
- private GridLongList ttls;
-
- /** Conflict expire time. */
- private GridLongList conflictExpireTimes;
-
- /** Near TTLs. */
- private GridLongList nearTtls;
-
- /** Near expire times. */
- private GridLongList nearExpireTimes;
-
- /** Write synchronization mode. */
- private CacheWriteSynchronizationMode syncMode;
-
- /** Near cache keys to update. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> nearKeys;
-
- /** Values to update. */
- @GridToStringInclude
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> nearVals;
-
- /** Force transform backups flag. */
- private boolean forceTransformBackups;
-
- /** Entry processors. */
- @GridDirectTransient
- private List<EntryProcessor<Object, Object, Object>> entryProcessors;
-
- /** Entry processors bytes. */
- @GridDirectCollection(byte[].class)
- private List<byte[]> entryProcessorsBytes;
-
- /** Near entry processors. */
- @GridDirectTransient
- private List<EntryProcessor<Object, Object, Object>> nearEntryProcessors;
-
- /** Near entry processors bytes. */
- @GridDirectCollection(byte[].class)
- private List<byte[]> nearEntryProcessorsBytes;
-
- /** Optional arguments for entry processor. */
- @GridDirectTransient
- private Object[] invokeArgs;
-
- /** Entry processor arguments bytes. */
- private byte[][] invokeArgsBytes;
-
- /** Subject ID. */
- private UUID subjId;
-
- /** Task name hash. */
- private int taskNameHash;
-
- /** Partition. */
- private GridLongList updateCntrs;
-
- /** On response flag. Access should be synced on future. */
- @GridDirectTransient
- private boolean onRes;
-
- /** */
- @GridDirectTransient
- private List<Integer> partIds;
-
- /** */
- @GridDirectTransient
- private List<CacheObject> locPrevVals;
-
- /** Keep binary flag. */
- private boolean keepBinary;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridDhtAtomicUpdateRequest() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param cacheId Cache ID.
- * @param nodeId Node ID.
- * @param futVer Future version.
- * @param writeVer Write version for cache values.
- * @param invokeArgs Optional arguments for entry processor.
- * @param syncMode Cache write synchronization mode.
- * @param topVer Topology version.
- * @param forceTransformBackups Force transform backups flag.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param addDepInfo Deployment info.
- */
- public GridDhtAtomicUpdateRequest(
- int cacheId,
- UUID nodeId,
- GridCacheVersion futVer,
- GridCacheVersion writeVer,
- CacheWriteSynchronizationMode syncMode,
- @NotNull AffinityTopologyVersion topVer,
- boolean forceTransformBackups,
- UUID subjId,
- int taskNameHash,
- Object[] invokeArgs,
- boolean addDepInfo,
- boolean keepBinary
- ) {
- assert invokeArgs == null || forceTransformBackups;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.writeVer = writeVer;
- this.syncMode = syncMode;
- this.topVer = topVer;
- this.forceTransformBackups = forceTransformBackups;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.invokeArgs = invokeArgs;
- this.addDepInfo = addDepInfo;
- this.keepBinary = keepBinary;
-
- keys = new ArrayList<>();
- partIds = new ArrayList<>();
-
- if (forceTransformBackups) {
- entryProcessors = new ArrayList<>();
- entryProcessorsBytes = new ArrayList<>();
- }
- else
- vals = new ArrayList<>();
- }
-
- /** {@inheritDoc} */
- @Override public boolean forceTransformBackups() {
- return forceTransformBackups;
- }
-
- /** {@inheritDoc} */
- @Override public void addWriteValue(KeyCacheObject key,
- @Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
- long ttl,
- long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean addPrevVal,
- int partId,
- @Nullable CacheObject prevVal,
- @Nullable Long updateCntr,
- boolean storeLocPrevVal) {
- keys.add(key);
-
- partIds.add(partId);
-
- if (storeLocPrevVal) {
- if (locPrevVals == null)
- locPrevVals = new ArrayList<>();
-
- locPrevVals.add(prevVal);
- }
-
- if (forceTransformBackups) {
- assert entryProcessor != null;
-
- entryProcessors.add(entryProcessor);
- }
- else
- vals.add(val);
-
- if (addPrevVal) {
- if (prevVals == null)
- prevVals = new ArrayList<>();
-
- prevVals.add(prevVal);
- }
-
- if (updateCntr != null) {
- if (updateCntrs == null)
- updateCntrs = new GridLongList();
-
- updateCntrs.add(updateCntr);
- }
-
- // In case there is no conflict, do not create the list.
- if (conflictVer != null) {
- if (conflictVers == null) {
- conflictVers = new ArrayList<>();
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictVers.add(null);
- }
-
- conflictVers.add(conflictVer);
- }
- else if (conflictVers != null)
- conflictVers.add(null);
-
- if (ttl >= 0) {
- if (ttls == null) {
- ttls = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- ttls.add(CU.TTL_NOT_CHANGED);
- }
- }
-
- if (ttls != null)
- ttls.add(ttl);
-
- if (conflictExpireTime >= 0) {
- if (conflictExpireTimes == null) {
- conflictExpireTimes = new GridLongList(keys.size());
-
- for (int i = 0; i < keys.size() - 1; i++)
- conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
- }
- }
-
- if (conflictExpireTimes != null)
- conflictExpireTimes.add(conflictExpireTime);
- }
-
- /** {@inheritDoc} */
- @Override public void addNearWriteValue(KeyCacheObject key,
- @Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
- long ttl,
- long expireTime) {
- if (nearKeys == null) {
- nearKeys = new ArrayList<>();
-
- if (forceTransformBackups) {
- nearEntryProcessors = new ArrayList<>();
- nearEntryProcessorsBytes = new ArrayList<>();
- }
- else
- nearVals = new ArrayList<>();
- }
-
- nearKeys.add(key);
-
- if (forceTransformBackups) {
- assert entryProcessor != null;
-
- nearEntryProcessors.add(entryProcessor);
- }
- else
- nearVals.add(val);
-
- if (ttl >= 0) {
- if (nearTtls == null) {
- nearTtls = new GridLongList(nearKeys.size());
-
- for (int i = 0; i < nearKeys.size() - 1; i++)
- nearTtls.add(CU.TTL_NOT_CHANGED);
- }
- }
-
- if (nearTtls != null)
- nearTtls.add(ttl);
-
- if (expireTime >= 0) {
- if (nearExpireTimes == null) {
- nearExpireTimes = new GridLongList(nearKeys.size());
-
- for (int i = 0; i < nearKeys.size() - 1; i++)
- nearExpireTimes.add(CU.EXPIRE_TIME_CALCULATE);
- }
- }
-
- if (nearExpireTimes != null)
- nearExpireTimes.add(expireTime);
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /** {@inheritDoc} */
- @Override public UUID nodeId() {
- return nodeId;
- }
-
- /** {@inheritDoc} */
- @Override public UUID subjectId() {
- return subjId;
- }
-
- /** {@inheritDoc} */
- @Override public int taskNameHash() {
- return taskNameHash;
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return keys.size();
- }
-
- /** {@inheritDoc} */
- @Override public int nearSize() {
- return nearKeys != null ? nearKeys.size() : 0;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion writeVersion() {
- return writeVer;
- }
-
- /** {@inheritDoc} */
- @Override public CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
- }
-
- /** {@inheritDoc} */
- @Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<KeyCacheObject> keys() {
- return keys;
- }
-
- /** {@inheritDoc} */
- @Override public KeyCacheObject key(int idx) {
- return keys.get(idx);
- }
-
- /** {@inheritDoc} */
- @Override public int partitionId(int idx) {
- return partIds.get(idx);
- }
-
- /** {@inheritDoc} */
- @Override public Long updateCounter(int updCntr) {
- if (updateCntrs != null && updCntr < updateCntrs.size())
- return updateCntrs.get(updCntr);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public KeyCacheObject nearKey(int idx) {
- return nearKeys.get(idx);
- }
-
- /** {@inheritDoc} */
- @Override public boolean keepBinary() {
- return keepBinary;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public CacheObject value(int idx) {
- if (vals != null)
- return vals.get(idx);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public CacheObject previousValue(int idx) {
- if (prevVals != null)
- return prevVals.get(idx);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public CacheObject localPreviousValue(int idx) {
- assert locPrevVals != null;
-
- return locPrevVals.get(idx);
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) {
- return entryProcessors == null ? null : entryProcessors.get(idx);
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public CacheObject nearValue(int idx) {
- if (nearVals != null)
- return nearVals.get(idx);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public EntryProcessor<Object, Object, Object> nearEntryProcessor(int idx) {
- return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx);
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public GridCacheVersion conflictVersion(int idx) {
- if (conflictVers != null) {
- assert idx >= 0 && idx < conflictVers.size();
-
- return conflictVers.get(idx);
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long ttl(int idx) {
- if (ttls != null) {
- assert idx >= 0 && idx < ttls.size();
-
- return ttls.get(idx);
- }
-
- return CU.TTL_NOT_CHANGED;
- }
-
- /** {@inheritDoc} */
- @Override public long nearTtl(int idx) {
- if (nearTtls != null) {
- assert idx >= 0 && idx < nearTtls.size();
-
- return nearTtls.get(idx);
- }
-
- return CU.TTL_NOT_CHANGED;
- }
-
- /** {@inheritDoc} */
- @Override public long conflictExpireTime(int idx) {
- if (conflictExpireTimes != null) {
- assert idx >= 0 && idx < conflictExpireTimes.size();
-
- return conflictExpireTimes.get(idx);
- }
-
- return CU.EXPIRE_TIME_CALCULATE;
- }
-
- /** {@inheritDoc} */
- @Override public long nearExpireTime(int idx) {
- if (nearExpireTimes != null) {
- assert idx >= 0 && idx < nearExpireTimes.size();
-
- return nearExpireTimes.get(idx);
- }
-
- return CU.EXPIRE_TIME_CALCULATE;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onResponse() {
- return !onRes && (onRes = true);
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public Object[] invokeArguments() {
- return invokeArgs;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- prepareMarshalCacheObjects(keys, cctx);
-
- prepareMarshalCacheObjects(vals, cctx);
-
- prepareMarshalCacheObjects(nearKeys, cctx);
-
- prepareMarshalCacheObjects(nearVals, cctx);
-
- prepareMarshalCacheObjects(prevVals, cctx);
-
- if (forceTransformBackups) {
- // force addition of deployment info for entry processors if P2P is enabled globally.
- if (!addDepInfo && ctx.deploymentEnabled())
- addDepInfo = true;
-
- if (invokeArgsBytes == null)
- invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx);
-
- if (entryProcessorsBytes == null)
- entryProcessorsBytes = marshalCollection(entryProcessors, cctx);
-
- if (nearEntryProcessorsBytes == null)
- nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- finishUnmarshalCacheObjects(keys, cctx, ldr);
-
- finishUnmarshalCacheObjects(vals, cctx, ldr);
-
- finishUnmarshalCacheObjects(nearKeys, cctx, ldr);
-
- finishUnmarshalCacheObjects(nearVals, cctx, ldr);
-
- finishUnmarshalCacheObjects(prevVals, cctx, ldr);
-
- if (forceTransformBackups) {
- if (entryProcessors == null)
- entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr);
-
- if (invokeArgs == null)
- invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr);
-
- if (nearEntryProcessors == null)
- nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeMessage("nearTtls", nearTtls))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 16:
- if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 17:
- if (!writer.writeUuid("subjId", subjId))
- return false;
-
- writer.incrementState();
-
- case 18:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 19:
- if (!writer.writeInt("taskNameHash", taskNameHash))
- return false;
-
- writer.incrementState();
-
- case 20:
- if (!writer.writeMessage("topVer", topVer))
- return false;
-
- writer.incrementState();
-
- case 21:
- if (!writer.writeMessage("ttls", ttls))
- return false;
-
- writer.incrementState();
-
- case 22:
- if (!writer.writeMessage("updateCntrs", updateCntrs))
- return false;
-
- writer.incrementState();
-
- case 23:
- if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 24:
- if (!writer.writeMessage("writeVer", writeVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- conflictExpireTimes = reader.readMessage("conflictExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- forceTransformBackups = reader.readBoolean("forceTransformBackups");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- futVer = reader.readMessage("futVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- nearExpireTimes = reader.readMessage("nearExpireTimes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- nearTtls = reader.readMessage("nearTtls");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 16:
- prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 17:
- subjId = reader.readUuid("subjId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 18:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte("syncMode");
-
- if (!reader.isLastRead())
- return false;
-
- syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 19:
- taskNameHash = reader.readInt("taskNameHash");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 20:
- topVer = reader.readMessage("topVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 21:
- ttls = reader.readMessage("ttls");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 22:
- updateCntrs = reader.readMessage("updateCntrs");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 23:
- vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 24:
- writeVer = reader.readMessage("writeVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class);
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- cleanup();
- }
-
- /**
- * Cleanup values not needed after message was sent.
- */
- private void cleanup() {
- nearVals = null;
- prevVals = null;
-
- // Do not keep values if they are not needed for continuous query notification.
- if (locPrevVals == null) {
- keys = null;
- vals = null;
- locPrevVals = null;
- }
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 38;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 25;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString());
- }
-}