You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/03 10:47:11 UTC
[8/8] ignite git commit: IGNITE-2532: Reverting changes to GridNearAtomicUpdateFuture.
IGNITE-2532: Reverting changes to GridNearAtomicUpdateFuture.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/07c23931
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/07c23931
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/07c23931
Branch: refs/heads/ignite-2523
Commit: 07c23931f9758497db50bf0851af5d6c0fb8eaa4
Parents: 89c8074
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 12:45:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 12:45:42 2016 +0300
----------------------------------------------------------------------
.../GridNearAbstractAtomicUpdateFuture.java | 252 ----
.../dht/atomic/GridNearAtomicUpdateFuture.java | 1400 ++++++++++--------
2 files changed, 798 insertions(+), 854 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/07c23931/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
deleted file mode 100644
index f8c6810..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAbstractAtomicUpdateFuture.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-
-/**
- * Base class for near atomic update futures.
- */
-public abstract class GridNearAbstractAtomicUpdateFuture extends GridFutureAdapter<Object>
- implements GridCacheAtomicFuture<Object> {
- /** */
- public static final IgniteProductVersion SINGLE_PUT_MSG_SINCE = IgniteProductVersion.fromString("1.6.0");
-
- /** Logger reference. */
- protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- protected static IgniteLogger log;
-
- /** Optional arguments for entry processor. */
- protected Object[] invokeArgs;
-
- /** Cache context. */
- protected final GridCacheContext cctx;
-
- /** Cache. */
- protected final GridDhtAtomicCache cache;
-
- /** Update operation. */
- protected final GridCacheOperation op;
-
- /** Return value require flag. */
- protected final boolean retval;
-
- /** Expiry policy. */
- protected final ExpiryPolicy expiryPlc;
-
- /** Optional filter. */
- protected final CacheEntryPredicate[] filter;
-
- /** Write synchronization mode. */
- protected final CacheWriteSynchronizationMode syncMode;
-
- /** Raw return value flag. */
- protected final boolean rawRetval;
-
- /** Fast map flag. */
- protected final boolean fastMap;
-
- /** Near cache flag. */
- protected final boolean nearEnabled;
-
- /** Subject ID. */
- protected final UUID subjId;
-
- /** Task name hash. */
- protected final int taskNameHash;
-
- /** Skip store flag. */
- protected final boolean skipStore;
-
- /** */
- protected final boolean keepBinary;
-
- /** Wait for topology future flag. */
- protected final boolean waitTopFut;
-
- /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
- protected boolean topLocked;
-
- /** Remap count. */
- protected int remapCnt;
-
- /**
- * @param cctx Cache context.
- * @param cache Cache instance.
- * @param syncMode Write synchronization mode.
- * @param op Update operation.
- * @param invokeArgs Optional arguments for entry processor.
- * @param retval Return value require flag.
- * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
- * @param expiryPlc Expiry policy explicitly specified for cache operation.
- * @param filter Entry filter.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param skipStore Skip store flag.
- * @param keepBinary Keep binary flag.
- * @param remapCnt Maximum number of retries.
- * @param waitTopFut If {@code false} does not wait for affinity change future.
- */
- public GridNearAbstractAtomicUpdateFuture(
- GridCacheContext cctx,
- GridDhtAtomicCache cache,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- @Nullable Object[] invokeArgs,
- final boolean retval,
- final boolean rawRetval,
- @Nullable ExpiryPolicy expiryPlc,
- final CacheEntryPredicate[] filter,
- UUID subjId,
- int taskNameHash,
- boolean skipStore,
- boolean keepBinary,
- int remapCnt,
- boolean waitTopFut
- ) {
- this.rawRetval = rawRetval;
-
- assert subjId != null;
-
- this.cctx = cctx;
- this.cache = cache;
- this.syncMode = syncMode;
- this.op = op;
- this.invokeArgs = invokeArgs;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.waitTopFut = waitTopFut;
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
-
- fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
- cctx.config().getAtomicWriteOrderMode() == CLOCK &&
- !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
-
- nearEnabled = CU.isNearEnabled(cctx);
-
- if (!waitTopFut)
- remapCnt = 1;
-
- this.remapCnt = remapCnt;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
- }
-
- /**
- * @return {@code True} if this future should block partition map exchange.
- */
- protected boolean waitForPartitionExchange() {
- // Wait fast-map near atomic update futures in CLOCK mode.
- return fastMap;
- }
-
- /**
- * Updates near cache.
- *
- * @param req Update request.
- * @param res Update response.
- */
- protected void updateNear(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
- assert nearEnabled;
-
- if (res.remapKeys() != null || !req.hasPrimary())
- return;
-
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
- }
-
- /**
- * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
- */
- protected boolean storeFuture() {
- return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
- }
-
- /**
- * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
- * node and send updates in parallel to all participating nodes.
- *
- * @param key Key to map.
- * @param topVer Topology version to map.
- * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
- * @return Collection of nodes to which key is mapped.
- */
- protected Collection<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer, boolean fastMap
- ) {
- GridCacheAffinityManager affMgr = cctx.affinity();
-
- // If we can send updates in parallel - do it.
- return fastMap ?
- cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primary(key, topVer));
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/07c23931/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index c8550f3..149d277 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -17,7 +17,16 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -26,12 +35,16 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -44,26 +57,38 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
* DHT atomic cache near update future.
*/
-@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFuture {
+public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implements GridCacheAtomicFuture<Object>{
+ /** Version where single-put optimization appeared. */
+ public static final IgniteProductVersion SINGLE_PUT_MSG_SINCE = IgniteProductVersion.fromString("1.6.0");
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Cache context. */
+ private final GridCacheContext cctx;
+
+ /** Cache. */
+ private GridDhtAtomicCache cache;
+
+ /** Update operation. */
+ private final GridCacheOperation op;
+
/** Keys */
private Collection<?> keys;
@@ -71,6 +96,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<?> vals;
+ /** Optional arguments for entry processor. */
+ private Object[] invokeArgs;
+
/** Conflict put values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheDrInfo> conflictPutVals;
@@ -79,39 +107,50 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheVersion> conflictRmvVals;
- /** Current topology version. */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
+ /** Return value require flag. */
+ private final boolean retval;
- /** */
- private GridCacheVersion updVer;
+ /** Expiry policy. */
+ private final ExpiryPolicy expiryPlc;
- /** Topology version when got mapping error. */
- private AffinityTopologyVersion mapErrTopVer;
+ /** Optional filter. */
+ private final CacheEntryPredicate[] filter;
- /** Mappings if operation is mapped to more than one node. */
- @GridToStringInclude
- private Map<UUID, GridNearAtomicUpdateRequest> mappings;
+ /** Write synchronization mode. */
+ private final CacheWriteSynchronizationMode syncMode;
- /** */
- private int resCnt;
+ /** Raw return value flag. */
+ private final boolean rawRetval;
+
+ /** Fast map flag. */
+ private final boolean fastMap;
+
+ /** Near cache flag. */
+ private final boolean nearEnabled;
- /** Error. */
- private CachePartialUpdateCheckedException err;
+ /** Subject ID. */
+ private final UUID subjId;
- /** Future ID. */
- private GridCacheVersion futVer;
+ /** Task name hash. */
+ private final int taskNameHash;
- /** Completion future for a particular topology version. */
- private GridFutureAdapter<Void> topCompleteFut;
+ /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+ private boolean topLocked;
+
+ /** Skip store flag. */
+ private final boolean skipStore;
+
+ /** */
+ private final boolean keepBinary;
- /** Keys to remap. */
- private Collection<KeyCacheObject> remapKeys;
+ /** Wait for topology future flag. */
+ private final boolean waitTopFut;
- /** Not null if operation is mapped to single node. */
- private GridNearAtomicUpdateRequestInterface singleReq;
+ /** Remap count. */
+ private int remapCnt;
- /** Operation result. */
- private GridCacheReturn opRes;
+ /** State. */
+ private final UpdateState state;
/**
* @param cctx Cache context.
@@ -155,72 +194,116 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
int remapCnt,
boolean waitTopFut
) {
- super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
- skipStore, keepBinary, remapCnt, waitTopFut);
+ this.rawRetval = rawRetval;
assert vals == null || vals.size() == keys.size();
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+ assert subjId != null;
+ this.cctx = cctx;
+ this.cache = cache;
+ this.syncMode = syncMode;
+ this.op = op;
this.keys = keys;
this.vals = vals;
+ this.invokeArgs = invokeArgs;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
+ this.retval = retval;
+ this.expiryPlc = expiryPlc;
+ this.filter = filter;
+ this.subjId = subjId;
+ this.taskNameHash = taskNameHash;
+ this.skipStore = skipStore;
+ this.keepBinary = keepBinary;
+ this.waitTopFut = waitTopFut;
+
+ if (log == null)
+ log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
+
+ fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+ cctx.config().getAtomicWriteOrderMode() == CLOCK &&
+ !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
+
+ nearEnabled = CU.isNearEnabled(cctx);
+
+ if (!waitTopFut)
+ remapCnt = 1;
+
+ this.remapCnt = remapCnt;
+
+ state = new UpdateState();
}
/** {@inheritDoc} */
- @Override public synchronized GridCacheVersion version() {
- return futVer;
+ @Override public IgniteUuid futureId() {
+ throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (waitForPartitionExchange()) {
- GridFutureAdapter<Void> fut = completeFuture0(topVer);
+ @Override public GridCacheVersion version() {
+ return state.futureVersion();
+ }
- if (fut != null && isDone()) {
- fut.onDone();
+ /**
+ * @return {@code True} if this future should block partition map exchange.
+ */
+ private boolean waitForPartitionExchange() {
+ // Wait fast-map near atomic update futures in CLOCK mode.
+ return fastMap;
+ }
- return null;
- }
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ state.onNodeLeft(nodeId);
- return fut;
- }
+ return false;
+ }
- return null;
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return true;
}
/** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
- synchronized (this) {
- GridNearAtomicUpdateRequestInterface req;
+ /**
+ * Performs future mapping.
+ */
+ public void map() {
+ AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
- if (singleReq != null)
- req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
- else
- req = mappings != null ? mappings.get(nodeId) : null;
+ if (topVer == null)
+ mapOnTopology();
+ else {
+ topLocked = true;
- if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- nodeId,
- req.futureVersion(),
- cctx.deploymentEnabled());
+ // Cannot remap.
+ remapCnt = 1;
- ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
- "before response is received: " + nodeId);
+ state.map(topVer, null);
+ }
+ }
- e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (waitForPartitionExchange()) {
+ GridFutureAdapter<Void> fut = state.completeFuture(topVer);
- res.addFailedKeys(req.keys(), e);
+ if (fut != null && isDone()) {
+ fut.onDone();
+
+ return null;
}
- }
- if (res != null)
- onResult(nodeId, res, true);
+ return fut;
+ }
- return false;
+ return null;
}
/** {@inheritDoc} */
@@ -238,7 +321,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
retval = Collections.emptyMap();
if (super.onDone(retval, err)) {
- GridCacheVersion futVer = onFutureDone();
+ GridCacheVersion futVer = state.onFutureDone();
if (futVer != null)
cctx.mvcc().removeAtomicFuture(futVer);
@@ -250,31 +333,30 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/**
- * Performs future mapping.
+ * Response callback.
+ *
+ * @param nodeId Node ID.
+ * @param res Update response.
*/
- public void map() {
- AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
-
- if (topVer == null)
- mapOnTopology();
- else {
- topLocked = true;
-
- // Cannot remap.
- remapCnt = 1;
-
- map(topVer, null);
- }
+ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+ state.onResult(nodeId, res, false);
}
/**
- * Response callback.
+ * Updates near cache.
*
- * @param nodeId Node ID.
+ * @param req Update request.
* @param res Update response.
*/
- public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
- onResult(nodeId, res, false);
+ private void updateNear(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
+ assert nearEnabled;
+
+ if (res.remapKeys() != null || !req.hasPrimary())
+ return;
+
+ GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+ near.processNearAtomicUpdateResponse(req, res);
}
/**
@@ -330,7 +412,36 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
cache.topology().readUnlock();
}
- map(topVer, null);
+ state.map(topVer, null);
+ }
+
+ /**
+ * @return {@code True} if future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
+ */
+ private boolean storeFuture() {
+ return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
+ }
+
+ /**
+ * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+ * node and send updates in parallel to all participating nodes.
+ *
+ * @param key Key to map.
+ * @param topVer Topology version to map.
+ * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
+ * @return Collection of nodes to which key is mapped.
+ */
+ private Collection<ClusterNode> mapKey(
+ KeyCacheObject key,
+ AffinityTopologyVersion topVer,
+ boolean fastMap
+ ) {
+ GridCacheAffinityManager affMgr = cctx.affinity();
+
+ // If we can send updates in parallel - do it.
+ return fastMap ?
+ cctx.topology().nodes(affMgr.partition(key), topVer) :
+ Collections.singletonList(affMgr.primary(key, topVer));
}
/**
@@ -343,7 +454,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
new CI2<GridNearAtomicUpdateRequestInterface, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequestInterface req, GridNearAtomicUpdateResponse res) {
+ @Override public void apply(GridNearAtomicUpdateRequestInterface req,
+ GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res);
}
});
@@ -365,7 +477,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
onDone(new GridCacheReturn(cctx, true, true, null, true));
}
catch (IgniteCheckedException e) {
- onSendError(req, e);
+ state.onSendError(req, e);
}
}
}
@@ -378,7 +490,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicUpdateRequestInterface locUpdate = null;
+ GridNearAtomicUpdateRequest locUpdate = null;
// Send messages to remote nodes first, then run local update.
for (GridNearAtomicUpdateRequest req : mappings.values()) {
@@ -396,7 +508,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
- onSendError(req, e);
+ state.onSendError(req, e);
}
}
}
@@ -415,423 +527,611 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
}
/**
- * @param nodeId Node ID.
- * @param res Response.
- * @param nodeErr {@code True} if response was created on node failure.
+ *
*/
- @SuppressWarnings("unchecked")
- void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequestInterface req;
+ private class UpdateState {
+ /** Current topology version. */
+ private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
- AffinityTopologyVersion remapTopVer = null;
+ /** */
+ private GridCacheVersion updVer;
- GridCacheReturn opRes0 = null;
- CachePartialUpdateCheckedException err0 = null;
+ /** Topology version when got mapping error. */
+ private AffinityTopologyVersion mapErrTopVer;
- boolean rcvAll;
+ /** Mappings if operation is mapped to more than one node. */
+ @GridToStringInclude
+ private Map<UUID, GridNearAtomicUpdateRequest> mappings;
- GridFutureAdapter<?> fut0 = null;
+ /** */
+ private int resCnt;
- synchronized (this) {
- if (!res.futureVersion().equals(futVer))
- return;
+ /** Error. */
+ private CachePartialUpdateCheckedException err;
- if (singleReq != null) {
- if (!singleReq.nodeId().equals(nodeId))
- return;
+ /** Future ID. */
+ private GridCacheVersion futVer;
- req = singleReq;
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
- singleReq = null;
+ /** Keys to remap. */
+ private Collection<KeyCacheObject> remapKeys;
- rcvAll = true;
- }
- else {
- req = mappings != null ? mappings.get(nodeId) : null;
+ /** Not null if operation is mapped to single node. */
+ private GridNearAtomicUpdateRequestInterface singleReq;
- if (req != null && req.onResponse(res)) {
- resCnt++;
+ /** Operation result. */
+ private GridCacheReturn opRes;
- rcvAll = mappings.size() == resCnt;
- }
+ /**
+ * @return Future version.
+ */
+ @Nullable synchronized GridCacheVersion futureVersion() {
+ return futVer;
+ }
+
+ /**
+ * @param nodeId Left node ID.
+ */
+ void onNodeLeft(UUID nodeId) {
+ GridNearAtomicUpdateResponse res = null;
+
+ synchronized (this) {
+ GridNearAtomicUpdateRequestInterface req;
+
+ if (singleReq != null)
+ req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
else
- return;
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.response() == null) {
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ nodeId,
+ req.futureVersion(),
+ cctx.deploymentEnabled());
+
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+ "before response is received: " + nodeId);
+
+ e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+ res.addFailedKeys(req.keys(), e);
+ }
}
- assert req != null && req.topologyVersion().equals(topVer) : req;
+ if (res != null)
+ onResult(nodeId, res, true);
+ }
- if (res.remapKeys() != null) {
- assert !fastMap || cctx.kernalContext().clientNode();
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ * @param nodeErr {@code True} if response was created on node failure.
+ */
+ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+ void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicUpdateRequestInterface req;
- if (remapKeys == null)
- remapKeys = U.newHashSet(res.remapKeys().size());
+ AffinityTopologyVersion remapTopVer = null;
- remapKeys.addAll(res.remapKeys());
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
- if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
- mapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- if (res.failedKeys() != null)
- addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
- }
- else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
+ boolean rcvAll;
+
+ GridFutureAdapter<?> fut0 = null;
+
+ synchronized (this) {
+ if (!res.futureVersion().equals(futVer))
+ return;
+
+ if (singleReq != null) {
+ if (!singleReq.nodeId().equals(nodeId))
+ return;
+
+ req = singleReq;
+
+ singleReq = null;
+
+ rcvAll = true;
+ }
+ else {
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.onResponse(res)) {
+ resCnt++;
- if (op == TRANSFORM) {
- if (ret != null)
- addInvokeResults(ret);
+ rcvAll = mappings.size() == resCnt;
}
else
- opRes = ret;
+ return;
}
- }
- if (rcvAll) {
- if (remapKeys != null) {
- assert mapErrTopVer != null;
+ assert req != null && req.topologyVersion().equals(topVer) : req;
+
+ if (res.remapKeys() != null) {
+ assert !fastMap || cctx.kernalContext().clientNode();
+
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(res.remapKeys().size());
- remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+ remapKeys.addAll(res.remapKeys());
+
+ if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+ mapErrTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null) {
+ if (res.failedKeys() != null)
+ addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
}
else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- --remapCnt > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
+ if (!req.fastMap() || req.hasPrimary()) {
+ GridCacheReturn ret = res.returnValue();
+
+ if (op == TRANSFORM) {
+ if (ret != null)
+ addInvokeResults(ret);
+ }
+ else
+ opRes = ret;
+ }
+ }
+
+ if (rcvAll) {
+ if (remapKeys != null) {
+ assert mapErrTopVer != null;
+
+ remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
+ }
+ else {
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr =
+ X.cause(err, ClusterTopologyCheckedException.class);
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
- assert cause != null && cause.topologyVersion() != null : err;
+ assert cause != null && cause.topologyVersion() != null : err;
- remapTopVer =
- new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+ remapTopVer =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
- err = null;
+ err = null;
- Collection<Object> failedKeys = cause.failedKeys();
+ Collection<Object> failedKeys = cause.failedKeys();
- remapKeys = new ArrayList<>(failedKeys.size());
+ remapKeys = new ArrayList<>(failedKeys.size());
- for (Object key : failedKeys)
- remapKeys.add(cctx.toCacheKeyObject(key));
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
- updVer = null;
+ updVer = null;
+ }
}
}
- }
- if (remapTopVer == null) {
- err0 = err;
- opRes0 = opRes;
- }
- else {
- fut0 = topCompleteFut;
+ if (remapTopVer == null) {
+ err0 = err;
+ opRes0 = opRes;
+ }
+ else {
+ fut0 = topCompleteFut;
- topCompleteFut = null;
+ topCompleteFut = null;
- cctx.mvcc().removeAtomicFuture(futVer);
+ cctx.mvcc().removeAtomicFuture(futVer);
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
+ futVer = null;
+ topVer = AffinityTopologyVersion.ZERO;
+ }
}
}
- }
- if (res.error() != null && res.failedKeys() == null) {
- onDone(res.error());
+ if (res.error() != null && res.failedKeys() == null) {
+ onDone(res.error());
- return;
- }
+ return;
+ }
- if (rcvAll && nearEnabled) {
- if (mappings != null) {
- for (GridNearAtomicUpdateRequestInterface req0 : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = req0.response();
+ if (rcvAll && nearEnabled) {
+ if (mappings != null) {
+ for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = req0.response();
- assert res0 != null : req0;
+ assert res0 != null : req0;
- updateNear(req0, res0);
+ updateNear(req0, res0);
+ }
}
+ else if (!nodeErr)
+ updateNear(req, res);
}
- else if (!nodeErr)
- updateNear(req, res);
- }
- if (remapTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
+ if (remapTopVer != null) {
+ if (fut0 != null)
+ fut0.onDone();
- if (!waitTopFut) {
- onDone(new GridCacheTryPutFailedException());
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
- return;
- }
+ return;
+ }
- if (topLocked) {
- assert !F.isEmpty(remapKeys) : remapKeys;
+ if (topLocked) {
+ assert !F.isEmpty(remapKeys) : remapKeys;
- CachePartialUpdateCheckedException e =
- new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
- "Failed to update keys, topology changed while execute atomic update inside transaction.");
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
- cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
- e.add(remapKeys, cause);
+ e.add(remapKeys, cause);
- onDone(e);
+ onDone(e);
- return;
- }
+ return;
+ }
- IgniteInternalFuture<AffinityTopologyVersion> fut =
- cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+ IgniteInternalFuture<AffinityTopologyVersion> fut =
+ cctx.shared().exchange().affinityReadyFuture(remapTopVer);
- if (fut == null)
- fut = new GridFinishedFuture<>(remapTopVer);
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- try {
- AffinityTopologyVersion topVer = fut.get();
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ try {
+ AffinityTopologyVersion topVer = fut.get();
- map(topVer, remapKeys);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ map(topVer, remapKeys);
+ }
+ catch (IgniteCheckedException e) {
+ onDone(e);
+ }
}
- }
- });
- }
- });
+ });
+ }
+ });
- return;
- }
+ return;
+ }
- if (rcvAll)
- onDone(opRes0, err0);
- }
+ if (rcvAll)
+ onDone(opRes0, err0);
+ }
- /**
- * @param req Request.
- * @param e Error.
- */
- void onSendError(GridNearAtomicUpdateRequestInterface req, IgniteCheckedException e) {
- synchronized (this) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureVersion(),
- cctx.deploymentEnabled());
+ /**
+ * @param req Request.
+ * @param e Error.
+ */
+ void onSendError(GridNearAtomicUpdateRequestInterface req, IgniteCheckedException e) {
+ synchronized (this) {
+ GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ req.nodeId(),
+ req.futureVersion(),
+ cctx.deploymentEnabled());
- res.addFailedKeys(req.keys(), e);
+ res.addFailedKeys(req.keys(), e);
- onResult(req.nodeId(), res, true);
+ onResult(req.nodeId(), res, true);
+ }
}
- }
- /**
- * @param topVer Topology version.
- * @param remapKeys Keys to remap.
- */
- void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+ /**
+ * @param topVer Topology version.
+ * @param remapKeys Keys to remap.
+ */
+ void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
+ if (F.isEmpty(topNodes)) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid)."));
- return;
- }
+ return;
+ }
- Exception err = null;
- GridNearAtomicUpdateRequestInterface singleReq0 = null;
- Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
+ Exception err = null;
+ GridNearAtomicUpdateRequestInterface singleReq0 = null;
+ Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
- int size = keys.size();
+ int size = keys.size();
- GridCacheVersion futVer = cctx.versions().next(topVer);
+ GridCacheVersion futVer = cctx.versions().next(topVer);
- GridCacheVersion updVer;
+ GridCacheVersion updVer;
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ updVer = this.updVer;
- if (updVer == null) {
- updVer = cctx.versions().next(topVer);
+ if (updVer == null) {
+ updVer = cctx.versions().next(topVer);
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
+ }
}
- }
- else
- updVer = null;
-
- try {
- if (size == 1 && !fastMap) {
- assert remapKeys == null || remapKeys.size() == 1;
+ else
+ updVer = null;
- singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
- }
- else {
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
- topVer,
- futVer,
- updVer,
- remapKeys);
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
- if (pendingMappings.size() == 1)
- singleReq0 = F.firstValue(pendingMappings);
+ singleReq0 = mapSingleUpdate(topVer, topNodes, futVer, updVer);
+ }
else {
- if (syncMode == PRIMARY_SYNC) {
- mappings0 = U.newHashMap(pendingMappings.size());
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ topVer,
+ futVer,
+ updVer,
+ remapKeys);
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings0.put(req.nodeId(), req);
+ if (pendingMappings.size() == 1)
+ singleReq0 = F.firstValue(pendingMappings);
+ else {
+ if (syncMode == PRIMARY_SYNC) {
+ mappings0 = U.newHashMap(pendingMappings.size());
+
+ for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings0.put(req.nodeId(), req);
+ }
}
- }
- else
- mappings0 = pendingMappings;
+ else
+ mappings0 = pendingMappings;
- assert !mappings0.isEmpty() || size == 0 : this;
+ assert !mappings0.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this;
+ }
}
- }
- synchronized (this) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
+ synchronized (this) {
+ assert this.futVer == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
- this.topVer = topVer;
- this.updVer = updVer;
- this.futVer = futVer;
+ this.topVer = topVer;
+ this.updVer = updVer;
+ this.futVer = futVer;
- resCnt = 0;
+ resCnt = 0;
- singleReq = singleReq0;
- mappings = mappings0;
+ singleReq = singleReq0;
+ mappings = mappings0;
- this.remapKeys = null;
+ this.remapKeys = null;
+ }
+ }
+ catch (Exception e) {
+ err = e;
}
- }
- catch (Exception e) {
- err = e;
- }
- if (err != null) {
- onDone(err);
+ if (err != null) {
+ onDone(err);
- return;
- }
+ return;
+ }
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, this)) {
- assert isDone() : this;
+ if (storeFuture()) {
+ if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) {
+ assert isDone() : GridNearAtomicUpdateFuture.this;
- return;
+ return;
+ }
}
- }
- // Optimize mapping for single key.
- if (singleReq0 != null)
- mapSingle(singleReq0.nodeId(), singleReq0);
- else {
- assert mappings0 != null;
+ // Optimize mapping for single key.
+ if (singleReq0 != null)
+ mapSingle(singleReq0.nodeId(), singleReq0);
+ else {
+ assert mappings0 != null;
- if (size == 0)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- else
- doUpdate(mappings0);
+ if (size == 0)
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ else
+ doUpdate(mappings0);
+ }
}
- }
- /**
- * @param topVer Topology version.
- * @return Future.
- */
- @Nullable private synchronized GridFutureAdapter<Void> completeFuture0(AffinityTopologyVersion topVer) {
- if (this.topVer == AffinityTopologyVersion.ZERO)
- return null;
+ /**
+ * @param topVer Topology version.
+ * @return Future.
+ */
+ @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (this.topVer == AffinityTopologyVersion.ZERO)
+ return null;
- if (this.topVer.compareTo(topVer) < 0) {
- if (topCompleteFut == null)
- topCompleteFut = new GridFutureAdapter<>();
+ if (this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
- return topCompleteFut;
+ return topCompleteFut;
+ }
+
+ return null;
}
- return null;
- }
+ /**
+ * @return Future version.
+ */
+ GridCacheVersion onFutureDone() {
+ GridCacheVersion ver0;
- /**
- * @return Future version.
- */
- private GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
+ GridFutureAdapter<Void> fut0;
+
+ synchronized (this) {
+ fut0 = topCompleteFut;
- GridFutureAdapter<Void> fut0;
+ topCompleteFut = null;
- synchronized (this) {
- fut0 = topCompleteFut;
+ ver0 = futVer;
- topCompleteFut = null;
+ futVer = null;
+ }
- ver0 = futVer;
+ if (fut0 != null)
+ fut0.onDone();
- futVer = null;
+ return ver0;
}
- if (fut0 != null)
- fut0.onDone();
+ /**
+ * @param topNodes Cache nodes.
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @param remapKeys Keys to remap.
+ * @return Mapping.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ AffinityTopologyVersion topVer,
+ GridCacheVersion futVer,
+ @Nullable GridCacheVersion updVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+ Iterator<?> it = null;
+
+ if (vals != null)
+ it = vals.iterator();
+
+ Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+
+ if (conflictPutVals != null)
+ conflictPutValsIt = conflictPutVals.iterator();
+
+ Iterator<GridCacheVersion> conflictRmvValsIt = null;
+
+ if (conflictRmvVals != null)
+ conflictRmvValsIt = conflictRmvVals.iterator();
+
+ Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+
+ // Create mappings first, then send messages.
+ for (Object key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ val = it.next();
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+ if (val == null)
+ throw new NullPointerException("Null value.");
+ }
+ else if (conflictPutVals != null) {
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
- return ver0;
- }
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ val = null;
+ conflictVer = conflictRmvValsIt.next();
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
- /**
- * @param topNodes Cache nodes.
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @param remapKeys Keys to remap.
- * @return Mapping.
- * @throws Exception If failed.
- */
- @SuppressWarnings("ConstantConditions")
- private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
- AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer,
- @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
- Iterator<?> it = null;
+ if (val == null && op != GridCacheOperation.DELETE)
+ continue;
- if (vals != null)
- it = vals.iterator();
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
- Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
- if (conflictPutVals != null)
- conflictPutValsIt = conflictPutVals.iterator();
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
- Iterator<GridCacheVersion> conflictRmvValsIt = null;
+ Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
- if (conflictRmvVals != null)
- conflictRmvValsIt = conflictRmvVals.iterator();
+ if (affNodes.isEmpty())
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+ int i = 0;
+
+ for (ClusterNode affNode : affNodes) {
+ if (affNode == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ UUID nodeId = affNode.id();
+
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+
+ if (mapped == null) {
+ mapped = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ pendingMappings.put(nodeId, mapped);
+ }
- // Create mappings first, then send messages.
- for (Object key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
+ mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
+
+ i++;
+ }
+ }
+
+ return pendingMappings;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param futVer Future version.
+ * @param updVer Update version.
+ * @return Request.
+ * @throws Exception If failed.
+ */
+ private GridNearAtomicUpdateRequestInterface mapSingleUpdate(AffinityTopologyVersion topVer,
+ Collection<ClusterNode> topNodes, GridCacheVersion futVer, @Nullable GridCacheVersion updVer)
+ throws Exception {
+ Object key = F.first(keys);
Object val;
GridCacheVersion conflictVer;
@@ -839,273 +1139,169 @@ public class GridNearAtomicUpdateFuture extends GridNearAbstractAtomicUpdateFutu
long conflictExpireTime;
if (vals != null) {
- val = it.next();
+ // Regular PUT.
+ val = F.first(vals);
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
- if (val == null)
- throw new NullPointerException("Null value.");
}
else if (conflictPutVals != null) {
- GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
+ // Conflict PUT.
+ GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
val = conflictPutVal.valueEx();
conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
+ conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
}
else if (conflictRmvVals != null) {
+ // Conflict REMOVE.
val = null;
- conflictVer = conflictRmvValsIt.next();
+ conflictVer = F.first(conflictRmvVals);
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
else {
+ // Regular REMOVE.
val = null;
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
+ // We can still get here if the user passes a map with a single element.
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
if (val == null && op != GridCacheOperation.DELETE)
- continue;
+ throw new NullPointerException("Null value.");
KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
- if (remapKeys != null && !remapKeys.contains(cacheKey))
- continue;
-
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+ ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
- if (affNodes.isEmpty())
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
+ if (primary == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid).");
- int i = 0;
+ // Decide whether we will use the optimized version of the update request.
+ boolean optimize = true;
- for (ClusterNode affNode : affNodes) {
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
+ for (ClusterNode topNode : topNodes) {
+ if (topNode.version().compareTo(SINGLE_PUT_MSG_SINCE) < 0) {
+ optimize = false;
- UUID nodeId = affNode.id();
-
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
- if (mapped == null) {
- mapped = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- keys.size());
-
- pendingMappings.put(nodeId, mapped);
+ break;
}
+ }
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
-
- i++;
+ if (optimize) {
+ return new GridNearAtomicSingleUpdateRequest(
+ cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled());
+ }
+ else {
+ GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futVer,
+ fastMap,
+ updVer,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ skipStore,
+ keepBinary,
+ cctx.kernalContext().clientNode(),
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
}
}
- return pendingMappings;
- }
-
- /**
- * @param topVer Topology version.
- * @param topNodes Topology nodes.
- * @param futVer Future version.
- * @param updVer Update version.
- * @return Request.
- * @throws Exception If failed.
- */
- private GridNearAtomicUpdateRequestInterface mapSingleUpdate(AffinityTopologyVersion topVer,
- Collection<ClusterNode> topNodes, GridCacheVersion futVer, @Nullable GridCacheVersion updVer) throws Exception {
- Object key = F.first(keys);
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- // Regular PUT.
- val = F.first(vals);
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else if (conflictPutVals != null) {
- // Conflict PUT.
- GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
-
- val = conflictPutVal.valueEx();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- // Conflict REMOVE.
- val = null;
- conflictVer = F.first(conflictRmvVals);
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- // Regular REMOVE.
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ /**
+ * @param ret Result from single node.
+ */
+ @SuppressWarnings("unchecked")
+ private void addInvokeResults(GridCacheReturn ret) {
+ assert op == TRANSFORM : op;
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
+ }
}
- // We still can get here if user pass map with single element.
- if (key == null)
- throw new NullPointerException("Null key.");
+ /**
+ * @param failedKeys Failed keys.
+ * @param topVer Topology version for failed update.
+ * @param err Error cause.
+ */
+ private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
+ AffinityTopologyVersion topVer,
+ Throwable err) {
+ CachePartialUpdateCheckedException err0 = this.err;
- if (val == null && op != GridCacheOperation.DELETE)
- throw new NullPointerException("Null value.");
+ if (err0 == null)
+ err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+ Collection<Object> keys = new ArrayList<>(failedKeys.size());
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
+ for (KeyCacheObject key : failedKeys)
+ keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
-
- if (primary == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid).");
-
- // Decide whether we will use optimzied version of update request.
- boolean optimize = true;
-
- for (ClusterNode topNode : topNodes) {
- if (topNode.version().compareTo(SINGLE_PUT_MSG_SINCE) < 0) {
- optimize = false;
-
- break;
- }
+ err0.add(keys, err, topVer);
}
- if (optimize) {
- return new GridNearAtomicSingleUpdateRequest(
- cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled());
- }
- else {
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
-
- return req;
- }
- }
-
- /**
- * @param ret Result from single node.
- */
- @SuppressWarnings("unchecked")
- private void addInvokeResults(GridCacheReturn ret) {
- assert op == TRANSFORM : op;
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
+ /** {@inheritDoc} */
+ @Override public synchronized String toString() {
+ return S.toString(UpdateState.class, this);
}
}
- /**
- * @param failedKeys Failed keys.
- * @param topVer Topology version for failed update.
- * @param err Error cause.
- */
- private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
- AffinityTopologyVersion topVer,
- Throwable err) {
- CachePartialUpdateCheckedException err0 = this.err;
-
- if (err0 == null)
- err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
-
- Collection<Object> keys = new ArrayList<>(failedKeys.size());
-
- for (KeyCacheObject key : failedKeys)
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
-
- err0.add(keys, err, topVer);
- }
-
/** {@inheritDoc} */
public String toString() {
return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());