You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/03 10:47:05 UTC
[2/8] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.
IGNITE-2532: WIP. Only refactorings for now.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/46652832
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/46652832
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/46652832
Branch: refs/heads/ignite-2523
Commit: 466528329942f009c9e591a16b9453b95830b0cb
Parents: e6acce6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 10:24:01 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 10:24:01 2016 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 6 +
.../processors/cache/GridCacheIoManager.java | 17 +
.../dht/atomic/GridDhtAtomicCache.java | 42 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 12 +-
.../GridNearAbstractAtomicUpdateFuture.java | 248 ++++
.../GridNearAtomicSingleUpdateFuture.java | 1264 ++++++++++++++++++
.../GridNearAtomicSingleUpdateRequest.java | 1044 +++++++++++++++
.../dht/atomic/GridNearAtomicUpdateFuture.java | 186 +--
.../dht/atomic/GridNearAtomicUpdateRequest.java | 3 +-
.../GridNearAtomicUpdateRequestInterface.java | 101 ++
.../distributed/near/GridNearAtomicCache.java | 4 +-
11 files changed, 2725 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3c7f378..25e07b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -726,6 +727,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case -23:
+ msg = new GridNearAtomicSingleUpdateRequest();
+
+ break;
+
// [-3..119] [124] [-23] - this
// [120..123] - DR
// [-4..-22] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index b297827..57545af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep
import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest;
@@ -586,6 +587,22 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
break;
+ case -23: {
+ GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg;
+
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
+ ctx.cacheId(),
+ nodeId,
+ req.futureVersion(),
+ ctx.deploymentEnabled());
+
+ res.error(req.classError());
+
+ sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
+ }
+
+ break;
+
default:
throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message="
+ msg + "]", msg.classError());
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f6f57ee..cdaa061 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -139,7 +139,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
/** Update reply closure. */
- private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+ private CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> updateReplyClos;
/** Pending */
private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
@@ -192,9 +192,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- updateReplyClos = new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ updateReplyClos = new CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse>() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ @Override public void apply(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
if (ctx.config().getAtomicWriteOrderMode() == CLOCK) {
assert req.writeSynchronizationMode() != FULL_ASYNC : req;
@@ -256,6 +256,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicSingleUpdateRequest.class, new CI2<UUID, GridNearAtomicSingleUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicSingleUpdateRequest req) {
+ processNearAtomicUpdateRequest(nodeId, req);
+ }
+ });
+
ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
@Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
processNearAtomicUpdateResponse(nodeId, res);
@@ -1304,8 +1310,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
public void updateAllAsyncInternal(
final UUID nodeId,
- final GridNearAtomicUpdateRequest req,
- final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ final GridNearAtomicUpdateRequestInterface req,
+ final CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req.keys(), req.topologyVersion());
@@ -1329,8 +1335,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
public void updateAllAsyncInternal0(
UUID nodeId,
- GridNearAtomicUpdateRequest req,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ GridNearAtomicUpdateRequestInterface req,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(),
ctx.deploymentEnabled());
@@ -1552,12 +1558,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private UpdateBatchResult updateWithBatch(
ClusterNode node,
boolean hasNear,
- GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateRequestInterface req,
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -1968,12 +1974,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private UpdateSingleResult updateSingle(
ClusterNode node,
boolean hasNear,
- GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateRequestInterface req,
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -2208,8 +2214,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable Collection<KeyCacheObject> rmvKeys,
@Nullable Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
@Nullable GridDhtAtomicUpdateFuture dhtFut,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
- final GridNearAtomicUpdateRequest req,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
+ final GridNearAtomicUpdateRequestInterface req,
final GridNearAtomicUpdateResponse res,
boolean replicate,
UpdateBatchResult batchRes,
@@ -2587,7 +2593,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* will return false.
* @return {@code True} if filter evaluation succeeded.
*/
- private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req,
+ private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequestInterface req,
GridNearAtomicUpdateResponse res) {
try {
return ctx.isAllLocked(entry, req.filter());
@@ -2602,7 +2608,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param req Request to remap.
*/
- private void remapToNewPrimary(GridNearAtomicUpdateRequest req) {
+ private void remapToNewPrimary(GridNearAtomicUpdateRequestInterface req) {
assert req.writeSynchronizationMode() == FULL_ASYNC : req;
if (log.isDebugEnabled())
@@ -2681,9 +2687,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicUpdateRequest updateReq,
+ GridNearAtomicUpdateRequestInterface updateReq,
GridNearAtomicUpdateResponse updateRes,
- CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
boolean force
) {
if (!force) {
@@ -2714,7 +2720,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Sender node ID.
* @param req Near atomic update request.
*/
- private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequestInterface req) {
if (log.isDebugEnabled())
log.debug("Processing near atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index e31af19..6891d3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -53,8 +53,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* DHT atomic cache backup update future.
*/
-public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
- implements GridCacheAtomicFuture<Void> {
+public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implements GridCacheAtomicFuture<Void> {
/** */
private static final long serialVersionUID = 0L;
@@ -78,7 +77,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Completion callback. */
@GridToStringExclude
- private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+ private final CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb;
/** Mappings. */
@GridToStringInclude
@@ -88,7 +87,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
/** Update request. */
- private final GridNearAtomicUpdateRequest updateReq;
+ private final GridNearAtomicUpdateRequestInterface updateReq;
/** Update response. */
private final GridNearAtomicUpdateResponse updateRes;
@@ -111,10 +110,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
*/
public GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicUpdateRequest,
- GridNearAtomicUpdateResponse> completionCb,
+ CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse> completionCb,
GridCacheVersion writeVer,
- GridNearAtomicUpdateRequest updateReq,
+ GridNearAtomicUpdateRequestInterface updateReq,
GridNearAtomicUpdateResponse updateRes
) {
this.cctx = cctx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
new file mode 100644
index 0000000..60e0c5f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * Base class for near atomic update futures. Stores the parameters of a single atomic cache update
+ * (cache context, operation, filter, write synchronization mode and related flags) and provides
+ * helpers for mapping a key to nodes and for applying an update response to the near cache.
+ */
+public abstract class GridNearAbstractAtomicUpdateFuture extends GridFutureAdapter<Object>
+ implements GridCacheAtomicFuture<Object> {
+ /** Logger reference. */
+ protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /**
+ * Logger. Static; assigned by the first constructor call that observes it as {@code null}.
+ * NOTE(review): the logger is obtained with {@code GridFutureAdapter.class} rather than this class -
+ * confirm this category is intended.
+ */
+ protected static IgniteLogger log;
+
+ /** Optional arguments for entry processor. */
+ protected Object[] invokeArgs;
+
+ /** Cache context. */
+ protected final GridCacheContext cctx;
+
+ /** Cache. */
+ protected final GridDhtAtomicCache cache;
+
+ /** Update operation. */
+ protected final GridCacheOperation op;
+
+ /** Return value require flag. */
+ protected final boolean retval;
+
+ /** Expiry policy. */
+ protected final ExpiryPolicy expiryPlc;
+
+ /** Optional filter. */
+ protected final CacheEntryPredicate[] filter;
+
+ /** Write synchronization mode. */
+ protected final CacheWriteSynchronizationMode syncMode;
+
+ /** Raw return value flag. */
+ protected final boolean rawRetval;
+
+ /**
+ * Fast map flag. Computed in the constructor: {@code true} only when the filter is empty, the operation
+ * is not {@code TRANSFORM}, the configured write synchronization mode is {@code FULL_SYNC}, the atomic
+ * write order mode is {@code CLOCK}, and the cache is not both write-through and configured with an interceptor.
+ */
+ protected final boolean fastMap;
+
+ /** Near cache flag. */
+ protected final boolean nearEnabled;
+
+ /** Subject ID. */
+ protected final UUID subjId;
+
+ /** Task name hash. */
+ protected final int taskNameHash;
+
+ /** Skip store flag. */
+ protected final boolean skipStore;
+
+ /** Keep binary flag. */
+ protected final boolean keepBinary;
+
+ /** Wait for topology future flag. */
+ protected final boolean waitTopFut;
+
+ /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+ protected boolean topLocked;
+
+ /** Remap count. Initialized from the constructor argument, or to {@code 1} when {@code waitTopFut} is {@code false}. */
+ protected int remapCnt;
+
+ /**
+ * Stores the operation parameters, lazily initializes the shared logger and computes the
+ * {@code fastMap} and {@code nearEnabled} flags from the cache context.
+ *
+ * @param cctx Cache context.
+ * @param cache Cache instance.
+ * @param syncMode Write synchronization mode.
+ * @param op Update operation.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param retval Return value require flag.
+ * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+ * @param expiryPlc Expiry policy explicitly specified for cache operation.
+ * @param filter Entry filter.
+ * @param subjId Subject ID (asserted to be non-null).
+ * @param taskNameHash Task name hash code.
+ * @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
+ * @param remapCnt Maximum number of retries (replaced with {@code 1} when {@code waitTopFut} is {@code false}).
+ * @param waitTopFut If {@code false} does not wait for affinity change future.
+ */
+ public GridNearAbstractAtomicUpdateFuture(
+ GridCacheContext cctx,
+ GridDhtAtomicCache cache,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ @Nullable Object[] invokeArgs,
+ final boolean retval,
+ final boolean rawRetval,
+ @Nullable ExpiryPolicy expiryPlc,
+ final CacheEntryPredicate[] filter,
+ UUID subjId,
+ int taskNameHash,
+ boolean skipStore,
+ boolean keepBinary,
+ int remapCnt,
+ boolean waitTopFut
+ ) {
+ this.rawRetval = rawRetval;
+
+ assert subjId != null;
+
+ this.cctx = cctx;
+ this.cache = cache;
+ this.syncMode = syncMode;
+ this.op = op;
+ this.invokeArgs = invokeArgs;
+ this.retval = retval;
+ this.expiryPlc = expiryPlc;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
+ this.waitTopFut = waitTopFut;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+ fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+ cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+ !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+ nearEnabled = CU.isNearEnabled(cctx);
+
+ if (!waitTopFut)
+ remapCnt = 1;
+
+ this.remapCnt = remapCnt;
+ }
+
+ /** {@inheritDoc} This implementation always throws {@link UnsupportedOperationException}. */
+ @Override public IgniteUuid futureId() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} This implementation always returns {@code true}. */
+ @Override public boolean trackable() {
+ return true;
+ }
+
+ /** {@inheritDoc} This implementation does nothing. */
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
+
+ /**
+ * @return {@code True} if this future should block partition map exchange; this is the value of the
+ * {@code fastMap} flag.
+ */
+ protected boolean waitForPartitionExchange() {
+ // Wait fast-map near atomic update futures in CLOCK mode.
+ return fastMap;
+ }
+
+ /**
+ * Updates near cache. Returns without any update when the response contains remap keys or the request
+ * has no primary; otherwise passes the request and response to
+ * {@link GridNearAtomicCache#processNearAtomicUpdateResponse}. Asserts that near cache is enabled.
+ *
+ * @param req Update request.
+ * @param res Update response.
+ */
+ protected void updateNear(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
+ assert nearEnabled;
+
+ if (res.remapKeys() != null || !req.hasPrimary())
+ return;
+
+ GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+ near.processNearAtomicUpdateResponse(req, res);
+ }
+
+ /**
+ * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}, which is the case
+ * when the atomic write order mode is {@code CLOCK} or the synchronization mode is not {@code FULL_ASYNC}.
+ */
+ protected boolean storeFuture() {
+ return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+ }
+
+ /**
+ * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+ * node and send updates in parallel to all participating nodes.
+ *
+ * @param key Key to map.
+ * @param topVer Topology version to map.
+ * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. Note that this
+ * parameter shadows the {@code fastMap} field; only the argument value is used here.
+ * @return Collection of nodes to which key is mapped: the topology nodes of the key's partition when
+ * {@code fastMap} is {@code true}, otherwise a single-element list holding the primary node.
+ */
+ protected Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap
+ ) {
+ GridCacheAffinityManager affMgr = cctx.affinity();
+
+ // Fast map: send updates in parallel to every node of the partition; otherwise only to the primary.
+ return fastMap ?
+ cctx.topology().nodes(affMgr.partition(key), topVer) :
+ Collections.singletonList(affMgr.primary(key, topVer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/46652832/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
new file mode 100644
index 0000000..d633e47
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -0,0 +1,1264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.expiry.ExpiryPolicy;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * DHT atomic cache single near update future.
+ */
+public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
+ implements GridCacheAtomicFuture<Object>{
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Cache. */
+ private GridDhtAtomicCache cache;
+
+ /** Update operation. */
+ private final GridCacheOperation op;
+
+ /** Keys. */
+ private Collection<?> keys;
+
+ /** Values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<?> vals;
+
+ /** Optional arguments for entry processor. */
+ private Object[] invokeArgs;
+
+ /** Conflict put values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<GridCacheDrInfo> conflictPutVals;
+
+ /** Conflict remove values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<GridCacheVersion> conflictRmvVals;
+
+ /** Return value require flag. */
+ private final boolean retval;
+
+ /** Expiry policy. */
+ private final ExpiryPolicy expiryPlc;
+
+ /** Optional filter. */
+ private final CacheEntryPredicate[] filter;
+
+ /** Write synchronization mode. */
+ private final CacheWriteSynchronizationMode syncMode;
+
+ /** Raw return value flag. */
+ private final boolean rawRetval;
+
+ /** Fast map flag. */
+ private final boolean fastMap;
+
+ /** Near cache flag. */
+ private final boolean nearEnabled;
+
+ /** Subject ID. */
+ private final UUID subjId;
+
+ /** Task name hash. */
+ private final int taskNameHash;
+
+ /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+ private boolean topLocked;
+
+ /** Skip store flag. */
+ private final boolean skipStore;
+
+ /** Keep binary flag. */
+ private final boolean keepBinary;
+
+ /** Wait for topology future flag. */
+ private final boolean waitTopFut;
+
+ /** Remap count. */
+ private int remapCnt;
+
+ /** State. */
+ private final UpdateState state;
+
+ /**
+  * Creates the future, stores the operation arguments and pre-computes the fast-map and near-cache flags.
+  *
+  * @param cctx Cache context.
+  * @param cache Cache instance.
+  * @param syncMode Write synchronization mode.
+  * @param op Update operation.
+  * @param keys Keys to update.
+  * @param vals Values or transform closure.
+  * @param invokeArgs Optional arguments for entry processor.
+  * @param conflictPutVals Conflict put values (optional).
+  * @param conflictRmvVals Conflict remove values (optional).
+  * @param retval Return value require flag.
+  * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+  * @param expiryPlc Expiry policy explicitly specified for cache operation.
+  * @param filter Entry filter.
+  * @param subjId Subject ID.
+  * @param taskNameHash Task name hash code.
+  * @param skipStore Skip store flag.
+  * @param keepBinary Keep binary flag.
+  * @param remapCnt Maximum number of retries; forced to {@code 1} when {@code waitTopFut} is {@code false}.
+  * @param waitTopFut If {@code false} does not wait for affinity change future.
+  */
+ public GridNearAtomicSingleUpdateFuture(
+     GridCacheContext cctx,
+     GridDhtAtomicCache cache,
+     CacheWriteSynchronizationMode syncMode,
+     GridCacheOperation op,
+     Collection<?> keys,
+     @Nullable Collection<?> vals,
+     @Nullable Object[] invokeArgs,
+     @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+     @Nullable Collection<GridCacheVersion> conflictRmvVals,
+     final boolean retval,
+     final boolean rawRetval,
+     @Nullable ExpiryPolicy expiryPlc,
+     final CacheEntryPredicate[] filter,
+     UUID subjId,
+     int taskNameHash,
+     boolean skipStore,
+     boolean keepBinary,
+     int remapCnt,
+     boolean waitTopFut
+ ) {
+     // Every optional per-key collection must be aligned with the key collection.
+     assert vals == null || vals.size() == keys.size();
+     assert conflictPutVals == null || conflictPutVals.size() == keys.size();
+     assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+     assert subjId != null;
+
+     // Cache and operation.
+     this.cctx = cctx;
+     this.cache = cache;
+     this.syncMode = syncMode;
+     this.op = op;
+
+     // Operation arguments.
+     this.keys = keys;
+     this.vals = vals;
+     this.invokeArgs = invokeArgs;
+     this.conflictPutVals = conflictPutVals;
+     this.conflictRmvVals = conflictRmvVals;
+
+     // Result handling and miscellaneous options.
+     this.retval = retval;
+     this.rawRetval = rawRetval;
+     this.expiryPlc = expiryPlc;
+     this.filter = filter;
+     this.subjId = subjId;
+     this.taskNameHash = taskNameHash;
+     this.skipStore = skipStore;
+     this.keepBinary = keepBinary;
+     this.waitTopFut = waitTopFut;
+
+     if (log == null)
+         log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+     // Fast-map needs: no filter, not TRANSFORM, FULL_SYNC cache, CLOCK ordering and
+     // no combination of write-through with a configured interceptor.
+     fastMap = F.isEmpty(filter) &&
+         op != TRANSFORM &&
+         cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+         cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+         (!cctx.writeThrough() || cctx.config().getInterceptor() == null);
+
+     nearEnabled = CU.isNearEnabled(cctx);
+
+     // A caller that does not wait for the topology future gets a single attempt.
+     this.remapCnt = waitTopFut ? remapCnt : 1;
+
+     state = new UpdateState();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Not supported by this future: the call always throws {@link UnsupportedOperationException}.
+ */
+ @Override public IgniteUuid futureId() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Future version currently held by the update state, or {@code null} if none is set.
+ */
+ @Override public GridCacheVersion version() {
+ return state.futureVersion();
+ }
+
+ /**
+ * Tells whether partition map exchange has to wait for this future.
+ *
+ * @return {@code True} if this future should block partition map exchange; this is exactly the value of the
+ *      fast-map flag computed in the constructor.
+ */
+ private boolean waitForPartitionExchange() {
+ // Wait fast-map near atomic update futures in CLOCK mode.
+ return fastMap;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return The key collection this future was created with (returned as is, not copied).
+ */
+ @Override public Collection<?> keys() {
+ return keys;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Forwards the notification to the update state.
+ *
+ * @return Always {@code false}.
+ */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ state.onNodeLeft(nodeId);
+
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code true}.
+ */
+ @Override public boolean trackable() {
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Does nothing in this implementation.
+ */
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
+
+ /**
+ * Performs future mapping.
+ */
+ public void map() {
+ AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
+
+ if (topVer == null)
+ mapOnTopology();
+ else {
+ topLocked = true;
+
+ // Cannot remap.
+ remapCnt = 1;
+
+ state.map(topVer, null);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return Completion future obtained from the update state, or {@code null} when this future does not block
+  *      partition exchange, when the state has nothing to wait for, or when this future is already done.
+  */
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+     if (!waitForPartitionExchange())
+         return null;
+
+     GridFutureAdapter<Void> topFut = state.completeFuture(topVer);
+
+     if (topFut == null)
+         return null;
+
+     // This future has finished meanwhile: release the waiter immediately.
+     if (isDone()) {
+         topFut.onDone();
+
+         return null;
+     }
+
+     return topFut;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Converts the internal {@code GridCacheReturn} into the user-visible result: the raw return object when
+  * {@code rawRetval} is set, the unwrapped value when a return value is required or the operation is TRANSFORM,
+  * and the success flag otherwise. A TRANSFORM without a result completes with an empty map. After a successful
+  * completion the future is removed from the MVCC manager if a future version was still set.
+  */
+ @SuppressWarnings("ConstantConditions")
+ @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+     assert res == null || res instanceof GridCacheReturn;
+
+     GridCacheReturn ret = (GridCacheReturn)res;
+
+     Object userRes;
+
+     if (res == null)
+         userRes = null;
+     else if (rawRetval)
+         userRes = ret;
+     else if (retval || op == TRANSFORM)
+         userRes = cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary);
+     else
+         userRes = ret.success();
+
+     if (op == TRANSFORM && userRes == null)
+         userRes = Collections.emptyMap();
+
+     if (!super.onDone(userRes, err))
+         return false;
+
+     GridCacheVersion doneVer = state.onFutureDone();
+
+     if (doneVer != null)
+         cctx.mvcc().removeAtomicFuture(doneVer);
+
+     return true;
+ }
+
+ /**
+ * Response callback. Hands the response over to the update state as a regular (non node-failure) response.
+ *
+ * @param nodeId Node ID.
+ * @param res Update response.
+ */
+ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ state.onResult(nodeId, res, false);
+ }
+
+ /**
+  * Updates near cache. Nothing is done when the response carries keys to remap or when the request reports
+  * that it has no primary mapping.
+  *
+  * @param req Update request.
+  * @param res Update response.
+  */
+ private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+     assert nearEnabled;
+
+     boolean applicable = res.remapKeys() == null && req.hasPrimary();
+
+     if (applicable) {
+         GridNearAtomicCache nearCache = (GridNearAtomicCache)cctx.dht().near();
+
+         nearCache.processNearAtomicUpdateResponse(req, res);
+     }
+ }
+
+ /**
+ * Maps future on ready topology.
+ * <p>
+ * Under the topology read lock: fails the future if the cache topology is stopping; if the current topology
+ * future is done, validates the cache against it and takes its topology version; otherwise either schedules a
+ * repeated call of this method once the topology future completes (when {@code waitTopFut} is set) or fails the
+ * future with {@link GridCacheTryPutFailedException}. The mapping itself is started after the read lock has
+ * been released.
+ */
+ private void mapOnTopology() {
+ cache.topology().readLock();
+
+ AffinityTopologyVersion topVer = null;
+
+ try {
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
+
+ return;
+ }
+
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
+
+ return;
+ }
+
+ topVer = fut.topologyVersion();
+ }
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() { // Retry via the closure processor rather than inline in the listener.
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
+
+ return; // Either a retry has been scheduled or the future has been failed.
+ }
+ }
+ finally {
+ cache.topology().readUnlock();
+ }
+
+ state.map(topVer, null); // Reached only when the topology future was done and validation passed.
+ }
+
+ /**
+  * Tells whether this future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+  *
+  * @return {@code True} if the cache is configured with {@code CLOCK} atomic write order mode or the write
+  *      synchronization mode of this update is anything but {@code FULL_ASYNC}.
+  */
+ private boolean storeFuture() {
+     if (cctx.config().getAtomicWriteOrderMode() == CLOCK)
+         return true;
+
+     return syncMode != FULL_ASYNC;
+ }
+
+ /**
+  * Resolves the nodes a key has to be sent to. If filters are absent and operation is not TRANSFORM, then
+  * version can be assigned on near node and updates are sent in parallel to all participating nodes.
+  *
+  * @param key Key to map.
+  * @param topVer Topology version to map.
+  * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+  * @return All nodes of the key's partition when {@code fastMap} is set, otherwise a single-element list
+  *      holding the value returned by the affinity manager for the primary node.
+  */
+ private Collection<ClusterNode> mapKey(
+     KeyCacheObject key,
+     AffinityTopologyVersion topVer,
+     boolean fastMap
+ ) {
+     GridCacheAffinityManager aff = cctx.affinity();
+
+     // Regular path: only the primary node is addressed.
+     if (!fastMap)
+         return Collections.singletonList(aff.primary(key, topVer));
+
+     // Fast-map path: updates go in parallel to every node of the partition.
+     return cctx.topology().nodes(aff.partition(key), topVer);
+ }
+
+ /**
+  * Maps future to single node. A request addressed to the local node is executed directly through the cache;
+  * any other request is sent over the cache I/O manager. In {@code FULL_ASYNC} mode the future is completed
+  * right after a successful remote send. A send failure is reported to the update state.
+  *
+  * @param nodeId Node ID.
+  * @param req Request.
+  */
+ private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
+     if (cctx.localNodeId().equals(nodeId)) {
+         CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> locCb =
+             new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+                 @Override public void apply(GridNearAtomicUpdateRequest locReq, GridNearAtomicUpdateResponse locRes) {
+                     onResult(locRes.nodeId(), locRes);
+                 }
+             };
+
+         cache.updateAllAsyncInternal(nodeId, req, locCb);
+
+         return;
+     }
+
+     try {
+         if (log.isDebugEnabled())
+             log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+         cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+         // No response is awaited in FULL_ASYNC mode.
+         if (syncMode == FULL_ASYNC)
+             onDone(new GridCacheReturn(cctx, true, true, null, true));
+     }
+     catch (IgniteCheckedException sndErr) {
+         state.onSendError(req, sndErr);
+     }
+ }
+
+ /**
+ * Sends messages to remote nodes and updates local cache.
+ * <p>
+ * Requests addressed to remote nodes are sent first; the request mapped to the local node, if any, is executed
+ * afterwards through {@code cache.updateAllAsyncInternal}. A failed send is reported to the update state via
+ * {@code onSendError} and does not stop the loop. In {@code FULL_ASYNC} mode the future is completed once
+ * everything has been submitted.
+ *
+ * @param mappings Mappings to send.
+ */
+ private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+ UUID locNodeId = cctx.localNodeId();
+
+ GridNearAtomicUpdateRequest locUpdate = null;
+
+ // Send messages to remote nodes first, then run local update.
+ for (GridNearAtomicUpdateRequest req : mappings.values()) {
+ if (locNodeId.equals(req.nodeId())) {
+ assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+ ", req=" + req + ']';
+
+ locUpdate = req; // Remembered here, executed after the loop.
+ }
+ else {
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
+
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ state.onSendError(req, e);
+ }
+ }
+ }
+
+ if (locUpdate != null) {
+ cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+ new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ onResult(res.nodeId(), res);
+ }
+ });
+ }
+
+ if (syncMode == FULL_ASYNC)
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ }
+
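+ /**
+ * Mutable state of the update: current mapping, received responses and the accumulated result or error.
+ */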
+ private class UpdateState {
+ /** Current topology version. */
+ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+
+ /** Update version. */
+ private GridCacheVersion updVer;
+
+ /** Topology version when got mapping error. */
+ private AffinityTopologyVersion mapErrTopVer;
+
+ /** Mappings if operation is mapped to more than one node. */
+ @GridToStringInclude
+ private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+
+ /** Number of responses received for the current mappings. */
+ private int resCnt;
+
+ /** Error. */
+ private CachePartialUpdateCheckedException err;
+
+ /** Future ID. */
+ private GridCacheVersion futVer;
+
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
+
+ /** Keys to remap. */
+ private Collection<KeyCacheObject> remapKeys;
+
+ /** Not null if operation is mapped to single node. */
+ private GridNearAtomicUpdateRequest singleReq;
+
+ /** Operation result. */
+ private GridCacheReturn opRes;
+
+ /**
+ * Reads the current future version under this state's monitor.
+ *
+ * @return Future version, or {@code null} if none is currently set.
+ */
+ @Nullable synchronized GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * Handles departure of a node. If a request is mapped to that node and {@code req.response()} is still
+ * {@code null}, a synthetic response is built under the lock that marks all keys of the request as failed with
+ * a {@link ClusterTopologyCheckedException}; the response is then processed via {@code onResult} after the lock
+ * has been released.
+ *
+ * @param nodeId Left node ID.
+ */
+ void onNodeLeft(UUID nodeId) {
+ GridNearAtomicUpdateResponse res = null;
+
+ synchronized (this) {
+ GridNearAtomicUpdateRequest req;
+
+ if (singleReq != null)
+ req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+ else
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.response() == null) {
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ nodeId,
+ req.futureVersion(),
+ cctx.deploymentEnabled());
+
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+ "before response is received: " + nodeId);
+
+ e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+ res.addFailedKeys(req.keys(), e);
+ }
+ }
+
+ if (res != null)
+ onResult(nodeId, res, true); // Processed as a node-error response, outside of the lock.
+ }
+
+ /**
+ * Processes a response (real or synthetic) for the current mapping.
+ * <p>
+ * Under the lock:
+ * <ul>
+ * <li>the response is ignored if its future version differs from the current one, if it comes from a node
+ * other than the single mapped one, or if {@code req.onResponse(res)} rejects it;</li>
+ * <li>keys to remap are accumulated together with the highest topology version a remap was requested for;</li>
+ * <li>failed keys are added to the partial update error;</li>
+ * <li>otherwise the return value is stored (or merged for TRANSFORM) unless the request is a fast-map one
+ * without primary keys;</li>
+ * <li>once all responses have arrived, it is decided whether the operation has to be remapped: either because
+ * of remap keys, or because of a topology-related partial failure while retries remain. On remap the state
+ * is reset (future version, topology version, completion future).</li>
+ * </ul>
+ * Outside the lock the near cache is updated if enabled, and then the operation is either remapped on the next
+ * affinity-ready topology version, failed (no waiting allowed, or topology is locked), or completed with the
+ * collected result and error.
+ *
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @param nodeErr {@code True} if response was created on node failure.
+ */
+ @SuppressWarnings("unchecked")
+ void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicUpdateRequest req;
+
+ AffinityTopologyVersion remapTopVer = null;
+
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
+
+ boolean rcvAll;
+
+ GridFutureAdapter<?> fut0 = null;
+
+ synchronized (this) {
+ if (!res.futureVersion().equals(futVer)) // Response does not belong to the current mapping attempt.
+ return;
+
+ if (singleReq != null) {
+ if (!singleReq.nodeId().equals(nodeId))
+ return;
+
+ req = singleReq;
+
+ singleReq = null; // Consumed: a second response for the single request is ignored.
+
+ rcvAll = true;
+ }
+ else {
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.onResponse(res)) {
+ resCnt++;
+
+ rcvAll = mappings.size() == resCnt;
+ }
+ else
+ return;
+ }
+
+ assert req != null && req.topologyVersion().equals(topVer) : req;
+
+ if (res.remapKeys() != null) {
+ assert !fastMap || cctx.kernalContext().clientNode();
+
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(res.remapKeys().size());
+
+ remapKeys.addAll(res.remapKeys());
+
+ if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) // Keep the highest version.
+ mapErrTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null) {
+ if (res.failedKeys() != null)
+ addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
+ }
+ else {
+ if (!req.fastMap() || req.hasPrimary()) {
+ GridCacheReturn ret = res.returnValue();
+
+ if (op == TRANSFORM) {
+ if (ret != null)
+ addInvokeResults(ret);
+ }
+ else
+ opRes = ret;
+ }
+ }
+
+ if (rcvAll) {
+ if (remapKeys != null) {
+ assert mapErrTopVer != null;
+
+ remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+ }
+ else {
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) { // Decremented only when all preceding conditions hold.
+ ClusterTopologyCheckedException topErr =
+ X.cause(err, ClusterTopologyCheckedException.class);
+
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
+
+ assert cause != null && cause.topologyVersion() != null : err;
+
+ remapTopVer =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+ err = null;
+
+ Collection<Object> failedKeys = cause.failedKeys();
+
+ remapKeys = new ArrayList<>(failedKeys.size());
+
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
+
+ updVer = null; // A new update version is assigned on the next mapping.
+ }
+ }
+ }
+
+ if (remapTopVer == null) {
+ err0 = err;
+ opRes0 = opRes;
+ }
+ else {
+ fut0 = topCompleteFut;
+
+ topCompleteFut = null;
+
+ cctx.mvcc().removeAtomicFuture(futVer);
+
+ futVer = null;
+ topVer = AffinityTopologyVersion.ZERO; // Reset so that map() can publish a new mapping.
+ }
+ }
+ }
+
+ if (res.error() != null && res.failedKeys() == null) { // Error not bound to particular keys fails the whole future.
+ onDone(res.error());
+
+ return;
+ }
+
+ if (rcvAll && nearEnabled) {
+ if (mappings != null) {
+ for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = req0.response();
+
+ assert res0 != null : req0;
+
+ updateNear(req0, res0);
+ }
+ }
+ else if (!nodeErr)
+ updateNear(req, res);
+ }
+
+ if (remapTopVer != null) {
+ if (fut0 != null)
+ fut0.onDone();
+
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
+ }
+
+ if (topLocked) {
+ assert !F.isEmpty(remapKeys) : remapKeys;
+
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+ e.add(remapKeys, cause);
+
+ onDone(e);
+
+ return;
+ }
+
+ IgniteInternalFuture<AffinityTopologyVersion> fut =
+ cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ AffinityTopologyVersion topVer = fut.get();
+
+ map(topVer, remapKeys);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
+ }
+ });
+ }
+ });
+
+ return;
+ }
+
+ if (rcvAll)
+ onDone(opRes0, err0);
+ }
+
+ /**
+ * Converts a send failure into a synthetic response that marks all keys of the request as failed with the
+ * given error, and processes it as a node-error response.
+ * <p>
+ * NOTE(review): {@code onResult} is invoked here while this state's monitor is held, whereas {@code onNodeLeft}
+ * calls it only after leaving its synchronized block - confirm that holding the lock is intentional.
+ *
+ * @param req Request.
+ * @param e Error.
+ */
+ void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
+ synchronized (this) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureVersion(),
+ cctx.deploymentEnabled());
+
+ res.addFailedKeys(req.keys(), e);
+
+ onResult(req.nodeId(), res, true);
+ }
+ }
+
+ /**
+ * Builds the requests for the given topology version, publishes them as the current state and starts the
+ * update.
+ * <p>
+ * Steps: fail if there are no affinity nodes; generate a new future version and, in CLOCK mode, reuse or
+ * generate the update version; build either a single request or per-node mappings (in {@code PRIMARY_SYNC} mode
+ * only mappings that have primary keys are kept for response tracking); publish the new state under the lock;
+ * register the future in the MVCC manager if required; finally send the request(s). Any exception raised while
+ * building or publishing the mapping fails the future.
+ *
+ * @param topVer Topology version.
+ * @param remapKeys Keys to remap, or {@code null} on the first attempt. In the single-key path this argument is
+ *      only checked by an assertion.
+ */
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+ if (F.isEmpty(topNodes)) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid)."));
+
+ return;
+ }
+
+ Exception err = null;
+ GridNearAtomicUpdateRequest singleReq0 = null;
+ Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+
+ int size = keys.size();
+
+ GridCacheVersion futVer = cctx.versions().next(topVer);
+
+ GridCacheVersion updVer;
+
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ updVer = this.updVer; // Kept from the previous attempt unless it was reset for a remap.
+
+ if (updVer == null) {
+ updVer = cctx.versions().next(topVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
+ }
+ }
+ else
+ updVer = null;
+
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
+
+ singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
+ }
+ else {
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ topVer,
+ futVer,
+ updVer,
+ remapKeys);
+
+ if (pendingMappings.size() == 1)
+ singleReq0 = F.firstValue(pendingMappings);
+ else {
+ if (syncMode == PRIMARY_SYNC) {
+ mappings0 = U.newHashMap(pendingMappings.size());
+
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings0.put(req.nodeId(), req);
+ }
+ }
+ else
+ mappings0 = pendingMappings;
+
+ assert !mappings0.isEmpty() || size == 0 : GridNearAtomicSingleUpdateFuture.this;
+ }
+ }
+
+ synchronized (this) {
+ assert this.futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+ this.topVer = topVer;
+ this.updVer = updVer;
+ this.futVer = futVer;
+
+ resCnt = 0;
+
+ singleReq = singleReq0;
+ mappings = mappings0;
+
+ this.remapKeys = null;
+ }
+ }
+ catch (Exception e) {
+ err = e;
+ }
+
+ if (err != null) {
+ onDone(err);
+
+ return;
+ }
+
+ if (storeFuture()) {
+ if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicSingleUpdateFuture.this)) {
+ assert isDone() : GridNearAtomicSingleUpdateFuture.this;
+
+ return;
+ }
+ }
+
+ // Optimize mapping for single key.
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
+ else {
+ assert mappings0 != null;
+
+ if (size == 0)
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ else
+ doUpdate(mappings0);
+ }
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @return Future.
+ */
+ @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (this.topVer == AffinityTopologyVersion.ZERO)
+ return null;
+
+ if (this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
+
+ return topCompleteFut;
+ }
+
+ return null;
+ }
+
+ /**
+ * Called when the owning future is done. Under the lock detaches the topology completion future and the
+ * future version from the state; the detached completion future, if any, is completed outside of the lock.
+ *
+ * @return Future version that was set at the moment of the call, or {@code null} if there was none.
+ */
+ GridCacheVersion onFutureDone() {
+ GridCacheVersion ver0;
+
+ GridFutureAdapter<Void> fut0;
+
+ synchronized (this) {
+ fut0 = topCompleteFut;
+
+ topCompleteFut = null;
+
+ ver0 = futVer;
+
+ futVer = null;
+ }
+
+ if (fut0 != null)
+ fut0.onDone();
+
+ return ver0;
+ }
+
+ /**
+ * Builds per-node update requests for all keys of the operation.
+ * <p>
+ * Keys are iterated together with whichever of the value collections is present (regular values, conflict put
+ * values or conflict remove versions). Entries with a {@code null} value are skipped unless the operation is
+ * DELETE, and so are keys not contained in {@code remapKeys} when it is given. Every key is added to the
+ * request of each node returned by {@code mapKey}; the first node in that collection is flagged through the
+ * last argument of {@code addUpdateEntry}.
+ *
+ * @param topNodes Cache nodes; only the size is used here, to pre-size the result map.
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @param remapKeys Keys to remap.
+ * @return Mapping.
+ * @throws Exception If failed; in particular {@link NullPointerException} on a {@code null} key or regular
+ *      value and {@link ClusterTopologyServerNotFoundException} when a key cannot be mapped to any node.
+ */
+ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+ Iterator<?> it = null;
+
+ if (vals != null)
+ it = vals.iterator();
+
+ Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+
+ if (conflictPutVals != null)
+ conflictPutValsIt = conflictPutVals.iterator();
+
+ Iterator<GridCacheVersion> conflictRmvValsIt = null;
+
+ if (conflictRmvVals != null)
+ conflictRmvValsIt = conflictRmvVals.iterator();
+
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+
+ // Create mappings first, then send messages.
+ for (Object key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ val = it.next();
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+ if (val == null)
+ throw new NullPointerException("Null value.");
+ }
+ else if (conflictPutVals != null) {
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
+
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ val = null;
+ conflictVer = conflictRmvValsIt.next();
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ if (val == null && op != GridCacheOperation.DELETE) // Only DELETE may go without a value.
+ continue;
+
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+ if (remapKeys != null && !remapKeys.contains(cacheKey)) // On remap only the requested keys are sent again.
+ continue;
+
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
+
+ Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+
+ if (affNodes.isEmpty())
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ int i = 0;
+
+ for (ClusterNode affNode : affNodes) {
+ if (affNode == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ UUID nodeId = affNode.id();
+
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+
+ if (mapped == null) {
+ mapped = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ pendingMappings.put(nodeId, mapped);
+ }
+
+ mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+
+ i++;
+ }
+ }
+
+ return pendingMappings;
+ }
+
+ /**
+ * Builds the update request for a single-key operation. The first key and the first element of whichever value
+ * collection is present are taken, and the request is addressed to the primary node of the key for the given
+ * topology version.
+ *
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @return Request.
+ * @throws Exception If failed; in particular {@link NullPointerException} on a {@code null} key, or on a
+ *      {@code null} value for an operation other than DELETE, and
+ *      {@link ClusterTopologyServerNotFoundException} when no primary node is found.
+ */
+ private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer) throws Exception {
+ Object key = F.first(keys);
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ // Regular PUT.
+ val = F.first(vals);
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else if (conflictPutVals != null) {
+ // Conflict PUT.
+ GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ // Conflict REMOVE.
+ val = null;
+ conflictVer = F.first(conflictRmvVals);
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ // Regular REMOVE.
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ // We still can get here if user pass map with single element.
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ if (val == null && op != GridCacheOperation.DELETE)
+ throw new NullPointerException("Null value.");
+
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+ if (op != TRANSFORM) // For TRANSFORM the value is passed on without conversion to a cache object.
+ val = cctx.toCacheObject(val);
+
+ ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+
+ if (primary == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid).");
+
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
+
+ /**
+  * Accumulates the result of a TRANSFORM operation received from one node. A result without a value is
+  * ignored; the first non-empty result becomes the operation result and later ones are merged into it.
+  *
+  * @param ret Result from single node.
+  */
+ @SuppressWarnings("unchecked")
+ private void addInvokeResults(GridCacheReturn ret) {
+     assert op == TRANSFORM : op;
+     assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+     if (ret.value() == null)
+         return;
+
+     if (opRes == null)
+         opRes = ret;
+     else
+         opRes.mergeEntryProcessResults(ret);
+ }
+
+ /**
+  * Records failed keys in the partial update error of this state, creating the error on first use. Keys are
+  * passed through {@code unwrapBinaryIfNeeded} before they are added.
+  *
+  * @param failedKeys Failed keys.
+  * @param topVer Topology version for failed update.
+  * @param err Error cause.
+  */
+ private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+     AffinityTopologyVersion topVer,
+     Throwable err) {
+     // The parameter shadows the field, hence the explicit qualifier.
+     if (this.err == null) {
+         this.err =
+             new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+     }
+
+     Collection<Object> unwrapped = new ArrayList<>(failedKeys.size());
+
+     for (KeyCacheObject failedKey : failedKeys) {
+         Object userKey = cctx.cacheObjectContext().unwrapBinaryIfNeeded(failedKey, keepBinary, false);
+
+         unwrapped.add(userKey);
+     }
+
+     this.err.add(unwrapped, err, topVer);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Synchronized on this state so that the string is built while the monitor is held.
+ */
+ @Override public synchronized String toString() {
+ return S.toString(UpdateState.class, this);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  *
+  * @return String representation of this future that also includes the representation of the parent class.
+  */
+ @Override public String toString() {
+     return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
+ }
+}