You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/14 15:00:26 UTC
[32/40] ignite git commit: ignite-4705 Atomic cache protocol change:
notify client node from backups
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index deb9ce4..2826215 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -18,26 +18,65 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.io.Externalizable;
+import java.nio.ByteBuffer;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
*
*/
public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable {
+ /** Skip store flag bit mask. */
+ private static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01;
+
+ /** Keep binary flag. */
+ private static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02;
+
+ /** Near cache key flag. */
+ private static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04;
+
+ /** */
+ static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08;
+
+ /** */
+ private static final int DHT_ATOMIC_REPLY_WITHOUT_DELAY = 0x10;
+
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
+ /** Future ID on primary. */
+ protected long futId;
+
+ /** Write version. */
+ protected GridCacheVersion writeVer;
+
+ /** Write synchronization mode. */
+ protected CacheWriteSynchronizationMode syncMode;
+
+ /** Topology version. */
+ protected AffinityTopologyVersion topVer;
+
+ /** Subject ID. */
+ protected UUID subjId;
+
+ /** Task name hash. */
+ protected int taskNameHash;
+
/** Node ID. */
@GridDirectTransient
protected UUID nodeId;
@@ -46,6 +85,15 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
@GridDirectTransient
private boolean onRes;
+ /** */
+ private UUID nearNodeId;
+
+ /** */
+ private long nearFutId;
+
+ /** Additional flags. */
+ protected byte flags;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -59,9 +107,68 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
* @param cacheId Cache ID.
* @param nodeId Node ID.
*/
- protected GridDhtAtomicAbstractUpdateRequest(int cacheId, UUID nodeId) {
+ protected GridDhtAtomicAbstractUpdateRequest(int cacheId,
+ UUID nodeId,
+ long futId,
+ GridCacheVersion writeVer,
+ CacheWriteSynchronizationMode syncMode,
+ @NotNull AffinityTopologyVersion topVer,
+ UUID subjId,
+ int taskNameHash,
+ boolean addDepInfo,
+ boolean keepBinary,
+ boolean skipStore
+ ) {
this.cacheId = cacheId;
this.nodeId = nodeId;
+ this.futId = futId;
+ this.writeVer = writeVer;
+ this.syncMode = syncMode;
+ this.topVer = topVer;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.addDepInfo = addDepInfo;
+
+ if (skipStore)
+ setFlag(true, DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
+ if (keepBinary)
+ setFlag(true, DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
+ }
+
+ void nearReplyInfo(UUID nearNodeId, long nearFutId) {
+ assert nearNodeId != null;
+
+ this.nearNodeId = nearNodeId;
+ this.nearFutId = nearFutId;
+ }
+
+ boolean replyWithoutDelay() {
+ return isFlag(DHT_ATOMIC_REPLY_WITHOUT_DELAY);
+ }
+
+ void replyWithoutDelay(boolean replyWithoutDelay) {
+ setFlag(replyWithoutDelay, DHT_ATOMIC_REPLY_WITHOUT_DELAY);
+ }
+
+ /**
+ * @param res Result flag.
+ */
+ void hasResult(boolean res) {
+ setFlag(res, DHT_ATOMIC_HAS_RESULT_MASK);
+ }
+
+ /**
+ * @return Result flag.
+ */
+ private boolean hasResult() {
+ return isFlag(DHT_ATOMIC_HAS_RESULT_MASK);
+ }
+
+ /**
+ * @return Near node ID.
+ */
+ public UUID nearNodeId() {
+ return nearNodeId;
}
/** {@inheritDoc} */
@@ -77,14 +184,25 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
}
/**
+ * @return Flags.
+ */
+ public final byte flags() {
+ return flags;
+ }
+
+ /**
* @return Keep binary flag.
*/
- public abstract boolean keepBinary();
+ public final boolean keepBinary() {
+ return isFlag(DHT_ATOMIC_KEEP_BINARY_FLAG_MASK);
+ }
/**
* @return Skip write-through to a persistent storage.
*/
- public abstract boolean skipStore();
+ public final boolean skipStore() {
+ return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK);
+ }
/**
* @return {@code True} if on response flag changed.
@@ -93,6 +211,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
return !onRes && (onRes = true);
}
+ /**
+ * @return {@code True} if response was received.
+ */
+ boolean hasResponse() {
+ return onRes;
+ }
+
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return addDepInfo;
@@ -121,7 +246,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
* @param addPrevVal If {@code true} adds previous value.
- * @param partId Partition.
* @param prevVal Previous value.
* @param updateCntr Update counter.
*/
@@ -132,7 +256,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer,
boolean addPrevVal,
- int partId,
@Nullable CacheObject prevVal,
long updateCntr
);
@@ -158,27 +281,44 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
/**
* @return Subject ID.
*/
- public abstract UUID subjectId();
+ public final UUID subjectId() {
+ return subjId;
+ }
/**
* @return Task name.
*/
- public abstract int taskNameHash();
+ public final int taskNameHash() {
+ return taskNameHash;
+ }
+
+ /**
+ * @return Future ID on primary node.
+ */
+ public final long futureId() {
+ return futId;
+ }
/**
- * @return Version assigned on primary node.
+ * @return Future ID on near node.
*/
- public abstract GridCacheVersion futureVersion();
+ public final long nearFutureId() {
+ return nearFutId;
+ }
/**
* @return Write version.
*/
- public abstract GridCacheVersion writeVersion();
+ public final GridCacheVersion writeVersion() {
+ return writeVer;
+ }
/**
* @return Cache write synchronization mode.
*/
- public abstract CacheWriteSynchronizationMode writeSynchronizationMode();
+ public final CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncMode;
+ }
/**
* @return Keys size.
@@ -203,12 +343,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
public abstract KeyCacheObject key(int idx);
/**
- * @param idx Partition index.
- * @return Partition id.
- */
- public abstract int partitionId(int idx);
-
- /**
* @param updCntr Update counter.
* @return Update counter.
*/
@@ -284,4 +418,228 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag
* @return Optional arguments for entry processor.
*/
@Nullable public abstract Object[] invokeArguments();
+
+ /**
+ * @return {@code True} if near cache update request.
+ */
+ protected final boolean near() {
+ return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK);
+ }
+
+ /**
+ * @param near Near cache update flag.
+ */
+ protected final void near(boolean near) {
+ setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK);
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ protected final void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reags flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ final boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 12;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeLong("nearFutId", nearFutId))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeUuid("nearNodeId", nearNodeId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeUuid("subjId", subjId))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeMessage("writeVer", writeVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ nearFutId = reader.readLong("nearFutId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ nearNodeId = reader.readUuid("nearNodeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ byte syncModeOrd;
+
+ syncModeOrd = reader.readByte("syncMode");
+
+ if (!reader.isLastRead())
+ return false;
+
+ syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
+
+ reader.incrementState();
+
+ case 9:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ writeVer = reader.readMessage("writeVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridDhtAtomicAbstractUpdateRequest.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ StringBuilder flags = new StringBuilder();
+
+ if (skipStore())
+ appendFlag(flags, "skipStore");
+ if (keepBinary())
+ appendFlag(flags, "keepBinary");
+ if (near())
+ appendFlag(flags, "near");
+ if (hasResult())
+ appendFlag(flags, "hasRes");
+ if (replyWithoutDelay())
+ appendFlag(flags, "resNoDelay");
+
+ return S.toString(GridDhtAtomicAbstractUpdateRequest.class, this,
+ "flags", flags.toString());
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cebf4ae..c20ed48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
@@ -57,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
-import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
@@ -80,6 +80,8 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -90,8 +92,6 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.P1;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -99,16 +99,14 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteOutClosure;
+import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -134,12 +132,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT =
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
+ /** */
+ private final ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>> defRes =
+ new ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>>() {
+ @Override protected Map<UUID, GridDhtAtomicDeferredUpdateResponse> initialValue() {
+ return new HashMap<>();
+ }
+ };
+
/** Update reply closure. */
@GridToStringExclude
- private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
-
- /** Pending */
- private GridDeferredAckMessageSender deferredUpdateMsgSnd;
+ private UpdateReplyClosure updateReplyClos;
/** */
private GridNearAtomicCache<K, V> near;
@@ -205,25 +208,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override protected void init() {
super.init();
- updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+ updateReplyClos = new UpdateReplyClosure() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
- if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
- assert req.writeSynchronizationMode() != FULL_ASYNC : req;
-
- // Always send reply in CLOCK ordering mode.
- sendNearUpdateReply(res.nodeId(), res);
-
- return;
- }
-
- // Request should be for primary keys only in PRIMARY ordering mode.
- assert req.hasPrimary() : req;
-
if (req.writeSynchronizationMode() != FULL_ASYNC)
sendNearUpdateReply(res.nodeId(), res);
else {
- if (!F.isEmpty(res.remapKeys()))
+ if (res.remapTopologyVersion() != null)
// Remap keys on primary node in FULL_ASYNC mode.
remapToNewPrimary(req);
else if (res.error() != null) {
@@ -240,53 +231,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void start() throws IgniteCheckedException {
super.start();
- deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) {
- @Override public int getTimeout() {
- return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
- }
-
- @Override public int getBufferSize() {
- return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE;
- }
-
- @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) {
- GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
- vers, ctx.deploymentEnabled());
-
- try {
- ctx.kernalContext().gateway().readLock();
-
- try {
- ctx.io().send(nodeId, msg, ctx.ioPolicy());
-
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() +
- ", node=" + nodeId + ']');
- }
- }
- finally {
- ctx.kernalContext().gateway().readUnlock();
- }
- }
- catch (IllegalStateException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to send deferred DHT update response, node is stopping [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to send deferred DHT update response, node left [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']');
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send deferred DHT update response to remote node [" +
- "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e);
- }
- }
- };
-
CacheMetricsImpl m = new CacheMetricsImpl(ctx);
if (ctx.dht().near() != null)
@@ -419,6 +363,32 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
+ ctx.io().addHandler(ctx.cacheId(),
+ GridDhtAtomicNearResponse.class,
+ new CI2<UUID, GridDhtAtomicNearResponse>() {
+ @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) {
+ processDhtAtomicNearResponse(uuid, msg);
+ }
+
+ @Override public String toString() {
+ return "GridDhtAtomicNearResponse handler " +
+ "[msgIdx=" + GridDhtAtomicNearResponse.CACHE_MSG_IDX + ']';
+ }
+ });
+
+ ctx.io().addHandler(ctx.cacheId(),
+ GridNearAtomicCheckUpdateRequest.class,
+ new CI2<UUID, GridNearAtomicCheckUpdateRequest>() {
+ @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) {
+ processCheckUpdateRequest(uuid, msg);
+ }
+
+ @Override public String toString() {
+ return "GridNearAtomicCheckUpdateRequest handler " +
+ "[msgIdx=" + GridNearAtomicCheckUpdateRequest.CACHE_MSG_IDX + ']';
+ }
+ });
+
if (near == null) {
ctx.io().addHandler(
ctx.cacheId(),
@@ -450,11 +420,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
- /** {@inheritDoc} */
- @Override public void stop() {
- deferredUpdateMsgSnd.stop();
- }
-
/**
* @param near Near cache.
*/
@@ -1341,9 +1306,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CacheEntryPredicate[] filters = CU.filterArray(filter);
- if (conflictPutVal == null &&
- conflictRmvVer == null &&
- !isFastMap(filters, op)) {
+ if (conflictPutVal == null && conflictRmvVer == null) {
return new GridNearAtomicSingleUpdateFuture(
ctx,
this,
@@ -1389,19 +1352,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
- * Whether this is fast-map operation.
- *
- * @param filters Filters.
- * @param op Operation.
- * @return {@code True} if fast-map.
- */
- public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) {
- return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
- ctx.config().getAtomicWriteOrderMode() == CLOCK &&
- !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
- }
-
- /**
* Entry point for all public API remove methods.
*
* @param keys Keys to remove.
@@ -1696,10 +1646,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param req Update request.
* @param completionCb Completion callback.
*/
- public void updateAllAsyncInternal(
+ void updateAllAsyncInternal(
final UUID nodeId,
final GridNearAtomicAbstractUpdateRequest req,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ final UpdateReplyClosure completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
@@ -1748,12 +1698,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void onForceKeysError(final UUID nodeId,
final GridNearAtomicAbstractUpdateRequest req,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ final UpdateReplyClosure completionCb,
IgniteCheckedException e
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
- req.futureVersion(),
+ req.futureId(),
+ req.partition(),
+ false,
ctx.deploymentEnabled());
res.addFailedKeys(req.keys(), e);
@@ -1771,12 +1723,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private void updateAllAsyncInternal0(
UUID nodeId,
GridNearAtomicAbstractUpdateRequest req,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ UpdateReplyClosure completionCb
) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
- ctx.deploymentEnabled());
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null) {
+ U.warn(msgLog, "Skip near update request, node originated update request left [" +
+ "futId=" + req.futureId() + ", node=" + nodeId + ']');
- res.partition(req.partition());
+ return;
+ }
+
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+ nodeId,
+ req.futureId(),
+ req.partition(),
+ false,
+ ctx.deploymentEnabled());
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
@@ -1791,7 +1754,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
// If batch store update is enabled, we need to lock all entries.
// First, need to acquire locks on cache entries, then check filter.
- List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion());
+ List<GridDhtCacheEntry> locked = null;
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;
@@ -1810,43 +1773,29 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- // Do not check topology version for CLOCK versioning since
- // partition exchange will wait for near update future (if future is on server node).
- // Also do not check topology version if topology was locked on near node by
+ // Do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
- !needRemap(req.topologyVersion(), top.topologyVersion())) {
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null) {
- U.warn(msgLog, "Skip near update request, node originated update request left [" +
- "futId=" + req.futureVersion() + ", node=" + nodeId + ']');
-
- return;
- }
+ if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ locked = lockEntries(req, req.topologyVersion());
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
- GridCacheVersion ver = req.updateVersion();
+ // Assign next version for update inside entries lock.
+ GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
- if (ver == null) {
- // Assign next version for update inside entries lock.
- ver = ctx.versions().next(top.topologyVersion());
+ if (hasNear)
+ res.nearVersion(ver);
- if (hasNear)
- res.nearVersion(ver);
-
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Assigned update version [futId=" + req.futureVersion() +
- ", writeVer=" + ver + ']');
- }
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Assigned update version [futId=" + req.futureId() +
+ ", writeVer=" + ver + ']');
}
assert ver != null : "Got null version for update request: " + req;
boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
- dhtFut = createDhtFuture(ver, req, res, completionCb, false);
+ dhtFut = createDhtFuture(ver, req);
expiry = expiryPolicy(req.expiry());
@@ -1866,7 +1815,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
locked,
ver,
dhtFut,
- completionCb,
ctx.isDrEnabled(),
taskName,
expiry,
@@ -1886,7 +1834,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
locked,
ver,
dhtFut,
- completionCb,
ctx.isDrEnabled(),
taskName,
expiry,
@@ -1902,15 +1849,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.returnValue(retVal);
- if (req.writeSynchronizationMode() != FULL_ASYNC)
- req.cleanup(!node.isLocal());
-
if (dhtFut != null)
- ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut);
+ ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut);
}
- else
+ else {
// Should remap all keys.
remap = true;
+
+ res.remapTopologyVersion(top.topologyVersion());
+ }
}
finally {
top.readUnlock();
@@ -1936,12 +1883,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
catch (GridDhtInvalidPartitionException ignore) {
- assert !req.fastMap() || req.clientRequest() : req;
-
if (log.isDebugEnabled())
log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req);
remap = true;
+
+ res.remapTopologyVersion(ctx.topology().topologyVersion());
}
catch (Throwable e) {
// At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is
@@ -1961,18 +1908,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (remap) {
assert dhtFut == null;
- res.remapKeys(req.keys());
-
completionCb.apply(req, res);
}
- else {
- // If there are backups, map backup update future.
+ else
if (dhtFut != null)
- dhtFut.map();
- // Otherwise, complete the call.
- else
- completionCb.apply(req, res);
- }
+ dhtFut.map(node, res.returnValue(), res, completionCb);
+
+ if (req.writeSynchronizationMode() != FULL_ASYNC)
+ req.cleanup(!node.isLocal());
sendTtlUpdateRequest(expiry);
}
@@ -1987,7 +1930,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param locked Locked entries.
* @param ver Assigned version.
* @param dhtFut Optional DHT future.
- * @param completionCb Completion callback to invoke when DHT future is completed.
* @param replicate Whether replication is enabled.
* @param taskName Task name.
* @param expiry Expiry policy.
@@ -2004,7 +1946,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final List<GridDhtCacheEntry> locked,
final GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
final boolean replicate,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
@@ -2049,9 +1990,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (int i = 0; i < locked.size(); i++) {
GridDhtCacheEntry entry = locked.get(i);
- if (entry == null)
- continue;
-
try {
if (!checkFilter(entry, req, res)) {
if (expiry != null && entry.hasValue()) {
@@ -2155,7 +2093,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
null,
entryProcessorMap,
dhtFut,
- completionCb,
req,
res,
replicate,
@@ -2204,7 +2141,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
rmvKeys,
entryProcessorMap,
dhtFut,
- completionCb,
req,
res,
replicate,
@@ -2331,7 +2267,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
rmvKeys,
entryProcessorMap,
dhtFut,
- completionCb,
req,
res,
replicate,
@@ -2404,14 +2339,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* Updates locked entries one-by-one.
*
- * @param node Originating node.
+ * @param nearNode Originating node.
* @param hasNear {@code True} if originating node has near cache.
* @param req Update request.
* @param res Update response.
* @param locked Locked entries.
* @param ver Assigned update version.
* @param dhtFut Optional DHT future.
- * @param completionCb Completion callback to invoke when DHT future is completed.
* @param replicate Whether DR is enabled for that cache.
* @param taskName Task name.
* @param expiry Expiry policy.
@@ -2420,14 +2354,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @throws GridCacheEntryRemovedException Should be never thrown.
*/
private UpdateSingleResult updateSingle(
- ClusterNode node,
+ ClusterNode nearNode,
boolean hasNear,
GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -2440,10 +2373,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
- boolean readersOnly = false;
-
boolean intercept = ctx.config().getInterceptor() != null;
+ AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+
// Avoid iterator creation.
for (int i = 0; i < req.size(); i++) {
KeyCacheObject k = req.key(i);
@@ -2455,18 +2388,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtCacheEntry entry = locked.get(i);
- if (entry == null)
- continue;
-
GridCacheVersion newConflictVer = req.conflictVersion(i);
long newConflictTtl = req.conflictTtl(i);
long newConflictExpireTime = req.conflictExpireTime(i);
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
- boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
- req.topologyVersion());
-
Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
Collection<UUID> readers = null;
@@ -2474,46 +2401,39 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (checkReaders) {
readers = entry.readers();
- filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+ filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
}
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
- node.id(),
+ nearNode.id(),
locNodeId,
op,
writeVal,
req.invokeArguments(),
- (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
- && writeThrough() && !req.skipStore(),
+ writeThrough() && !req.skipStore(),
!req.skipStore(),
sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
- true,
- true,
- primary,
- ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+ /*event*/true,
+ /*metrics*/true,
+ /*primary*/true,
+ /*verCheck*/false,
topVer,
req.filter(),
- replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+ replicate ? DR_PRIMARY : DR_NONE,
newConflictTtl,
newConflictExpireTime,
newConflictVer,
- true,
+ /*conflictResolve*/true,
intercept,
req.subjectId(),
taskName,
- null,
- null,
+ /*prevVal*/null,
+ /*updateCntr*/null,
dhtFut);
- if (dhtFut == null && !F.isEmpty(filteredReaders)) {
- dhtFut = createDhtFuture(ver, req, res, completionCb, true);
-
- readersOnly = true;
- }
-
if (dhtFut != null) {
if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2525,20 +2445,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
EntryProcessor<Object, Object, Object> entryProcessor = null;
- if (!readersOnly) {
- dhtFut.addWriteEntry(entry,
- updRes.newValue(),
- entryProcessor,
- updRes.newTtl(),
- updRes.conflictExpireTime(),
- newConflictVer,
- sndPrevVal,
- updRes.oldValue(),
- updRes.updateCounter());
- }
+ dhtFut.addWriteEntry(
+ affAssignment,
+ entry,
+ updRes.newValue(),
+ entryProcessor,
+ updRes.newTtl(),
+ updRes.conflictExpireTime(),
+ newConflictVer,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
if (!F.isEmpty(filteredReaders))
- dhtFut.addNearWriteEntries(filteredReaders,
+ dhtFut.addNearWriteEntries(
+ filteredReaders,
entry,
updRes.newValue(),
entryProcessor,
@@ -2553,8 +2474,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
- if (primary && updRes.sendToDht()) {
- if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
+ if (updRes.sendToDht()) {
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
// If put the same value as in request then do not need to send it back.
if (op == TRANSFORM || writeVal != updRes.newValue()) {
res.addNearValue(i,
@@ -2566,13 +2487,13 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());
if (updRes.newValue() != null) {
- IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
assert f == null : f;
}
}
- else if (F.contains(readers, node.id())) // Reader became primary or backup.
- entry.removeReader(node.id(), req.messageId());
+ else if (F.contains(readers, nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(i);
}
@@ -2594,7 +2515,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
if (retVal == null)
- retVal = new GridCacheReturn(node.isLocal());
+ retVal = new GridCacheReturn(nearNode.isLocal());
retVal.addEntryProcessResult(ctx,
k,
@@ -2610,7 +2531,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CacheObject ret = updRes.oldValue();
retVal = new GridCacheReturn(ctx,
- node.isLocal(),
+ nearNode.isLocal(),
req.keepBinary(),
req.returnValue() ? ret : null,
updRes.success());
@@ -2630,13 +2551,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param firstEntryIdx Index of the first entry in the request keys collection.
* @param entries Entries to update.
* @param ver Version to set.
- * @param node Originating node.
+ * @param nearNode Originating node.
* @param writeVals Write values.
* @param putMap Values to put.
* @param rmvKeys Keys to remove.
* @param entryProcessorMap Entry processors.
* @param dhtFut DHT update future if has backups.
- * @param completionCb Completion callback to invoke when DHT future is completed.
* @param req Request.
* @param res Response.
* @param replicate Whether replication is enabled.
@@ -2652,13 +2572,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final int firstEntryIdx,
final List<GridDhtCacheEntry> entries,
final GridCacheVersion ver,
- final ClusterNode node,
+ final ClusterNode nearNode,
@Nullable final List<CacheObject> writeVals,
@Nullable final Map<KeyCacheObject, CacheObject> putMap,
@Nullable final Collection<KeyCacheObject> rmvKeys,
@Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
final boolean replicate,
@@ -2681,17 +2600,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheOperation op;
if (putMap != null) {
- // If fast mapping, filter primary keys for write to store.
- Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
- F.view(putMap, new P1<CacheObject>() {
- @Override public boolean apply(CacheObject key) {
- return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
- }
- }) :
- putMap;
-
try {
- ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+ ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
@Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
return F.t(v, ver);
}
@@ -2704,17 +2614,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op = UPDATE;
}
else {
- // If fast mapping, filter primary keys for write to store.
- Collection<KeyCacheObject> storeKeys = req.fastMap() ?
- F.view(rmvKeys, new P1<Object>() {
- @Override public boolean apply(Object key) {
- return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
- }
- }) :
- rmvKeys;
-
try {
- ctx.store().removeAll(null, storeKeys);
+ ctx.store().removeAll(null, rmvKeys);
}
catch (CacheStorePartialUpdateException e) {
storeErr = e;
@@ -2725,6 +2626,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean intercept = ctx.config().getInterceptor() != null;
+ AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
+
// Avoid iterator creation.
for (int i = 0; i < entries.size(); i++) {
GridDhtCacheEntry entry = entries.get(i);
@@ -2747,21 +2650,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert writeVal != null || op == DELETE : "null write value found.";
- boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(),
- entry.partition(),
- req.topologyVersion());
-
Collection<UUID> readers = null;
Collection<UUID> filteredReaders = null;
if (checkReaders) {
readers = entry.readers();
- filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
+ filteredReaders = F.view(entry.readers(), F.notEqualTo(nearNode.id()));
}
GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
ver,
- node.id(),
+ nearNode.id(),
locNodeId,
op,
writeVal,
@@ -2773,11 +2672,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
/*event*/true,
/*metrics*/true,
- primary,
- ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
+ /*primary*/true,
+ /*verCheck*/false,
topVer,
null,
- replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
+ replicate ? DR_PRIMARY : DR_NONE,
CU.TTL_NOT_CHANGED,
CU.EXPIRE_TIME_CALCULATE,
null,
@@ -2811,30 +2710,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
batchRes.addDeleted(entry, updRes, entries);
- if (dhtFut == null && !F.isEmpty(filteredReaders)) {
- dhtFut = createDhtFuture(ver, req, res, completionCb, true);
-
- batchRes.readersOnly(true);
- }
-
if (dhtFut != null) {
EntryProcessor<Object, Object, Object> entryProcessor =
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
- if (!batchRes.readersOnly()) {
- dhtFut.addWriteEntry(entry,
- writeVal,
- entryProcessor,
- updRes.newTtl(),
- CU.EXPIRE_TIME_CALCULATE,
- null,
- sndPrevVal,
- updRes.oldValue(),
- updRes.updateCounter());
- }
+ dhtFut.addWriteEntry(
+ affAssignment,
+ entry,
+ writeVal,
+ entryProcessor,
+ updRes.newTtl(),
+ CU.EXPIRE_TIME_CALCULATE,
+ null,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
if (!F.isEmpty(filteredReaders))
- dhtFut.addNearWriteEntries(filteredReaders,
+ dhtFut.addNearWriteEntries(
+ filteredReaders,
entry,
writeVal,
entryProcessor,
@@ -2843,30 +2737,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
- if (primary) {
- if (!ctx.affinity().partitionBelongs(node, entry.partition(), topVer)) {
- int idx = firstEntryIdx + i;
-
- if (req.operation() == TRANSFORM) {
- res.addNearValue(idx,
- writeVal,
- updRes.newTtl(),
- CU.EXPIRE_TIME_CALCULATE);
- }
- else
- res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
+ int idx = firstEntryIdx + i;
- if (writeVal != null || entry.hasValue()) {
- IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
-
- assert f == null : f;
- }
+ if (req.operation() == TRANSFORM) {
+ res.addNearValue(idx,
+ writeVal,
+ updRes.newTtl(),
+ CU.EXPIRE_TIME_CALCULATE);
}
- else if (readers.contains(node.id())) // Reader became primary or backup.
- entry.removeReader(node.id(), req.messageId());
else
- res.addSkippedIndex(firstEntryIdx + i);
+ res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+
+ if (writeVal != null || entry.hasValue()) {
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+
+ assert f == null : f;
+ }
}
+ else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(firstEntryIdx + i);
}
@@ -2879,7 +2769,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
catch (IgniteCheckedException e) {
- res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e, ctx);
+ res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e);
}
if (storeErr != null) {
@@ -2888,7 +2778,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (Object failedKey : storeErr.failedKeys())
failed.add(ctx.toCacheKeyObject(failedKey));
- res.addFailedKeys(failed, storeErr.getCause(), ctx);
+ res.addFailedKeys(failed, storeErr.getCause());
}
return dhtFut;
@@ -2910,23 +2800,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
KeyCacheObject key = req.key(0);
while (true) {
- try {
- GridDhtCacheEntry entry = entryExx(key, topVer);
+ GridDhtCacheEntry entry = entryExx(key, topVer);
- GridUnsafe.monitorEnter(entry);
+ GridUnsafe.monitorEnter(entry);
- if (entry.obsolete())
- GridUnsafe.monitorExit(entry);
- else
- return Collections.singletonList(entry);
- }
- catch (GridDhtInvalidPartitionException e) {
- // Ignore invalid partition exception in CLOCK ordering mode.
- if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
- return Collections.singletonList(null);
- else
- throw e;
- }
+ if (entry.obsolete())
+ GridUnsafe.monitorExit(entry);
+ else
+ return Collections.singletonList(entry);
}
}
else {
@@ -2934,18 +2815,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
while (true) {
for (int i = 0; i < req.size(); i++) {
- try {
- GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+ GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
- locked.add(entry);
- }
- catch (GridDhtInvalidPartitionException e) {
- // Ignore invalid partition exception in CLOCK ordering mode.
- if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
- locked.add(null);
- else
- throw e;
- }
+ locked.add(entry);
}
boolean retry = false;
@@ -3055,7 +2927,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param req Request to remap.
*/
- private void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) {
+ void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) {
assert req.writeSynchronizationMode() == FULL_ASYNC : req;
if (log.isDebugEnabled())
@@ -3098,7 +2970,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
drPutVals = null;
}
- final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
+ GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
ctx,
this,
ctx.config().getWriteSynchronizationMode(),
@@ -3127,43 +2999,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*
* @param writeVer Write version.
* @param updateReq Update request.
- * @param updateRes Update response.
- * @param completionCb Completion callback to invoke when future is completed.
- * @param force If {@code true} then creates future without optimizations checks.
- * @return Backup update future or {@code null} if there are no backups.
+ * @return Backup update future.
*/
- @Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
+ private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq,
- GridNearAtomicUpdateResponse updateRes,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
- boolean force
+ GridNearAtomicAbstractUpdateRequest updateReq
) {
- if (!force) {
- if (updateReq.fastMap())
- return null;
-
- AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
- Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(ctx.cacheId(), topVer);
-
- // We are on primary node for some key.
- assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer +
- ctx.kernalContext().discovery().discoCache(topVer) + ']';
-
- if (nodes.size() == 1) {
- if (log.isDebugEnabled())
- log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " +
- "[topVer=" + topVer + ", updateReq=" + updateReq + ']');
-
- return null;
- }
- }
-
if (updateReq.size() == 1)
- return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+ return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
else
- return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+ return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
}
/**
@@ -3172,13 +3017,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() +
- ", writeVer=" + req.updateVersion() +
+ msgLog.debug("Received near atomic update request [futId=" + req.futureId() +
", node=" + nodeId + ']');
}
- req.nodeId(ctx.localNodeId());
-
updateAllAsyncInternal(nodeId, req, updateReplyClos);
}
@@ -3189,20 +3031,41 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (msgLog.isDebugEnabled())
- msgLog.debug("Received near atomic update response " +
- "[futId=" + res.futureVersion() + ", node=" + nodeId + ']');
+ msgLog.debug("Received near atomic update response [futId" + res.futureId() + ", node=" + nodeId + ']');
res.nodeId(ctx.localNodeId());
GridNearAtomicAbstractUpdateFuture fut =
- (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+ (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
if (fut != null)
- fut.onResult(nodeId, res, false);
-
+ fut.onPrimaryResponse(nodeId, res, false);
else
U.warn(msgLog, "Failed to find near update future for update response (will ignore) " +
- "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']');
+ "[futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']');
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param checkReq Request.
+ */
+ private void processCheckUpdateRequest(UUID nodeId, GridNearAtomicCheckUpdateRequest checkReq) {
+ /*
+ * Message is processed in the same stripe, so primary already processed update request. It is possible
+ * response was not sent if operation result was empty. Near node will get original response or this one.
+ */
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
+ nodeId,
+ checkReq.futureId(),
+ checkReq.partition(),
+ false,
+ false);
+
+ GridCacheReturn ret = new GridCacheReturn(false, true);
+
+ res.returnValue(ret);
+
+ sendNearUpdateReply(nodeId, res);
}
/**
@@ -3210,20 +3073,28 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param req Dht atomic update request.
*/
private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) {
+ assert Thread.currentThread().getName().startsWith("sys-stripe-") : Thread.currentThread().getName();
+
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() +
+ msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() +
", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
}
+ assert req.partition() >= 0 : req;
+
GridCacheVersion ver = req.writeVersion();
- // Always send update reply.
- GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(),
- ctx.deploymentEnabled());
+ GridDhtAtomicNearResponse nearRes = null;
- res.partition(req.partition());
+ if (req.nearNodeId() != null) {
+ nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(),
+ req.partition(),
+ req.nearFutureId(),
+ nodeId,
+ req.flags());
+ }
- Boolean replicate = ctx.isDrEnabled();
+ boolean replicate = ctx.isDrEnabled();
boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null;
@@ -3305,48 +3176,208 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Ignore.
}
catch (IgniteCheckedException e) {
- res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e));
+ IgniteCheckedException err =
+ new IgniteCheckedException("Failed to update key on backup node: " + key, e);
+
+ if (nearRes != null)
+ nearRes.addFailedKey(key, err);
+
+ U.error(log, "Failed to update key on backup node: " + key, e);
+ }
+ }
+
+ GridDhtAtomicUpdateResponse dhtRes = null;
+
+ if (isNearEnabled(cacheCfg)) {
+ List<KeyCacheObject> nearEvicted =
+ ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes);
+
+ if (nearEvicted != null) {
+ dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+ req.partition(),
+ req.futureId(),
+ ctx.deploymentEnabled());
+
+ dhtRes.nearEvicted(nearEvicted);
}
}
- if (isNearEnabled(cacheCfg))
- ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res);
+ if (nearRes != null)
+ sendDhtNearResponse(req, nearRes);
+ if (dhtRes == null && req.replyWithoutDelay()) {
+ dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
+ req.partition(),
+ req.futureId(),
+ ctx.deploymentEnabled());
+ }
+
+ if (dhtRes != null)
+ sendDhtPrimaryResponse(nodeId, req, dhtRes);
+ else
+ sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId());
+ }
+
+ /**
+ * @param nodeId Primary node ID.
+ * @param req Request.
+ * @param dhtRes Response to send.
+ */
+ private void sendDhtPrimaryResponse(UUID nodeId,
+ GridDhtAtomicAbstractUpdateRequest req,
+ GridDhtAtomicUpdateResponse dhtRes) {
try {
- if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) {
- ctx.io().send(nodeId, res, ctx.ioPolicy());
+ ctx.io().send(nodeId, dhtRes, ctx.ioPolicy());
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() +
- ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
- }
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent DHT response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", writeVer=" + req.writeVersion() +
+ ", node=" + nodeId + ']');
}
- else {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() +
- ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']');
- }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + nodeId + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + nodeId +
+ ", res=" + dhtRes + ']', e);
+ }
+ }
+
+ /**
+ * @param part Partition.
+ * @param primaryId Primary ID.
+ * @param futId Future ID.
+ */
+ private void sendDeferredUpdateResponse(int part, UUID primaryId, long futId) {
+ Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+ GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+ if (msg == null) {
+ msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
+ new GridLongList(DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE));
+
+ if (DEFERRED_UPDATE_RESPONSE_TIMEOUT > 0) {
+ GridTimeoutObject timeoutSnd = new DeferredUpdateTimeout(part, primaryId);
+
+ msg.timeoutSender(timeoutSnd);
+
+ ctx.time().addTimeoutObject(timeoutSnd);
+ }
+
+ resMap.put(primaryId, msg);
+ }
+
+ GridLongList futIds = msg.futureIds();
+
+ assert futIds.size() < DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE : futIds.size();
+
+ futIds.add(futId);
+
+ if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) {
+ resMap.remove(primaryId);
+
+ sendDeferredUpdateResponse(primaryId, msg);
+ }
+ }
+
+ /**
+ * @param primaryId Primary ID.
+ * @param msg Message.
+ */
+ private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) {
+ try {
+ GridTimeoutObject timeoutSnd = msg.timeoutSender();
+
+ if (timeoutSnd != null)
+ ctx.time().removeTimeoutObject(timeoutSnd);
+
+ ctx.io().send(primaryId, msg, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() +
+ ", node=" + primaryId + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to send deferred DHT update response, node left [" +
+ "futIds=" + msg.futureIds() + ", node=" + primaryId + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send deferred DHT update response to remote node [" +
+ "futIds=" + msg.futureIds() + ", node=" + primaryId + ']', e);
+ }
+ }
+
+ /**
+ * @param req Request.
+ * @param nearRes Response to send.
+ */
+ private void sendDhtNearResponse(final GridDhtAtomicAbstractUpdateRequest req, GridDhtAtomicNearResponse nearRes) {
+ try {
+ ClusterNode node = ctx.discovery().node(req.nearNodeId());
+
+ if (node == null)
+ throw new ClusterTopologyCheckedException("Node failed: " + req.nearNodeId());
- // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
- sendDeferredUpdateResponse(nodeId, req.futureVersion());
+ if (node.isLocal())
+ processDhtAtomicNearResponse(node.id(), nearRes);
+ else
+ ctx.io().send(node, nearRes, ctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Sent DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", writeVer=" + req.writeVersion() +
+ ", node=" + req.nearNodeId() + ']');
}
}
catch (ClusterTopologyCheckedException ignored) {
- U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureVersion() +
- ", node=" + req.nodeId() + ']');
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to send DHT near response, node left [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + req.nearNodeId() + ']');
+ }
}
catch (IgniteCheckedException e) {
- U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureVersion() +
- ", node=" + nodeId + ", res=" + res + ']', e);
+ U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() +
+ ", nearFutId=" + req.nearFutureId() +
+ ", node=" + req.nearNodeId() +
+ ", res=" + nearRes + ']', e);
}
}
/**
- * @param nodeId Node ID to send message to.
- * @param ver Version to ack.
+ * @param nodeId Node ID.
+ * @param res Response.
*/
- private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
- deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, ver);
+ private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+ GridNearAtomicAbstractUpdateFuture updateFut =
+ (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
+
+ if (updateFut != null) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Received DHT atomic near response [futId=" + res.futureId() +
+ ", node=" + nodeId + ']');
+ }
+
+ updateFut.onDhtResponse(nodeId, res);
+ }
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to find future for DHT atomic near response [futId=" + res.futureId() +
+ ", node=" + nodeId +
+ ", res=" + res + ']');
+ }
+ }
}
/**
@@ -3355,18 +3386,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("unchecked")
private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
- GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+ GridDhtAtomicAbstractUpdateFuture updateFut =
+ (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
if (updateFut != null) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received DHT atomic update response [futId=" + res.futureVersion() +
- ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']');
+ msgLog.debug("Received DHT atomic update response [futId=" + res.futureId() +
+ ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']');
}
- updateFut.onResult(nodeId, res);
+ updateFut.onDhtResponse(nodeId, res);
}
else {
- U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureVersion() +
+ U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() +
", node=" + nodeId + ", res=" + res + ']');
}
}
@@ -3377,19 +3409,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("unchecked")
private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
- for (GridCacheVersion ver : res.futureVersions()) {
- GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(ver);
+ GridLongList futIds = res.futureIds();
+
+ assert futIds != null && futIds.size() > 0 : futIds;
+
+ for (int i = 0; i < futIds.size(); i++) {
+ Long id = futIds.get(i);
+
+ GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(id);
if (updateFut != null) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Received DHT atomic deferred update response [futId=" + ver +
+ msgLog.debug("Received DHT atomic deferred update response [futId=" + id +
", writeVer=" + res + ", node=" + nodeId + ']');
}
- updateFut.onResult(nodeId);
+ updateFut.onDeferredResponse(nodeId);
}
else {
- U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + ver +
+ U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + id +
", nodeId=" + nodeId + ", res=" + res + ']');
}
}
@@ -3404,16 +3442,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.io().send(nodeId, res, ctx.ioPolicy());
if (msgLog.isDebugEnabled())
- msgLog.debug("Sent near update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']');
+ msgLog.debug("Sent near update response [futId=" + res.futureId() + ", node=" + nodeId + ']');
}
catch (ClusterTopologyCheckedException ignored) {
if (msgLog.isDebugEnabled()) {
- msgLog.debug("Failed to send near update response [futId=" + res.futureVersion() +
+ msgLog.debug("Failed to send near update response [futId=" + res.futureId() +
", node=" + nodeId + ']');
}
}
catch (IgniteCheckedException e) {
- U.error(msgLog, "Failed to send near update response [futId=" + res.futureVersion() +
+ U.error(msgLog, "Failed to send near update response [futId=" + res.futureId() +
", node=" + nodeId + ", res=" + res + ']', e);
}
}
@@ -3482,9 +3520,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private GridDhtAtomicAbstractUpdateFuture dhtFut;
/** */
- private boolean readersOnly;
-
- /** */
private GridCacheReturn invokeRes;
/**
@@ -3537,20 +3572,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
this.dhtFut = dhtFut;
}
-
- /**
- * @return {@code True} if only readers (not backups) should be updated.
- */
- private boolean readersOnly() {
- return readersOnly;
- }
-
- /**
- * @param readersOnly {@code True} if only readers (not backups) should be updated.
- */
- private void readersOnly(boolean readersOnly) {
- this.readersOnly = readersOnly;
- }
}
/**
@@ -3569,4 +3590,71 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return Collections.emptyList();
}
}
+
+ /**
+ *
+ */
+ interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> {
+ // No-op.
+ }
+
+ /**
+ *
+ */
+ private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable {
+ /** */
+ private final int part;
+
+ /** */
+ private final UUID primaryId;
+
+ /** */
+ private final IgniteUuid id;
+
+ /** */
+ private final long endTime;
+
+ /**
+ * @param part Partition.
+ * @param primaryId Primary ID.
+ */
+ DeferredUpdateTimeout(int part, UUID primaryId) {
+ this.part = part;
+ this.primaryId = primaryId;
+
+ endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT;
+
+ id = IgniteUuid.fromUuid(primaryId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get();
+
+ GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId);
+
+ if (msg != null && msg.timeoutSender() == this) {
+ msg.timeoutSender(null);
+
+ resMap.remove(primaryId);
+
+ sendDeferredUpdateResponse(primaryId, msg);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ ctx.kernalContext().getStripedExecutorService().execute(part, this);
+ }
+ }
}