You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/16 15:45:50 UTC
[2/3] ignite git commit: ignite-4705
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c5c5eb5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c5c5eb5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c5c5eb5a
Branch: refs/heads/ignite-4705
Commit: c5c5eb5ade63a64ccf8c193db77978c4e297fc19
Parents: 7287a93
Author: sboikov <sb...@gridgain.com>
Authored: Thu Feb 16 15:07:54 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Feb 16 15:23:12 2017 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 30 +-
.../communication/GridIoMessageFactory.java | 10 +-
.../processors/cache/GridCacheMessage.java | 4 +-
.../cache/GridDeferredAckMessageSender.java | 11 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 49 ++-
.../GridDhtAtomicAbstractUpdateRequest.java | 135 +++++++-
.../dht/atomic/GridDhtAtomicCache.java | 320 ++++++++++++-------
.../dht/atomic/GridDhtAtomicNearResponse.java | 268 ++++++++++++++++
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 24 +-
.../GridDhtAtomicSingleUpdateRequest.java | 56 ++--
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 8 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 98 +++---
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 7 +
.../GridNearAtomicAbstractUpdateFuture.java | 3 +-
.../dht/atomic/GridNearAtomicDhtResponse.java | 222 -------------
.../GridNearAtomicSingleUpdateFuture.java | 61 +++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 2 +-
.../atomic/GridNearAtomicUpdateResponse.java | 62 +++-
.../distributed/dht/atomic/UpdateErrors.java | 187 +++++++++++
.../distributed/near/GridNearAtomicCache.java | 17 +-
.../cache/IgniteGetAndPutBenchmark.java | 2 +-
.../cache/IgniteGetAndPutTxBenchmark.java | 2 +-
22 files changed, 1081 insertions(+), 497 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 16b1e01..6636bf2 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -46,17 +46,18 @@ import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -181,19 +182,20 @@ public class MessageCodeGenerator {
// gen.generateAll(true);
-// gen.generateAndWrite(GridNearAtomicDhtResponse.class);
-// gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
-// gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
-// gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
-// gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
-// gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
-// gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
-// gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
-// gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
-// gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
-// gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
-// gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
-// gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
+ gen.generateAndWrite(UpdateErrors.class);
+ gen.generateAndWrite(GridDhtAtomicNearResponse.class);
+ //gen.generateAndWrite(GridNearAtomicAbstractUpdateRequest.class);
+ gen.generateAndWrite(GridDhtAtomicAbstractUpdateRequest.class);
+ gen.generateAndWrite(GridNearAtomicAbstractSingleUpdateRequest.class);
+ gen.generateAndWrite(GridDhtAtomicDeferredUpdateResponse.class);
+ gen.generateAndWrite(GridDhtAtomicSingleUpdateRequest.class);
+ gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
+ gen.generateAndWrite(GridDhtAtomicUpdateResponse.class);
+ gen.generateAndWrite(GridNearAtomicFullUpdateRequest.class);
+ gen.generateAndWrite(GridNearAtomicSingleUpdateFilterRequest.class);
+ gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
+ gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
+ gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
// gen.generateAndWrite(GridMessageCollection.class);
// gen.generateAndWrite(DataStreamerEntry.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 769a615..5ed46ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -67,15 +67,16 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlockRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicDhtResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
@@ -174,8 +175,13 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -46:
+ msg = new UpdateErrors();
+
+ break;
+
case -45:
- msg = new GridNearAtomicDhtResponse();
+ msg = new GridDhtAtomicNearResponse();
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 0646d5a..b9fb56a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -501,7 +501,7 @@ public abstract class GridCacheMessage implements Message {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- protected final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+ public final void prepareMarshalCacheObjects(@Nullable List<? extends CacheObject> col,
GridCacheContext ctx) throws IgniteCheckedException {
if (col == null)
return;
@@ -553,7 +553,7 @@ public abstract class GridCacheMessage implements Message {
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- protected final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
+ public final void finishUnmarshalCacheObjects(@Nullable List<? extends CacheObject> col,
GridCacheContext ctx,
ClassLoader ldr)
throws IgniteCheckedException
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
index 8df883a..37ecc79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java
@@ -21,7 +21,6 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -41,16 +40,16 @@ public abstract class GridDeferredAckMessageSender<T> {
private GridTimeoutProcessor time;
/** Closure processor. */
- public GridClosureProcessor closure;
+ public GridClosureProcessor c;
/**
* @param time Time.
- * @param closure Closure.
+ * @param c Closure.
*/
public GridDeferredAckMessageSender(GridTimeoutProcessor time,
- GridClosureProcessor closure) {
+ GridClosureProcessor c) {
this.time = time;
- this.closure = closure;
+ this.c = c;
}
/**
@@ -151,7 +150,7 @@ public abstract class GridDeferredAckMessageSender<T> {
/** {@inheritDoc} */
@Override public void onTimeout() {
if (guard.compareAndSet(false, true)) {
- closure.runLocalSafe(new Runnable() {
+ c.runLocalSafe(new Runnable() {
@Override public void run() {
writeLock().lock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 10d1c4b..d494d98 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -93,9 +93,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** Continuous query closures. */
private Collection<CI1<Boolean>> cntQryClsrs;
- /** */
- private final boolean waitForExchange;
-
/** Response count. */
private volatile int resCnt;
@@ -113,14 +110,12 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes) {
this.cctx = cctx;
-
- futId = cctx.mvcc().atomicFutureId();
this.updateReq = updateReq;
this.completionCb = completionCb;
this.updateRes = updateRes;
this.writeVer = writeVer;
- waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
+ futId = cctx.mvcc().atomicFutureId();
if (log == null) {
msgLog = cctx.shared().atomicMessageLogger();
@@ -130,6 +125,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** {@inheritDoc} */
@Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ boolean waitForExchange = !updateReq.topologyLocked();
+
if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
return this;
@@ -160,7 +157,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param updateCntr Partition update counter.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- final void addWriteEntry(GridDhtCacheEntry entry,
+ final void addWriteEntry(
+ UUID nearNodeId,
+ GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
long ttl,
@@ -190,7 +189,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (updateReq == null) {
updateReq = createRequest(
- node,
+ node.id(),
+ nearNodeId,
futId,
writeVer,
syncMode,
@@ -236,7 +236,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param ttl TTL for near cache update (optional).
* @param expireTime Expire time for near cache update (optional).
*/
- final void addNearWriteEntries(Collection<UUID> readers,
+ final void addNearWriteEntries(
+ UUID nearNodeId,
+ Collection<UUID> readers,
GridDhtCacheEntry entry,
@Nullable CacheObject val,
EntryProcessor<Object, Object, Object> entryProcessor,
@@ -259,7 +261,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
continue;
updateReq = createRequest(
- node,
+ node.id(),
+ nearNodeId,
futId,
writeVer,
syncMode,
@@ -352,9 +355,21 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* Sends requests to remote nodes.
*/
final void map() {
+ boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC;
+
if (!F.isEmpty(mappings)) {
+ List<UUID> dhtNodes = null;
+
+ if (fullSync) {
+ dhtNodes = new ArrayList<>(mappings.size());
+
+ dhtNodes.addAll(mappings.keySet());
+ }
+
for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) {
try {
+ req.dhtNodes(dhtNodes);
+
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (msgLog.isDebugEnabled()) {
@@ -383,7 +398,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
// Send response right away if no ACKs from backup is required.
// Backups will send ACKs anyway, future will be completed after all backups have replied.
- if (updateReq.writeSynchronizationMode() != FULL_SYNC)
+ if (!fullSync)
completionCb.apply(updateReq, updateRes);
}
@@ -400,7 +415,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
}
/**
- * @param node Node.
+ * @param nodeId Node ID.
+ * @param nodeId Near node ID.
* @param futId Future ID.
* @param writeVer Update version.
* @param syncMode Write synchronization mode.
@@ -411,7 +427,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @return Request.
*/
protected abstract GridDhtAtomicAbstractUpdateRequest createRequest(
- ClusterNode node,
+ UUID nodeId,
+ UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -449,9 +466,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
for (CI1<Boolean> clsr : cntQryClsrs)
clsr.apply(suc);
}
-
- if (updateReq.writeSynchronizationMode() == FULL_SYNC)
- completionCb.apply(updateReq, updateRes);
+//
+// if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+// completionCb.apply(updateReq, updateRes);
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 7aa440d..3edbf8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -18,10 +18,13 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
+import java.nio.ByteBuffer;
+import java.util.List;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -29,6 +32,9 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -46,6 +52,16 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
@GridDirectTransient
private boolean onRes;
+ /** */
+ private UUID nearNodeId;
+
+ /** */
+ private long nearFutId;
+
+ /** */
+ @GridDirectCollection(UUID.class)
+ private List<UUID> dhtNodes;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -58,10 +74,35 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
*
* @param cacheId Cache ID.
* @param nodeId Node ID.
+ * @param nearNodeId Near node ID.
+ * @param nearFutId Future ID on near node.
*/
- protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) {
+ protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId, UUID nearNodeId, long nearFutId) {
this.cacheId = cacheId;
this.nodeId = nodeId;
+ this.nearNodeId = nearNodeId;
+ this.nearFutId = nearFutId;
+ }
+
+ /**
+ * @return Near node ID.
+ */
+ public UUID nearNodeId() {
+ return nearNodeId;
+ }
+
+ /**
+ * @param dhtNodes DHT nodes.
+ */
+ public void dhtNodes(List<UUID> dhtNodes) {
+ this.dhtNodes = dhtNodes;
+ }
+
+ /**
+ * @return DHT nodes.
+ */
+ public List<UUID> dhtNodes() {
+ return dhtNodes;
}
/** {@inheritDoc} */
@@ -166,11 +207,18 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
public abstract int taskNameHash();
/**
- * @return Version assigned on primary node.
+ * @return Future ID on primary node.
*/
public abstract long futureId();
/**
+ * @return Future ID on near node.
+ */
+ public final long nearFutureId() {
+ return nearFutId;
+ }
+
+ /**
* @return Write version.
*/
public abstract GridCacheVersion writeVersion();
@@ -284,4 +332,87 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
* @return Optional arguments for entry processor.
*/
@Nullable public abstract Object[] invokeArguments();
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 6;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeCollection("dhtNodes", dhtNodes, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeLong("nearFutId", nearFutId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ dhtNodes = reader.readCollection("dhtNodes", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ nearFutId = reader.readLong("nearFutId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ nearNodeId = reader.readUuid("nearNodeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicAbstractUpdateRequest.class);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 87ac54b..3b81ee7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -55,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
-import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
@@ -93,14 +92,13 @@ import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
@@ -112,6 +110,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -211,17 +210,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
- if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
- assert req.writeSynchronizationMode() != FULL_ASYNC : req;
-
- // Always send reply in CLOCK ordering mode.
- sendNearUpdateReply(res.nodeId(), res);
-
- return;
- }
-
- // Request should be for primary keys only in PRIMARY ordering mode.
- assert req.hasPrimary() : req;
+// if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
+// assert req.writeSynchronizationMode() != FULL_ASYNC : req;
+//
+// // Always send reply in CLOCK ordering mode.
+// sendNearUpdateReply(res.nodeId(), res);
+//
+// return;
+// }
+//
+// // Request should be for primary keys only in PRIMARY ordering mode.
+// assert req.hasPrimary() : req;
if (req.writeSynchronizationMode() != FULL_ASYNC)
sendNearUpdateReply(res.nodeId(), res);
@@ -422,14 +421,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicDhtResponse.class, new CI2<UUID, GridNearAtomicDhtResponse>() {
- @Override public void apply(UUID uuid, GridNearAtomicDhtResponse msg) {
-
+ ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() {
+ @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) {
+ processDhtAtomicNearResponse(uuid, msg);
}
@Override public String toString() {
- return "GridDhtAtomicDeferredUpdateResponse handler " +
- "[msgIdx=" + GridNearAtomicDhtResponse.CACHE_MSG_IDX + ']';
+ return "GridDhtAtomicNearResponse handler " +
+ "[msgIdx=" + GridDhtAtomicNearResponse.CACHE_MSG_IDX + ']';
}
});
@@ -1819,12 +1818,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- // Do not check topology version for CLOCK versioning since
- // partition exchange will wait for near update future (if future is on server node).
- // Also do not check topology version if topology was locked on near node by
+ // Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
- !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node == null) {
@@ -1836,19 +1832,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
- GridCacheVersion ver = req.updateVersion();
-
- if (ver == null) {
- // Assign next version for update inside entries lock.
- ver = ctx.versions().next(top.topologyVersion());
+ // Assign next version for update inside entries lock.
+ GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
- if (hasNear)
- res.nearVersion(ver);
+ if (hasNear)
+ res.nearVersion(ver);
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Assigned update version [futId=" + req.futureId() +
- ", writeVer=" + ver + ']');
- }
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Assigned update version [futId=" + req.futureId() +
+ ", writeVer=" + ver + ']');
}
assert ver != null : "Got null version for update request: " + req;
@@ -2413,7 +2405,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* Updates locked entries one-by-one.
*
- * @param node Originating node.
+ * @param nearNode Originating node.
* @param hasNear {@code True} if originating node has near cache.
* @param req Update request.
* @param res Update response.
@@ -2429,7 +2421,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
private UpdateSingleResult updateSingle(
- ClusterNode node,
+ ClusterNode nearNode,
boolean hasNear,
GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res,
@@ -2473,9 +2465,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
- boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
- req.topologyVersion());
-
Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
Collection<UUID> readers = null;
@@ -2483,38 +2472,37 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (checkReaders) {
readers = entry.readers();
- filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+ filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
}
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
- node.id(),
+ nearNode.id(),
locNodeId,
op,
writeVal,
req.invokeArguments(),
- (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
- && writeThrough() && !req.skipStore(),
+ writeThrough() && !req.skipStore(),
!req.skipStore(),
sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
- true,
- true,
- primary,
- ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+ /*event*/true,
+ /*metrics*/true,
+ /*primary*/true,
+ /*verCheck*/false,
topVer,
req.filter(),
- replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+ replicate ? DR_PRIMARY : DR_NONE,
newConflictTtl,
newConflictExpireTime,
newConflictVer,
- true,
+ /*conflictResolve*/true,
intercept,
req.subjectId(),
taskName,
- null,
- null,
+ /*prevVal*/null,
+ /*updateCntr*/null,
dhtFut);
if (dhtFut == null && !F.isEmpty(filteredReaders)) {
@@ -2535,7 +2523,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
EntryProcessor<Object, Object, Object> entryProcessor = null;
if (!readersOnly) {
- dhtFut.addWriteEntry(entry,
+ dhtFut.addWriteEntry(
+ nearNode.id(),
+ entry,
updRes.newValue(),
entryProcessor,
updRes.newTtl(),
@@ -2547,7 +2537,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (!F.isEmpty(filteredReaders))
- dhtFut.addNearWriteEntries(filteredReaders,
+ dhtFut.addNearWriteEntries(
+ nearNode.id(),
+ filteredReaders,
entry,
updRes.newValue(),
entryProcessor,
@@ -2562,8 +2554,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
- if (primary && updRes.sendToDht()) {
- if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
+ if (updRes.sendToDht()) {
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
// If put the same value as in request then do not need to send it back.
if (op == TRANSFORM || writeVal != updRes.newValue()) {
res.addNearValue(i,
@@ -2575,13 +2567,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
if (updRes.newValue() != null) {
- IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
assert f == null : f;
}
}
- else if (F.contains(readers, node.id())) // Reader became primary or backup.
- entry.removeReader(node.id(), req.messageId());
+ else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(i);
}
@@ -2603,7 +2595,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
if (retVal == null)
- retVal = new GridCacheReturn(node.isLocal());
+ retVal = new GridCacheReturn(nearNode.isLocal());
retVal.addEntryProcessResult(ctx,
k,
@@ -2619,7 +2611,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CacheObject ret = updRes.oldValue();
retVal = new GridCacheReturn(ctx,
- node.isLocal(),
+ nearNode.isLocal(),
req.keepBinary(),
req.returnValue() ? ret : null,
updRes.success());
@@ -2639,7 +2631,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param firstEntryIdx Index of the first entry in the request keys collection.
* @param entries Entries to update.
* @param ver Version to set.
- * @param node Originating node.
+ * @param nearNode Originating node.
* @param writeVals Write values.
* @param putMap Values to put.
* @param rmvKeys Keys to remove.
@@ -2661,7 +2653,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final int firstEntryIdx,
final List<GridDhtCacheEntry> entries,
final GridCacheVersion ver,
- final ClusterNode node,
+ final ClusterNode nearNode,
@Nullable final List<CacheObject> writeVals,
@Nullable final Map<KeyCacheObject, CacheObject> putMap,
@Nullable final Collection<KeyCacheObject> rmvKeys,
@@ -2765,12 +2757,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (checkReaders) {
readers = entry.readers();
- filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+ filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
}
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
- node.id(),
+ nearNode.id(),
locNodeId,
op,
writeVal,
@@ -2831,7 +2823,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
if (!batchRes.readersOnly()) {
- dhtFut.addWriteEntry(entry,
+ dhtFut.addWriteEntry(
+ nearNode.id(),
+ entry,
writeVal,
entryProcessor,
updRes.newTtl(),
@@ -2843,7 +2837,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (!F.isEmpty(filteredReaders))
- dhtFut.addNearWriteEntries(filteredReaders,
+ dhtFut.addNearWriteEntries(
+ nearNode.id(),
+ filteredReaders,
entry,
writeVal,
entryProcessor,
@@ -2853,7 +2849,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (hasNear) {
if (primary) {
- if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
int idx = firstEntryIdx + i;
if (req.operation() == TRANSFORM) {
@@ -2866,13 +2862,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
if (writeVal != null || entry.hasValue()) {
- IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
assert f == null : f;
}
}
- else if (readers.contains(node.id())) // Reader became primary or backup.
- entry.removeReader(node.id(), req.messageId());
+ else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(firstEntryIdx + i);
}
@@ -3148,31 +3144,35 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean force
) {
- if (!force) {
- if (updateReq.fastMap())
- return null;
-
- AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
- Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
-
- // We are on primary node for some key.
- assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
- ctx.kernalContext().discovery().discoCache(topVer) + ']';
-
- if (nodes.size() == 1) {
- if (log.isDebugEnabled())
- log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
- "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
-
- return null;
- }
- }
-
if (updateReq.size() == 1)
return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
else
return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+// if (!force) {
+// if (updateReq.fastMap())
+// return null;
+//
+// AffinityTopologyVersion topVer = updateReq.topologyVersion();
+//
+// Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer);
+//
+// // We are on primary node for some key.
+// assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
+// ctx.kernalContext().discovery().discoCache(topVer) + ']';
+//
+// if (nodes.size() == 1) {
+// if (log.isDebugEnabled())
+// log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
+// "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
+//
+// return null;
+// }
+// }
+//
+// if (updateReq.size() == 1)
+// return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+// else
+// return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
}
/**
@@ -3225,9 +3225,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion ver = req.writeVersion();
- // Always send update reply.
- GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(),
- ctx.deploymentEnabled());
+ GridDhtAtomicNearResponse nearRes = ctx.config().getWriteSynchronizationMode() == FULL_SYNC ?
+ new GridDhtAtomicNearResponse(req.nearFutureId(), req.dhtNodes()) : null;
Boolean replicate = ctx.isDrEnabled();
@@ -3311,39 +3310,113 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Ignore.
}
catch (IgniteCheckedException e) {
- res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+ if (nearRes != null)
+ nearRes.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+
+ U.error(log, "Failed to update key on backup node: " + key, e);
}
}
- if (isNearEnabled(cacheCfg))
- ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
+ GridDhtAtomicUpdateResponse dhtRes = null;
- try {
- if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) {
- ctx.io().send(nodeId, res, ctx.ioPolicy());
+ if (isNearEnabled(cacheCfg)) {
+ List<KeyCacheObject> nearEvicted =
+ ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent DHT atomic update response [futId=" + req.futureId() +
- ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
- }
- }
+ dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureId(), ctx.deploymentEnabled());
+
+ dhtRes.nearEvicted(nearEvicted);
+ }
+
+ final boolean RES_AFTER_ACK = false;
+
+ if (nearRes != null) {
+ if (RES_AFTER_ACK)
+ sendDhtNearResponse(nodeId, req, nearRes);
else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureId() +
- ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
- }
+ sendDhtNearResponse(null, req, nearRes);
- // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
sendDeferredUpdateResponse(nodeId, req.futureId());
}
}
+ else
+ sendDeferredUpdateResponse(nodeId, req.futureId());
+
+ if (dhtRes != null)
+ sendDhtPrimaryResponse(nodeId, req, dhtRes);
+ }
+
+ /**
+ * @param nodeId Primary node ID.
+ * @param req Request.
+ * @param dhtRes Response to send.
+ */
+ private void sendDhtPrimaryResponse(UUID nodeId,
+ GridDhtAtomicAbstractUpdateRequest req,
+ GridDhtAtomicUpdateResponse dhtRes) {
+ try {
+ ctx.io().send(nodeId, dhtRes, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent DHT response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", writeVer=" + req.writeVersion() +
+ ", node=" + nodeId + ']');
+ }
+ }
catch (ClusterTopologyCheckedException ignored) {
- U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureId() +
- ", node=" + req.nodeId() + ']');
+ U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + nodeId + ']');
}
catch (IgniteCheckedException e) {
- U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureId() +
- ", node=" + nodeId + ", res=" + res + ']', e);
+ U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + nodeId +
+ ", res=" + dhtRes + ']', e);
+ }
+ }
+
+ /**
+ * @param req Request.
+ * @param nearRes Response to send.
+ */
+ private void sendDhtNearResponse(final UUID primaryId,
+ final GridDhtAtomicAbstractUpdateRequest req,
+ GridDhtAtomicNearResponse nearRes) {
+ try {
+ ClusterNode node = ctx.discovery().node(req.nearNodeId());
+
+ if (node == null)
+ throw new ClusterTopologyCheckedException("Node left: " + req.nearNodeId());
+
+ if (primaryId != null) {
+ ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy(), new IgniteInClosure<IgniteException>() {
+ @Override public void apply(IgniteException e) {
+ sendDeferredUpdateResponse(primaryId, req.futureId());
+ }
+ });
+ }
+ else
+ ctx.gridIO().send(node, TOPIC_CACHE, nearRes, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", writeVer=" + req.writeVersion() +
+ ", node=" + req.nearNodeId() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ U.warn(msgLog, "Failed to send DHT near response, node left [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + req.nearNodeId() + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + req.nearNodeId() +
+ ", res=" + nearRes + ']', e);
}
}
@@ -3359,8 +3432,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Node ID.
* @param res Response.
*/
- private void processNearAtomicDhtResponse(UUID nodeId, GridNearAtomicDhtResponse res) {
+ private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+ GridNearAtomicAbstractUpdateFuture updateFut =
+ (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
+ if (updateFut != null) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Received DHT atomic near response [futId=" + res.futureId() +
+ ", node=" + nodeId + ']');
+ }
+
+ updateFut.onResult(nodeId, res);
+ }
+ else {
+ U.warn(msgLog, "Failed to find update future DHT atomic near response [futId=" + res.futureId() +
+ ", node=" + nodeId +
+ ", res=" + res + ']');
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
new file mode 100644
index 0000000..628e1dc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * TODO IGNITE-4705: no not send mapping if it == affinity?
+ */
+public class GridDhtAtomicNearResponse extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** */
+ private static final int HAS_RESULT_MASK = 0x1;
+
+ /** */
+ private static final int RESULT_SUCCESS_MASK = 0x2;
+
+ /** */
+ private long futId;
+
+ /** */
+ @GridDirectCollection(UUID.class)
+ private List<UUID> mapping;
+
+ /** */
+ private byte flags;
+
+ /** */
+ private UpdateErrors errors;
+
+ /**
+ *
+ */
+ public GridDhtAtomicNearResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param mapping Update mapping.
+ */
+ public GridDhtAtomicNearResponse(long futId, List<UUID> mapping) {
+ this.futId = futId;
+ this.mapping = mapping;
+ }
+
+ /**
+ * @param key Key.
+ * @param e Error.
+ */
+ public void addFailedKey(KeyCacheObject key, Throwable e) {
+ if (errors == null)
+ errors = new UpdateErrors();
+
+ errors.addFailedKey(key, e);
+ }
+
+ /**
+ * @param success Success flag.
+ */
+ public void setResult(boolean success) {
+ setFlag(true, HAS_RESULT_MASK);
+
+ setFlag(success, RESULT_SUCCESS_MASK);
+ }
+
+ /**
+ * @return Operation result.
+ */
+ public GridCacheReturn result() {
+ assert hasResult();
+
+ return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK));
+ }
+
+ /**
+ * @return {@code True} if response contains operation result.
+ */
+ public boolean hasResult() {
+ return isFlag(HAS_RESULT_MASK);
+ }
+
+ /**
+ * @return Update mapping.
+ */
+ public List<UUID> mapping() {
+ return mapping;
+ }
+
+ /**
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -45;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 7;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (errors != null)
+ errors.prepareMarshal(this, ctx.cacheContext(cacheId));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (errors != null)
+ errors.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeMessage("errors", errors))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ errors = reader.readMessage("errors");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicNearResponse.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 0c8e482..671034c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -44,9 +43,6 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
/** */
private static final long serialVersionUID = 0L;
- /** */
- private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4");
-
/** Future keys. */
private KeyCacheObject key;
@@ -97,7 +93,8 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
/** {@inheritDoc} */
@Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
- ClusterNode node,
+ UUID nodeId,
+ UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -106,11 +103,13 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer
) {
- if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) {
+ if (canUseSingleRequest(ttl, conflictExpireTime, conflictVer)) {
return new GridDhtAtomicSingleUpdateRequest(
cctx.cacheId(),
- node.id(),
+ nodeId,
futId,
+ nearNodeId,
+ updateReq.futureId(),
writeVer,
syncMode,
topVer,
@@ -123,8 +122,10 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
else {
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
- node.id(),
+ nodeId,
futId,
+ nearNodeId,
+ updateReq.futureId(),
writeVer,
syncMode,
topVer,
@@ -167,18 +168,15 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
}
/**
- * @param node Target node
* @param ttl TTL.
* @param conflictExpireTime Conflict expire time.
* @param conflictVer Conflict version.
* @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}.
*/
- private boolean canUseSingleRequest(ClusterNode node,
- long ttl,
+ private boolean canUseSingleRequest(long ttl,
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer) {
- return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 &&
- (ttl == CU.TTL_NOT_CHANGED) &&
+ return (ttl == CU.TTL_NOT_CHANGED) &&
(conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) &&
conflictVer == null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index 127c2be..e46c843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -51,7 +51,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** Near cache key flag. */
private static final int NEAR_FLAG_MASK = 0x80;
- /** Future version. */
+ /** Future ID on primary. */
protected long futId;
/** Write version. */
@@ -116,6 +116,8 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
int cacheId,
UUID nodeId,
long futId,
+ UUID nearNodeId,
+ long nearFutId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
@@ -125,7 +127,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
boolean keepBinary,
boolean skipStore
) {
- super(cacheId, nodeId);
+ super(cacheId, nodeId, nearNodeId, nearFutId);
this.futId = futId;
this.writeVer = writeVer;
this.syncMode = syncMode;
@@ -423,73 +425,73 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
switch (writer.state()) {
- case 3:
+ case 6:
if (!writer.writeByte("flags", flags))
return false;
writer.incrementState();
- case 4:
+ case 7:
if (!writer.writeLong("futId", futId))
return false;
writer.incrementState();
- case 5:
+ case 8:
if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
- case 6:
+ case 9:
if (!writer.writeInt("partId", partId))
return false;
writer.incrementState();
- case 7:
+ case 10:
if (!writer.writeMessage("prevVal", prevVal))
return false;
writer.incrementState();
- case 8:
+ case 11:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 9:
+ case 12:
if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
- case 10:
+ case 13:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 11:
+ case 14:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 12:
+ case 15:
if (!writer.writeLong("updateCntr", updateCntr))
return false;
writer.incrementState();
- case 13:
+ case 16:
if (!writer.writeMessage("val", val))
return false;
writer.incrementState();
- case 14:
+ case 17:
if (!writer.writeMessage("writeVer", writeVer))
return false;
@@ -511,7 +513,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
return false;
switch (reader.state()) {
- case 3:
+ case 6:
flags = reader.readByte("flags");
if (!reader.isLastRead())
@@ -519,7 +521,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 4:
+ case 7:
futId = reader.readLong("futId");
if (!reader.isLastRead())
@@ -527,7 +529,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 5:
+ case 8:
key = reader.readMessage("key");
if (!reader.isLastRead())
@@ -535,7 +537,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 6:
+ case 9:
partId = reader.readInt("partId");
if (!reader.isLastRead())
@@ -543,7 +545,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 7:
+ case 10:
prevVal = reader.readMessage("prevVal");
if (!reader.isLastRead())
@@ -551,7 +553,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 8:
+ case 11:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -559,7 +561,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 9:
+ case 12:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -571,7 +573,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 10:
+ case 13:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -579,7 +581,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 11:
+ case 14:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -587,7 +589,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 12:
+ case 15:
updateCntr = reader.readLong("updateCntr");
if (!reader.isLastRead())
@@ -595,7 +597,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 13:
+ case 16:
val = reader.readMessage("val");
if (!reader.isLastRead())
@@ -603,7 +605,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
reader.incrementState();
- case 14:
+ case 17:
writeVer = reader.readMessage("writeVer");
if (!reader.isLastRead())
@@ -653,7 +655,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 15;
+ return 18;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5c5eb5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 7cb75fa..ea6a1b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -122,7 +122,9 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
}
/** {@inheritDoc} */
- @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node,
+ @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(
+ UUID nodeId,
+ UUID nearNodeId,
long futId,
GridCacheVersion writeVer,
CacheWriteSynchronizationMode syncMode,
@@ -133,8 +135,10 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
) {
return new GridDhtAtomicUpdateRequest(
cctx.cacheId(),
- node.id(),
+ nodeId,
futId,
+ nearNodeId,
+ updateReq.futureId(),
writeVer,
syncMode,
topVer,