You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/25 13:31:19 UTC
[20/51] [abbrv] ignite git commit: ignite-2523 : Created
SingleUpdateResponse
ignite-2523 : Created SingleUpdateResponse
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dfdd2f4f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dfdd2f4f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dfdd2f4f
Branch: refs/heads/ignite-2523
Commit: dfdd2f4f907651a86da6fd52f18d2f189f65279d
Parents: a18c352
Author: Ilya Lantukh <il...@gridgain.com>
Authored: Mon Feb 8 17:23:02 2016 +0300
Committer: Ilya Lantukh <il...@gridgain.com>
Committed: Mon Feb 8 17:23:02 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/cache/GridCacheIoManager.java | 10 +-
.../dht/atomic/GridDhtAtomicCache.java | 38 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 +-
.../GridNearAtomicMultipleUpdateRequest.java | 6 +-
.../GridNearAtomicMultipleUpdateResponse.java | 640 ++++++++++++++++++
.../GridNearAtomicSingleUpdateRequest.java | 6 +-
.../GridNearAtomicSingleUpdateResponse.java | 641 +++++++++++++++++++
.../dht/atomic/GridNearAtomicUpdateFuture.java | 22 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 4 +-
.../atomic/GridNearAtomicUpdateResponse.java | 616 ++----------------
.../distributed/near/GridNearAtomicCache.java | 4 +-
.../IgniteClientReconnectCacheTest.java | 4 +-
.../IgniteClientReconnectCollectionsTest.java | 6 +-
14 files changed, 1382 insertions(+), 633 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 88e34c9..4d769af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -68,9 +68,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -368,7 +369,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 41:
- msg = new GridNearAtomicUpdateResponse();
+ msg = new GridNearAtomicMultipleUpdateResponse();
break;
@@ -732,6 +733,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case -24:
+ msg = new GridNearAtomicSingleUpdateResponse();
+
+ break;
+
// [-3..119] [124] - this
// [120..123] - DR
// [-4..-22] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 8a8f161..ea97277 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -46,7 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
@@ -412,7 +412,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
case 40: {
GridNearAtomicMultipleUpdateRequest req = (GridNearAtomicMultipleUpdateRequest)msg;
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+ GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(
ctx.cacheId(),
nodeId,
req.futureVersion(),
@@ -443,7 +443,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
case 45: {
- processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander.
+ processMessage(nodeId, msg, c);// Will be handled by Rebalance Demander.
}
break;
@@ -544,7 +544,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
case 114: {
- processMessage(nodeId,msg,c);// Will be handled by Rebalance Demander.
+ processMessage(nodeId, msg, c);// Will be handled by Rebalance Demander.
}
break;
@@ -590,7 +590,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
case -23: {
GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+ GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(
ctx.cacheId(),
nodeId,
req.futureVersion(),
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 55db70a..b0504db 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
/** Update reply closure. */
- private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+ private CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> updateReplyClos;
/** Pending */
private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -192,9 +192,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse>() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicMultipleUpdateResponse res) {
if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
assert req.writeSynchronizationMode() != FULL_ASYNC : req;
@@ -262,8 +262,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicMultipleUpdateResponse.class, new CI2<UUID, GridNearAtomicMultipleUpdateResponse>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
processNearAtomicUpdateResponse(nodeId, res);
}
});
@@ -1311,7 +1311,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
public void updateAllAsyncInternal(
final UUID nodeId,
final GridNearAtomicUpdateRequest req,
- final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ final CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
@@ -1336,9 +1336,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
public void updateAllAsyncInternal0(
UUID nodeId,
GridNearAtomicUpdateRequest req,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb
) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
+ GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
List<KeyCacheObject> keys = req.keys();
@@ -1559,11 +1559,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ClusterNode node,
boolean hasNear,
GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res,
+ GridNearAtomicMultipleUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -1975,11 +1975,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ClusterNode node,
boolean hasNear,
GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res,
+ GridNearAtomicMultipleUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -2214,9 +2214,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable Collection<KeyCacheObject> rmvKeys,
@Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
final GridNearAtomicUpdateRequest req,
- final GridNearAtomicUpdateResponse res,
+ final GridNearAtomicMultipleUpdateResponse res,
boolean replicate,
UpdateBatchResult batchRes,
String taskName,
@@ -2594,7 +2594,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @return {@code True} if filter evaluation succeeded.
*/
private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res) {
+ GridNearAtomicMultipleUpdateResponse res) {
try {
return ctx.isAllLocked(entry, req.filter());
}
@@ -2688,8 +2688,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
GridNearAtomicUpdateRequest updateReq,
- GridNearAtomicUpdateResponse updateRes,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridNearAtomicMultipleUpdateResponse updateRes,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
boolean force
) {
if (!force) {
@@ -2734,7 +2734,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param res Near atomic update response.
*/
@SuppressWarnings("unchecked")
- private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
if (log.isDebugEnabled())
log.debug("Processing near atomic update response [nodeId=" + nodeId + ", res=" + res + ']');
@@ -2944,7 +2944,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Originating node ID.
* @param res Near update response.
*/
- private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ private void sendNearUpdateReply(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
try {
ctx.io().send(nodeId, res, ctx.ioPolicy());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 3a31700..68c639d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -77,7 +77,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
/** Completion callback. */
@GridToStringExclude
- private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+ private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb;
/** Mappings. */
@GridToStringInclude
@@ -90,7 +90,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
private final GridNearAtomicUpdateRequest updateReq;
/** Update response. */
- private final GridNearAtomicUpdateResponse updateRes;
+ private final GridNearAtomicMultipleUpdateResponse updateRes;
/** Future keys. */
private final Collection<KeyCacheObject> keys;
@@ -110,10 +110,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
*/
public GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse> completionCb,
GridCacheVersion writeVer,
GridNearAtomicUpdateRequest updateReq,
- GridNearAtomicUpdateResponse updateRes
+ GridNearAtomicMultipleUpdateResponse updateRes
) {
this.cctx = cctx;
this.writeVer = writeVer;
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
index 650d350..6f109be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateRequest.java
@@ -157,7 +157,7 @@ public class GridNearAtomicMultipleUpdateRequest extends GridCacheMessage
/** */
@GridDirectTransient
- private GridNearAtomicUpdateResponse res;
+ private GridNearAtomicMultipleUpdateResponse res;
/** Maximum possible size of inner collections. */
@GridDirectTransient
@@ -502,7 +502,7 @@ public class GridNearAtomicMultipleUpdateRequest extends GridCacheMessage
}
/** {@inheritDoc} */
- @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+ @Override public boolean onResponse(GridNearAtomicMultipleUpdateResponse res) {
if (this.res == null) {
this.res = res;
@@ -513,7 +513,7 @@ public class GridNearAtomicMultipleUpdateRequest extends GridCacheMessage
}
/** {@inheritDoc} */
- @Override @Nullable public GridNearAtomicUpdateResponse response() {
+ @Override @Nullable public GridNearAtomicMultipleUpdateResponse response() {
return res;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
new file mode 100644
index 0000000..d22acc4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMultipleUpdateResponse.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * DHT atomic cache near update response.
+ */
+public class GridNearAtomicMultipleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponse {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Node ID this reply should be sent to. */
+ @GridDirectTransient
+ private UUID nodeId;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Update error. */
+ @GridDirectTransient
+ private volatile IgniteCheckedException err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
+ /** Return value. */
+ @GridToStringInclude
+ private GridCacheReturn ret;
+
+ /** Failed keys. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private volatile Collection<KeyCacheObject> failedKeys;
+
+ /** Keys that should be remapped. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> remapKeys;
+
+ /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
+ @GridDirectCollection(int.class)
+ private List<Integer> nearValsIdxs;
+
+ /** Indexes of keys for which update was skipped (used if originating node has near cache). */
+ @GridDirectCollection(int.class)
+ private List<Integer> nearSkipIdxs;
+
+ /** Values generated on primary node which should be put to originating node's near cache. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> nearVals;
+
+ /** Version generated on primary node to be used for originating node's near cache update. */
+ private GridCacheVersion nearVer;
+
+ /** Near TTLs. */
+ private GridLongList nearTtls;
+
+ /** Near expire times. */
+ private GridLongList nearExpireTimes;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearAtomicMultipleUpdateResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID this reply should be sent to.
+ * @param futVer Future version.
+ * @param addDepInfo Deployment info flag.
+ */
+ public GridNearAtomicMultipleUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
+ assert futVer != null;
+
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futVer = futVer;
+ this.addDepInfo = addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /**
+ * @return Node ID this response should be sent to.
+ */
+ @Override public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ @Override public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Future version.
+ */
+ @Override public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * Sets update error.
+ *
+ * @param err Error.
+ */
+ @Override public void error(IgniteCheckedException err){
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
+ return err;
+ }
+
+ /**
+ * @return Collection of failed keys.
+ */
+ @Override public Collection<KeyCacheObject> failedKeys() {
+ return failedKeys;
+ }
+
+ /**
+ * @return Return value.
+ */
+ @Override public GridCacheReturn returnValue() {
+ return ret;
+ }
+
+ /**
+ * @param ret Return value.
+ */
+ @Override @SuppressWarnings("unchecked")
+ public void returnValue(GridCacheReturn ret) {
+ this.ret = ret;
+ }
+
+ /**
+ * @param remapKeys Remap keys.
+ */
+ @Override public void remapKeys(List<KeyCacheObject> remapKeys) {
+ this.remapKeys = remapKeys;
+ }
+
+ /**
+ * @return Remap keys.
+ */
+ @Override public Collection<KeyCacheObject> remapKeys() {
+ return remapKeys;
+ }
+
+ /**
+ * Adds value to be put in near cache on originating node.
+ *
+ * @param keyIdx Key index.
+ * @param val Value.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ @Override public void addNearValue(int keyIdx,
+ @Nullable CacheObject val,
+ long ttl,
+ long expireTime) {
+ if (nearValsIdxs == null) {
+ nearValsIdxs = new ArrayList<>();
+ nearVals = new ArrayList<>();
+ }
+
+ addNearTtl(keyIdx, ttl, expireTime);
+
+ nearValsIdxs.add(keyIdx);
+ nearVals.add(val);
+ }
+
+ /**
+ * @param keyIdx Key index.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ @Override @SuppressWarnings("ForLoopReplaceableByForEach")
+ public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+ if (ttl >= 0) {
+ if (nearTtls == null) {
+ nearTtls = new GridLongList(16);
+
+ for (int i = 0; i < keyIdx; i++)
+ nearTtls.add(-1L);
+ }
+ }
+
+ if (nearTtls != null)
+ nearTtls.add(ttl);
+
+ if (expireTime >= 0) {
+ if (nearExpireTimes == null) {
+ nearExpireTimes = new GridLongList(16);
+
+ for (int i = 0; i < keyIdx; i++)
+ nearExpireTimes.add(-1);
+ }
+ }
+
+ if (nearExpireTimes != null)
+ nearExpireTimes.add(expireTime);
+ }
+
+ /**
+ * @param idx Index.
+ * @return Expire time for near cache update.
+ */
+ @Override public long nearExpireTime(int idx) {
+ if (nearExpireTimes != null) {
+ assert idx >= 0 && idx < nearExpireTimes.size();
+
+ return nearExpireTimes.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @param idx Index.
+ * @return TTL for near cache update.
+ */
+ @Override public long nearTtl(int idx) {
+ if (nearTtls != null) {
+ assert idx >= 0 && idx < nearTtls.size();
+
+ return nearTtls.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @param nearVer Version generated on primary node to be used for originating node's near cache update.
+ */
+ @Override public void nearVersion(GridCacheVersion nearVer) {
+ this.nearVer = nearVer;
+ }
+
+ /**
+ * @return Version generated on primary node to be used for originating node's near cache update.
+ */
+ @Override public GridCacheVersion nearVersion() {
+ return nearVer;
+ }
+
+ /**
+ * @param keyIdx Index of key for which update was skipped
+ */
+ @Override public void addSkippedIndex(int keyIdx) {
+ if (nearSkipIdxs == null)
+ nearSkipIdxs = new ArrayList<>();
+
+ nearSkipIdxs.add(keyIdx);
+
+ addNearTtl(keyIdx, -1L, -1L);
+ }
+
+ /**
+ * @return Indexes of keys for which update was skipped
+ */
+ @Override @Nullable public List<Integer> skippedIndexes() {
+ return nearSkipIdxs;
+ }
+
+ /**
+ * @return Indexes of keys for which values were generated on primary node.
+ */
+ @Override @Nullable public List<Integer> nearValuesIndexes() {
+ return nearValsIdxs;
+ }
+
+ /**
+ * @param idx Index.
+ * @return Value generated on primary node which should be put to originating node's near cache.
+ */
+ @Override @Nullable public CacheObject nearValue(int idx) {
+ return nearVals.get(idx);
+ }
+
+ /**
+ * Adds key to collection of failed keys.
+ *
+ * @param key Key to add.
+ * @param e Error cause.
+ */
+ @Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ConcurrentLinkedQueue<>();
+
+ failedKeys.add(key);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+ * Adds keys to collection of failed keys.
+ *
+ * @param keys Key to add.
+ * @param e Error cause.
+ */
+ @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+ if (keys != null) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>(keys.size());
+
+ failedKeys.addAll(keys);
+ }
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+ * Adds keys to collection of failed keys.
+ *
+ * @param keys Key to add.
+ * @param e Error cause.
+ * @param ctx Context.
+ */
+ @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>(keys.size());
+
+ failedKeys.addAll(keys);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /** {@inheritDoc}
+ * @param ctx*/
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (err != null && errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(failedKeys, cctx);
+
+ prepareMarshalCacheObjects(remapKeys, cctx);
+
+ prepareMarshalCacheObjects(nearVals, cctx);
+
+ if (ret != null)
+ ret.prepareMarshal(cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+
+ if (ret != null)
+ ret.finishUnmarshal(cctx, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeMessage("nearTtls", nearTtls))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("nearVer", nearVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeMessage("ret", ret))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ nearExpireTimes = reader.readMessage("nearExpireTimes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ nearTtls = reader.readMessage("nearTtls");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ nearVer = reader.readMessage("nearVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ ret = reader.readMessage("ret");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicMultipleUpdateResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 41;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 14;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicMultipleUpdateResponse.class, this, "parent");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 1e981af..8714010 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -149,7 +149,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
/** */
@GridDirectTransient
- private GridNearAtomicUpdateResponse res;
+ private GridNearAtomicMultipleUpdateResponse res;
/**
* Empty constructor required by {@link Externalizable}.
@@ -427,7 +427,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
}
/** {@inheritDoc} */
- @Override public boolean onResponse(GridNearAtomicUpdateResponse res) {
+ @Override public boolean onResponse(GridNearAtomicMultipleUpdateResponse res) {
if (this.res == null) {
this.res = res;
@@ -438,7 +438,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridCacheMessage
}
/** {@inheritDoc} */
- @Override @Nullable public GridNearAtomicUpdateResponse response() {
+ @Override @Nullable public GridNearAtomicMultipleUpdateResponse response() {
return res;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
new file mode 100644
index 0000000..581c33b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponse {
+
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** Node ID this reply should be sent to. */
+ @GridDirectTransient
+ private UUID nodeId;
+
+ /** Future version. */
+ private GridCacheVersion futVer;
+
+ /** Update error. */
+ @GridDirectTransient
+ private volatile IgniteCheckedException err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
+ /** Return value. */
+ @GridToStringInclude
+ private GridCacheReturn ret;
+
+ /** Failed keys. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private volatile Collection<KeyCacheObject> failedKeys;
+
+ /** Keys that should be remapped. */
+ @GridToStringInclude
+ @GridDirectCollection(KeyCacheObject.class)
+ private List<KeyCacheObject> remapKeys;
+
+ /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
+ @GridDirectCollection(int.class)
+ private List<Integer> nearValsIdxs;
+
+ /** Indexes of keys for which update was skipped (used if originating node has near cache). */
+ @GridDirectCollection(int.class)
+ private List<Integer> nearSkipIdxs;
+
+ /** Values generated on primary node which should be put to originating node's near cache. */
+ @GridToStringInclude
+ @GridDirectCollection(CacheObject.class)
+ private List<CacheObject> nearVals;
+
+ /** Version generated on primary node to be used for originating node's near cache update. */
+ private GridCacheVersion nearVer;
+
+ /** Near TTLs. */
+ private GridLongList nearTtls;
+
+ /** Near expire times. */
+ private GridLongList nearExpireTimes;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearAtomicSingleUpdateResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param nodeId Node ID this reply should be sent to.
+ * @param futVer Future version.
+ * @param addDepInfo Deployment info flag.
+ */
+ public GridNearAtomicSingleUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
+ assert futVer != null;
+
+ this.cacheId = cacheId;
+ this.nodeId = nodeId;
+ this.futVer = futVer;
+ this.addDepInfo = addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /**
+ * @return Node ID this response should be sent to.
+ */
+ @Override public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ */
+ @Override public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ /**
+ * @return Future version.
+ */
+ @Override public GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * Sets update error.
+ *
+ * @param err Error.
+ */
+ @Override public void error(IgniteCheckedException err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException error() {
+ return err;
+ }
+
+ /**
+ * @return Collection of failed keys.
+ */
+ @Override public Collection<KeyCacheObject> failedKeys() {
+ return failedKeys;
+ }
+
+ /**
+ * @return Return value.
+ */
+ @Override public GridCacheReturn returnValue() {
+ return ret;
+ }
+
+ /**
+ * @param ret Return value.
+ */
+ @Override @SuppressWarnings("unchecked")
+ public void returnValue(GridCacheReturn ret) {
+ this.ret = ret;
+ }
+
+ /**
+ * @param remapKeys Remap keys.
+ */
+ @Override public void remapKeys(List<KeyCacheObject> remapKeys) {
+ this.remapKeys = remapKeys;
+ }
+
+ /**
+ * @return Remap keys.
+ */
+ @Override public Collection<KeyCacheObject> remapKeys() {
+ return remapKeys;
+ }
+
+ /**
+ * Adds value to be put in near cache on originating node.
+ *
+ * @param keyIdx Key index.
+ * @param val Value.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ @Override public void addNearValue(int keyIdx,
+ @Nullable CacheObject val,
+ long ttl,
+ long expireTime) {
+ if (nearValsIdxs == null) {
+ nearValsIdxs = new ArrayList<>();
+ nearVals = new ArrayList<>();
+ }
+
+ addNearTtl(keyIdx, ttl, expireTime);
+
+ nearValsIdxs.add(keyIdx);
+ nearVals.add(val);
+ }
+
+ /**
+ * @param keyIdx Key index.
+ * @param ttl TTL for near cache update.
+ * @param expireTime Expire time for near cache update.
+ */
+ @Override @SuppressWarnings("ForLoopReplaceableByForEach")
+ public void addNearTtl(int keyIdx, long ttl, long expireTime) {
+ if (ttl >= 0) {
+ if (nearTtls == null) {
+ nearTtls = new GridLongList(16);
+
+ for (int i = 0; i < keyIdx; i++)
+ nearTtls.add(-1L);
+ }
+ }
+
+ if (nearTtls != null)
+ nearTtls.add(ttl);
+
+ if (expireTime >= 0) {
+ if (nearExpireTimes == null) {
+ nearExpireTimes = new GridLongList(16);
+
+ for (int i = 0; i < keyIdx; i++)
+ nearExpireTimes.add(-1);
+ }
+ }
+
+ if (nearExpireTimes != null)
+ nearExpireTimes.add(expireTime);
+ }
+
+ /**
+ * @param idx Index.
+ * @return Expire time for near cache update.
+ */
+ @Override public long nearExpireTime(int idx) {
+ if (nearExpireTimes != null) {
+ assert idx >= 0 && idx < nearExpireTimes.size();
+
+ return nearExpireTimes.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @param idx Index.
+ * @return TTL for near cache update.
+ */
+ @Override public long nearTtl(int idx) {
+ if (nearTtls != null) {
+ assert idx >= 0 && idx < nearTtls.size();
+
+ return nearTtls.get(idx);
+ }
+
+ return -1L;
+ }
+
+ /**
+ * @param nearVer Version generated on primary node to be used for originating node's near cache update.
+ */
+ @Override public void nearVersion(GridCacheVersion nearVer) {
+ this.nearVer = nearVer;
+ }
+
+ /**
+ * @return Version generated on primary node to be used for originating node's near cache update.
+ */
+ @Override public GridCacheVersion nearVersion() {
+ return nearVer;
+ }
+
+ /**
+ * @param keyIdx Index of key for which update was skipped
+ */
+ @Override public void addSkippedIndex(int keyIdx) {
+ if (nearSkipIdxs == null)
+ nearSkipIdxs = new ArrayList<>();
+
+ nearSkipIdxs.add(keyIdx);
+
+ addNearTtl(keyIdx, -1L, -1L);
+ }
+
+ /**
+ * @return Indexes of keys for which update was skipped
+ */
+ @Override @Nullable public List<Integer> skippedIndexes() {
+ return nearSkipIdxs;
+ }
+
+ /**
+ * @return Indexes of keys for which values were generated on primary node.
+ */
+ @Override @Nullable public List<Integer> nearValuesIndexes() {
+ return nearValsIdxs;
+ }
+
+ /**
+ * @param idx Index.
+ * @return Value generated on primary node which should be put to originating node's near cache.
+ */
+ @Override @Nullable public CacheObject nearValue(int idx) {
+ return nearVals.get(idx);
+ }
+
+ /**
+ * Adds key to collection of failed keys.
+ *
+ * @param key Key to add.
+ * @param e Error cause.
+ */
+ @Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (failedKeys == null)
+ failedKeys = new ConcurrentLinkedQueue<>();
+
+ failedKeys.add(key);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+ * Adds keys to collection of failed keys.
+ *
+ * @param keys Key to add.
+ * @param e Error cause.
+ */
+ @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
+ if (keys != null) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>(keys.size());
+
+ failedKeys.addAll(keys);
+ }
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /**
+ * Adds keys to collection of failed keys.
+ *
+ * @param keys Key to add.
+ * @param e Error cause.
+ * @param ctx Context.
+ */
+ @Override public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e,
+ GridCacheContext ctx) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>(keys.size());
+
+ failedKeys.addAll(keys);
+
+ if (err == null)
+ err = new IgniteCheckedException("Failed to update keys on primary node.");
+
+ err.addSuppressed(e);
+ }
+
+ /** {@inheritDoc}
+ * @param ctx*/
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (err != null && errBytes == null)
+ errBytes = ctx.marshaller().marshal(err);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ prepareMarshalCacheObjects(failedKeys, cctx);
+
+ prepareMarshalCacheObjects(remapKeys, cctx);
+
+ prepareMarshalCacheObjects(nearVals, cctx);
+
+ if (ret != null)
+ ret.prepareMarshal(cctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (errBytes != null && err == null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
+
+ GridCacheContext cctx = ctx.cacheContext(cacheId);
+
+ finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
+
+ finishUnmarshalCacheObjects(nearVals, cctx, ldr);
+
+ if (ret != null)
+ ret.finishUnmarshal(cctx, ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeMessage("futVer", futVer))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeMessage("nearTtls", nearTtls))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("nearVer", nearVer))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeMessage("ret", ret))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futVer = reader.readMessage("futVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ nearExpireTimes = reader.readMessage("nearExpireTimes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ nearTtls = reader.readMessage("nearTtls");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ nearVer = reader.readMessage("nearVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ ret = reader.readMessage("ret");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicMultipleUpdateResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -24;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 14;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicSingleUpdateResponse.class, this, "parent");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index c1dab93..68ee67b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -339,7 +339,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @param nodeId Node ID.
* @param res Update response.
*/
- public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ public void onResult(UUID nodeId, GridNearAtomicMultipleUpdateResponse res) {
state.onResult(nodeId, res, false);
}
@@ -349,7 +349,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @param req Update request.
* @param res Update response.
*/
- private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicMultipleUpdateResponse res) {
assert nearEnabled;
if (res.remapKeys() != null || !req.hasPrimary())
@@ -454,9 +454,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ new CI2<GridNearAtomicUpdateRequest, GridNearAtomicMultipleUpdateResponse>() {
@Override public void apply(GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res) {
+ GridNearAtomicMultipleUpdateResponse res) {
onResult(res.nodeId(), res);
}
});
@@ -510,8 +510,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (locUpdate != null) {
cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicMultipleUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicMultipleUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ new CI2<GridNearAtomicMultipleUpdateRequest, GridNearAtomicMultipleUpdateResponse>() {
+ @Override public void apply(GridNearAtomicMultipleUpdateRequest req, GridNearAtomicMultipleUpdateResponse res) {
onResult(res.nodeId(), res);
}
});
@@ -570,7 +570,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @param nodeId Left node ID.
*/
void onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
+ GridNearAtomicMultipleUpdateResponse res = null;
synchronized (this) {
GridNearAtomicUpdateRequest req;
@@ -581,7 +581,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
req = mappings != null ? mappings.get(nodeId) : null;
if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ res = new GridNearAtomicMultipleUpdateResponse(cctx.cacheId(),
nodeId,
req.futureVersion(),
cctx.deploymentEnabled());
@@ -605,7 +605,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
* @param nodeErr {@code True} if response was created on node failure.
*/
@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
- void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ void onResult(UUID nodeId, GridNearAtomicMultipleUpdateResponse res, boolean nodeErr) {
GridNearAtomicUpdateRequest req;
AffinityTopologyVersion remapTopVer = null;
@@ -737,7 +737,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (rcvAll && nearEnabled) {
if (mappings != null) {
for (GridNearAtomicMultipleUpdateRequest req0 : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = req0.response();
+ GridNearAtomicMultipleUpdateResponse res0 = req0.response();
assert res0 != null : req0;
@@ -812,7 +812,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
*/
void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
synchronized (this) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ GridNearAtomicMultipleUpdateResponse res = new GridNearAtomicMultipleUpdateResponse(cctx.cacheId(),
req.nodeId(),
req.futureVersion(),
cctx.deploymentEnabled());
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 960add7..c1977bf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -190,10 +190,10 @@ public interface GridNearAtomicUpdateRequest {
* @param res Response.
* @return {@code True} if current response was {@code null}.
*/
- public boolean onResponse(GridNearAtomicUpdateResponse res);
+ public boolean onResponse(GridNearAtomicMultipleUpdateResponse res);
/**
* @return Response.
*/
- @Nullable public GridNearAtomicUpdateResponse response();
+ @Nullable public GridNearAtomicMultipleUpdateResponse response();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dfdd2f4f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 3e3ac29..51d388c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -17,624 +17,86 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.io.Externalizable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLongList;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
-/**
- * DHT atomic cache near update response.
- */
-public class GridNearAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Cache message index. */
- public static final int CACHE_MSG_IDX = nextIndexId();
-
- /** Node ID this reply should be sent to. */
- @GridDirectTransient
- private UUID nodeId;
-
- /** Future version. */
- private GridCacheVersion futVer;
-
- /** Update error. */
- @GridDirectTransient
- private volatile IgniteCheckedException err;
-
- /** Serialized error. */
- private byte[] errBytes;
-
- /** Return value. */
- @GridToStringInclude
- private GridCacheReturn ret;
-
- /** Failed keys. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private volatile Collection<KeyCacheObject> failedKeys;
-
- /** Keys that should be remapped. */
- @GridToStringInclude
- @GridDirectCollection(KeyCacheObject.class)
- private List<KeyCacheObject> remapKeys;
-
- /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */
- @GridDirectCollection(int.class)
- private List<Integer> nearValsIdxs;
-
- /** Indexes of keys for which update was skipped (used if originating node has near cache). */
- @GridDirectCollection(int.class)
- private List<Integer> nearSkipIdxs;
-
- /** Values generated on primary node which should be put to originating node's near cache. */
- @GridToStringInclude
- @GridDirectCollection(CacheObject.class)
- private List<CacheObject> nearVals;
-
- /** Version generated on primary node to be used for originating node's near cache update. */
- private GridCacheVersion nearVer;
-
- /** Near TTLs. */
- private GridLongList nearTtls;
-
- /** Near expire times. */
- private GridLongList nearExpireTimes;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public GridNearAtomicUpdateResponse() {
- // No-op.
- }
-
- /**
- * @param cacheId Cache ID.
- * @param nodeId Node ID this reply should be sent to.
- * @param futVer Future version.
- * @param addDepInfo Deployment info flag.
- */
- public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) {
- assert futVer != null;
-
- this.cacheId = cacheId;
- this.nodeId = nodeId;
- this.futVer = futVer;
- this.addDepInfo = addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public int lookupIndex() {
- return CACHE_MSG_IDX;
- }
-
- /**
- * @return Node ID this response should be sent to.
- */
- public UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @param nodeId Node ID.
- */
- public void nodeId(UUID nodeId) {
- this.nodeId = nodeId;
- }
-
- /**
- * @return Future version.
- */
- public GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * Sets update error.
- *
- * @param err Error.
- */
- public void error(IgniteCheckedException err){
- this.err = err;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteCheckedException error() {
- return err;
- }
-
- /**
- * @return Collection of failed keys.
- */
- public Collection<KeyCacheObject> failedKeys() {
- return failedKeys;
- }
-
- /**
- * @return Return value.
- */
- public GridCacheReturn returnValue() {
- return ret;
- }
-
- /**
- * @param ret Return value.
- */
- @SuppressWarnings("unchecked")
- public void returnValue(GridCacheReturn ret) {
- this.ret = ret;
- }
-
- /**
- * @param remapKeys Remap keys.
- */
- public void remapKeys(List<KeyCacheObject> remapKeys) {
- this.remapKeys = remapKeys;
- }
-
- /**
- * @return Remap keys.
- */
- public Collection<KeyCacheObject> remapKeys() {
- return remapKeys;
- }
-
- /**
- * Adds value to be put in near cache on originating node.
- *
- * @param keyIdx Key index.
- * @param val Value.
- * @param ttl TTL for near cache update.
- * @param expireTime Expire time for near cache update.
- */
- public void addNearValue(int keyIdx,
- @Nullable CacheObject val,
- long ttl,
- long expireTime) {
- if (nearValsIdxs == null) {
- nearValsIdxs = new ArrayList<>();
- nearVals = new ArrayList<>();
- }
-
- addNearTtl(keyIdx, ttl, expireTime);
-
- nearValsIdxs.add(keyIdx);
- nearVals.add(val);
- }
-
- /**
- * @param keyIdx Key index.
- * @param ttl TTL for near cache update.
- * @param expireTime Expire time for near cache update.
- */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- public void addNearTtl(int keyIdx, long ttl, long expireTime) {
- if (ttl >= 0) {
- if (nearTtls == null) {
- nearTtls = new GridLongList(16);
-
- for (int i = 0; i < keyIdx; i++)
- nearTtls.add(-1L);
- }
- }
-
- if (nearTtls != null)
- nearTtls.add(ttl);
-
- if (expireTime >= 0) {
- if (nearExpireTimes == null) {
- nearExpireTimes = new GridLongList(16);
-
- for (int i = 0; i < keyIdx; i++)
- nearExpireTimes.add(-1);
- }
- }
-
- if (nearExpireTimes != null)
- nearExpireTimes.add(expireTime);
- }
-
- /**
- * @param idx Index.
- * @return Expire time for near cache update.
- */
- public long nearExpireTime(int idx) {
- if (nearExpireTimes != null) {
- assert idx >= 0 && idx < nearExpireTimes.size();
-
- return nearExpireTimes.get(idx);
- }
-
- return -1L;
- }
-
- /**
- * @param idx Index.
- * @return TTL for near cache update.
- */
- public long nearTtl(int idx) {
- if (nearTtls != null) {
- assert idx >= 0 && idx < nearTtls.size();
-
- return nearTtls.get(idx);
- }
-
- return -1L;
- }
-
- /**
- * @param nearVer Version generated on primary node to be used for originating node's near cache update.
- */
- public void nearVersion(GridCacheVersion nearVer) {
- this.nearVer = nearVer;
- }
-
- /**
- * @return Version generated on primary node to be used for originating node's near cache update.
- */
- public GridCacheVersion nearVersion() {
- return nearVer;
- }
-
- /**
- * @param keyIdx Index of key for which update was skipped
- */
- public void addSkippedIndex(int keyIdx) {
- if (nearSkipIdxs == null)
- nearSkipIdxs = new ArrayList<>();
-
- nearSkipIdxs.add(keyIdx);
-
- addNearTtl(keyIdx, -1L, -1L);
- }
-
- /**
- * @return Indexes of keys for which update was skipped
- */
- @Nullable public List<Integer> skippedIndexes() {
- return nearSkipIdxs;
- }
-
- /**
- * @return Indexes of keys for which values were generated on primary node.
- */
- @Nullable public List<Integer> nearValuesIndexes() {
- return nearValsIdxs;
- }
-
- /**
- * @param idx Index.
- * @return Value generated on primary node which should be put to originating node's near cache.
- */
- @Nullable public CacheObject nearValue(int idx) {
- return nearVals.get(idx);
- }
-
- /**
- * Adds key to collection of failed keys.
- *
- * @param key Key to add.
- * @param e Error cause.
- */
- public synchronized void addFailedKey(KeyCacheObject key, Throwable e) {
- if (failedKeys == null)
- failedKeys = new ConcurrentLinkedQueue<>();
-
- failedKeys.add(key);
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
- }
-
- /**
- * Adds keys to collection of failed keys.
- *
- * @param keys Key to add.
- * @param e Error cause.
- */
- public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
- if (keys != null) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
-
- failedKeys.addAll(keys);
- }
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
- }
-
- /**
- * Adds keys to collection of failed keys.
- *
- * @param keys Key to add.
- * @param e Error cause.
- * @param ctx Context.
- */
- public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
-
- failedKeys.addAll(keys);
-
- if (err == null)
- err = new IgniteCheckedException("Failed to update keys on primary node.");
-
- err.addSuppressed(e);
- }
-
- /** {@inheritDoc}
- * @param ctx*/
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (err != null && errBytes == null)
- errBytes = ctx.marshaller().marshal(err);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- prepareMarshalCacheObjects(failedKeys, cctx);
-
- prepareMarshalCacheObjects(remapKeys, cctx);
-
- prepareMarshalCacheObjects(nearVals, cctx);
-
- if (ret != null)
- ret.prepareMarshal(cctx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (errBytes != null && err == null)
- err = ctx.marshaller().unmarshal(errBytes, ldr);
-
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
- finishUnmarshalCacheObjects(failedKeys, cctx, ldr);
-
- finishUnmarshalCacheObjects(remapKeys, cctx, ldr);
-
- finishUnmarshalCacheObjects(nearVals, cctx, ldr);
-
- if (ret != null)
- ret.finishUnmarshal(cctx, ldr);
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 3:
- if (!writer.writeByteArray("errBytes", errBytes))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeMessage("futVer", futVer))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeMessage("nearTtls", nearTtls))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeMessage("nearVer", nearVer))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeMessage("ret", ret))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 3:
- errBytes = reader.readByteArray("errBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
- case 5:
- futVer = reader.readMessage("futVer");
+public interface GridNearAtomicUpdateResponse {
- if (!reader.isLastRead())
- return false;
+ int lookupIndex();
- reader.incrementState();
+ UUID nodeId();
- case 6:
- nearExpireTimes = reader.readMessage("nearExpireTimes");
+ void nodeId(UUID nodeId);
- if (!reader.isLastRead())
- return false;
+ GridCacheVersion futureVersion();
- reader.incrementState();
+ void error(IgniteCheckedException err);
- case 7:
- nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT);
+ IgniteCheckedException error();
- if (!reader.isLastRead())
- return false;
+ Collection<KeyCacheObject> failedKeys();
- reader.incrementState();
+ GridCacheReturn returnValue();
- case 8:
- nearTtls = reader.readMessage("nearTtls");
+ @SuppressWarnings("unchecked") void returnValue(GridCacheReturn ret);
- if (!reader.isLastRead())
- return false;
+ void remapKeys(List<KeyCacheObject> remapKeys);
- reader.incrementState();
+ Collection<KeyCacheObject> remapKeys();
- case 9:
- nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+ void addNearValue(int keyIdx,
+ @Nullable CacheObject val,
+ long ttl,
+ long expireTime);
- if (!reader.isLastRead())
- return false;
+ @SuppressWarnings("ForLoopReplaceableByForEach") void addNearTtl(int keyIdx, long ttl, long expireTime);
- reader.incrementState();
+ long nearExpireTime(int idx);
- case 10:
- nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT);
+ long nearTtl(int idx);
- if (!reader.isLastRead())
- return false;
+ void nearVersion(GridCacheVersion nearVer);
- reader.incrementState();
+ GridCacheVersion nearVersion();
- case 11:
- nearVer = reader.readMessage("nearVer");
+ void addSkippedIndex(int keyIdx);
- if (!reader.isLastRead())
- return false;
+ @Nullable List<Integer> skippedIndexes();
- reader.incrementState();
+ @Nullable List<Integer> nearValuesIndexes();
- case 12:
- remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+ @Nullable CacheObject nearValue(int idx);
- if (!reader.isLastRead())
- return false;
+ void addFailedKey(KeyCacheObject key, Throwable e);
- reader.incrementState();
+ void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e);
- case 13:
- ret = reader.readMessage("ret");
+ void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e, GridCacheContext ctx);
- if (!reader.isLastRead())
- return false;
+ void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException;
- reader.incrementState();
+ void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException;
- }
+ boolean addDeploymentInfo();
- return reader.afterMessageRead(GridNearAtomicUpdateResponse.class);
- }
+ boolean writeTo(ByteBuffer buf, MessageWriter writer);
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 41;
- }
+ boolean readFrom(ByteBuffer buf, MessageReader reader);
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 14;
- }
+ byte directType();
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearAtomicUpdateResponse.class, this, "parent");
- }
+ byte fieldsCount();
}