You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/02/25 13:31:01 UTC
[02/51] [abbrv] ignite git commit: IGNITE-2532: WIP. Only refactorings for now.
IGNITE-2532: WIP. Only refactorings for now.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/29c2aee6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/29c2aee6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/29c2aee6
Branch: refs/heads/ignite-2523
Commit: 29c2aee6b6ffac4f27da3732575f7d82b0e8bedd
Parents: 4665283
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Feb 3 11:09:13 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Feb 3 11:09:13 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAtomicFuture.java | 6 -
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 5 -
.../GridNearAtomicSingleUpdateFuture.java | 1264 ------------------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 1179 ++++++++--------
4 files changed, 572 insertions(+), 1882 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
index 359909e..be24191 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.util.Collection;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -38,9 +37,4 @@ public interface GridCacheAtomicFuture<R> extends GridCacheFuture<R> {
* @return Future or {@code null} if no need to wait.
*/
public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer);
-
- /**
- * @return Future keys.
- */
- public Collection<?> keys();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 6891d3b..9fe60c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -201,11 +201,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> implement
return null;
}
- /** {@inheritDoc} */
- @Override public Collection<KeyCacheObject> keys() {
- return keys;
- }
-
/**
* @param entry Entry to map.
* @param val Value to write.
http://git-wip-us.apache.org/repos/asf/ignite/blob/29c2aee6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
deleted file mode 100644
index d633e47..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ /dev/null
@@ -1,1264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.processors.cache.GridCacheReturn;
-import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.CI2;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import javax.cache.expiry.ExpiryPolicy;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
-import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
-
-/**
- * DHT atomic cache single near update future.
- */
-public class GridNearAtomicSingleUpdateFuture extends GridFutureAdapter<Object>
- implements GridCacheAtomicFuture<Object>{
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- protected static IgniteLogger log;
-
- /** Cache context. */
- private final GridCacheContext cctx;
-
- /** Cache. */
- private GridDhtAtomicCache cache;
-
- /** Update operation. */
- private final GridCacheOperation op;
-
- /** Keys */
- private Collection<?> keys;
-
- /** Values. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private Collection<?> vals;
-
- /** Optional arguments for entry processor. */
- private Object[] invokeArgs;
-
- /** Conflict put values. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private Collection<GridCacheDrInfo> conflictPutVals;
-
- /** Conflict remove values. */
- @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
- private Collection<GridCacheVersion> conflictRmvVals;
-
- /** Return value require flag. */
- private final boolean retval;
-
- /** Expiry policy. */
- private final ExpiryPolicy expiryPlc;
-
- /** Optional filter. */
- private final CacheEntryPredicate[] filter;
-
- /** Write synchronization mode. */
- private final CacheWriteSynchronizationMode syncMode;
-
- /** Raw return value flag. */
- private final boolean rawRetval;
-
- /** Fast map flag. */
- private final boolean fastMap;
-
- /** Near cache flag. */
- private final boolean nearEnabled;
-
- /** Subject ID. */
- private final UUID subjId;
-
- /** Task name hash. */
- private final int taskNameHash;
-
- /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
- private boolean topLocked;
-
- /** Skip store flag. */
- private final boolean skipStore;
-
- /** */
- private final boolean keepBinary;
-
- /** Wait for topology future flag. */
- private final boolean waitTopFut;
-
- /** Remap count. */
- private int remapCnt;
-
- /** State. */
- private final UpdateState state;
-
- /**
- * @param cctx Cache context.
- * @param cache Cache instance.
- * @param syncMode Write synchronization mode.
- * @param op Update operation.
- * @param keys Keys to update.
- * @param vals Values or transform closure.
- * @param invokeArgs Optional arguments for entry processor.
- * @param conflictPutVals Conflict put values (optional).
- * @param conflictRmvVals Conflict remove values (optional).
- * @param retval Return value require flag.
- * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
- * @param expiryPlc Expiry policy explicitly specified for cache operation.
- * @param filter Entry filter.
- * @param subjId Subject ID.
- * @param taskNameHash Task name hash code.
- * @param skipStore Skip store flag.
- * @param keepBinary Keep binary flag.
- * @param remapCnt Maximum number of retries.
- * @param waitTopFut If {@code false} does not wait for affinity change future.
- */
- public GridNearAtomicSingleUpdateFuture(
- GridCacheContext cctx,
- GridDhtAtomicCache cache,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- Collection<?> keys,
- @Nullable Collection<?> vals,
- @Nullable Object[] invokeArgs,
- @Nullable Collection<GridCacheDrInfo> conflictPutVals,
- @Nullable Collection<GridCacheVersion> conflictRmvVals,
- final boolean retval,
- final boolean rawRetval,
- @Nullable ExpiryPolicy expiryPlc,
- final CacheEntryPredicate[] filter,
- UUID subjId,
- int taskNameHash,
- boolean skipStore,
- boolean keepBinary,
- int remapCnt,
- boolean waitTopFut
- ) {
- this.rawRetval = rawRetval;
-
- assert vals == null || vals.size() == keys.size();
- assert conflictPutVals == null || conflictPutVals.size() == keys.size();
- assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
- assert subjId != null;
-
- this.cctx = cctx;
- this.cache = cache;
- this.syncMode = syncMode;
- this.op = op;
- this.keys = keys;
- this.vals = vals;
- this.invokeArgs = invokeArgs;
- this.conflictPutVals = conflictPutVals;
- this.conflictRmvVals = conflictRmvVals;
- this.retval = retval;
- this.expiryPlc = expiryPlc;
- this.filter = filter;
- this.subjId = subjId;
- this.taskNameHash = taskNameHash;
- this.skipStore = skipStore;
- this.keepBinary = keepBinary;
- this.waitTopFut = waitTopFut;
-
- if (log == null)
- log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
-
- fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
- cctx.config().getAtomicWriteOrderMode() == CLOCK &&
- !(cctx.writeThrough() && cctx.config().getInterceptor() != null);
-
- nearEnabled = CU.isNearEnabled(cctx);
-
- if (!waitTopFut)
- remapCnt = 1;
-
- this.remapCnt = remapCnt;
-
- state = new UpdateState();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return state.futureVersion();
- }
-
- /**
- * @return {@code True} if this future should block partition map exchange.
- */
- private boolean waitForPartitionExchange() {
- // Wait fast-map near atomic update futures in CLOCK mode.
- return fastMap;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<?> keys() {
- return keys;
- }
-
- /** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- state.onNodeLeft(nodeId);
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
- }
-
- /**
- * Performs future mapping.
- */
- public void map() {
- AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
-
- if (topVer == null)
- mapOnTopology();
- else {
- topLocked = true;
-
- // Cannot remap.
- remapCnt = 1;
-
- state.map(topVer, null);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (waitForPartitionExchange()) {
- GridFutureAdapter<Void> fut = state.completeFuture(topVer);
-
- if (fut != null && isDone()) {
- fut.onDone();
-
- return null;
- }
-
- return fut;
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ConstantConditions")
- @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
- assert res == null || res instanceof GridCacheReturn;
-
- GridCacheReturn ret = (GridCacheReturn)res;
-
- Object retval =
- res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
- cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
-
- if (op == TRANSFORM && retval == null)
- retval = Collections.emptyMap();
-
- if (super.onDone(retval, err)) {
- GridCacheVersion futVer = state.onFutureDone();
-
- if (futVer != null)
- cctx.mvcc().removeAtomicFuture(futVer);
-
- return true;
- }
-
- return false;
- }
-
- /**
- * Response callback.
- *
- * @param nodeId Node ID.
- * @param res Update response.
- */
- public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
- state.onResult(nodeId, res, false);
- }
-
- /**
- * Updates near cache.
- *
- * @param req Update request.
- * @param res Update response.
- */
- private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- assert nearEnabled;
-
- if (res.remapKeys() != null || !req.hasPrimary())
- return;
-
- GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
-
- near.processNearAtomicUpdateResponse(req, res);
- }
-
- /**
- * Maps future on ready topology.
- */
- private void mapOnTopology() {
- cache.topology().readLock();
-
- AffinityTopologyVersion topVer = null;
-
- try {
- if (cache.topology().stopping()) {
- onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
- cache.name()));
-
- return;
- }
-
- GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
-
- if (fut.isDone()) {
- Throwable err = fut.validateCache(cctx);
-
- if (err != null) {
- onDone(err);
-
- return;
- }
-
- topVer = fut.topologyVersion();
- }
- else {
- if (waitTopFut) {
- assert !topLocked : this;
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- mapOnTopology();
- }
- });
- }
- });
- }
- else
- onDone(new GridCacheTryPutFailedException());
-
- return;
- }
- }
- finally {
- cache.topology().readUnlock();
- }
-
- state.map(topVer, null);
- }
-
- /**
- * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
- */
- private boolean storeFuture() {
- return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC;
- }
-
- /**
- * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
- * node and send updates in parallel to all participating nodes.
- *
- * @param key Key to map.
- * @param topVer Topology version to map.
- * @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
- * @return Collection of nodes to which key is mapped.
- */
- private Collection<ClusterNode> mapKey(
- KeyCacheObject key,
- AffinityTopologyVersion topVer,
- boolean fastMap
- ) {
- GridCacheAffinityManager affMgr = cctx.affinity();
-
- // If we can send updates in parallel - do it.
- return fastMap ?
- cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primary(key, topVer));
- }
-
- /**
- * Maps future to single node.
- *
- * @param nodeId Node ID.
- * @param req Request.
- */
- private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
- if (cctx.localNodeId().equals(nodeId)) {
- cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- onResult(res.nodeId(), res);
- }
- });
- }
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
-
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-
- if (syncMode == FULL_ASYNC)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- }
- catch (IgniteCheckedException e) {
- state.onSendError(req, e);
- }
- }
- }
-
- /**
- * Sends messages to remote nodes and updates local cache.
- *
- * @param mappings Mappings to send.
- */
- private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
- UUID locNodeId = cctx.localNodeId();
-
- GridNearAtomicUpdateRequest locUpdate = null;
-
- // Send messages to remote nodes first, then run local update.
- for (GridNearAtomicUpdateRequest req : mappings.values()) {
- if (locNodeId.equals(req.nodeId())) {
- assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
- ", req=" + req + ']';
-
- locUpdate = req;
- }
- else {
- try {
- if (log.isDebugEnabled())
- log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
-
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
- }
- catch (IgniteCheckedException e) {
- state.onSendError(req, e);
- }
- }
- }
-
- if (locUpdate != null) {
- cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
- onResult(res.nodeId(), res);
- }
- });
- }
-
- if (syncMode == FULL_ASYNC)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- }
-
- /**
- *
- */
- private class UpdateState {
- /** Current topology version. */
- private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
-
- /** */
- private GridCacheVersion updVer;
-
- /** Topology version when got mapping error. */
- private AffinityTopologyVersion mapErrTopVer;
-
- /** Mappings if operations is mapped to more than one node. */
- @GridToStringInclude
- private Map<UUID, GridNearAtomicUpdateRequest> mappings;
-
- /** */
- private int resCnt;
-
- /** Error. */
- private CachePartialUpdateCheckedException err;
-
- /** Future ID. */
- private GridCacheVersion futVer;
-
- /** Completion future for a particular topology version. */
- private GridFutureAdapter<Void> topCompleteFut;
-
- /** Keys to remap. */
- private Collection<KeyCacheObject> remapKeys;
-
- /** Not null is operation is mapped to single node. */
- private GridNearAtomicUpdateRequest singleReq;
-
- /** Operation result. */
- private GridCacheReturn opRes;
-
- /**
- * @return Future version.
- */
- @Nullable synchronized GridCacheVersion futureVersion() {
- return futVer;
- }
-
- /**
- * @param nodeId Left node ID.
- */
- void onNodeLeft(UUID nodeId) {
- GridNearAtomicUpdateResponse res = null;
-
- synchronized (this) {
- GridNearAtomicUpdateRequest req;
-
- if (singleReq != null)
- req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
- else
- req = mappings != null ? mappings.get(nodeId) : null;
-
- if (req != null && req.response() == null) {
- res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- nodeId,
- req.futureVersion(),
- cctx.deploymentEnabled());
-
- ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
- "before response is received: " + nodeId);
-
- e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
-
- res.addFailedKeys(req.keys(), e);
- }
- }
-
- if (res != null)
- onResult(nodeId, res, true);
- }
-
- /**
- * @param nodeId Node ID.
- * @param res Response.
- * @param nodeErr {@code True} if response was created on node failure.
- */
- @SuppressWarnings("unchecked")
- void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
- GridNearAtomicUpdateRequest req;
-
- AffinityTopologyVersion remapTopVer = null;
-
- GridCacheReturn opRes0 = null;
- CachePartialUpdateCheckedException err0 = null;
-
- boolean rcvAll;
-
- GridFutureAdapter<?> fut0 = null;
-
- synchronized (this) {
- if (!res.futureVersion().equals(futVer))
- return;
-
- if (singleReq != null) {
- if (!singleReq.nodeId().equals(nodeId))
- return;
-
- req = singleReq;
-
- singleReq = null;
-
- rcvAll = true;
- }
- else {
- req = mappings != null ? mappings.get(nodeId) : null;
-
- if (req != null && req.onResponse(res)) {
- resCnt++;
-
- rcvAll = mappings.size() == resCnt;
- }
- else
- return;
- }
-
- assert req != null && req.topologyVersion().equals(topVer) : req;
-
- if (res.remapKeys() != null) {
- assert !fastMap || cctx.kernalContext().clientNode();
-
- if (remapKeys == null)
- remapKeys = U.newHashSet(res.remapKeys().size());
-
- remapKeys.addAll(res.remapKeys());
-
- if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
- mapErrTopVer = req.topologyVersion();
- }
- else if (res.error() != null) {
- if (res.failedKeys() != null)
- addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error());
- }
- else {
- if (!req.fastMap() || req.hasPrimary()) {
- GridCacheReturn ret = res.returnValue();
-
- if (op == TRANSFORM) {
- if (ret != null)
- addInvokeResults(ret);
- }
- else
- opRes = ret;
- }
- }
-
- if (rcvAll) {
- if (remapKeys != null) {
- assert mapErrTopVer != null;
-
- remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1);
- }
- else {
- if (err != null &&
- X.hasCause(err, CachePartialUpdateCheckedException.class) &&
- X.hasCause(err, ClusterTopologyCheckedException.class) &&
- storeFuture() &&
- --remapCnt > 0) {
- ClusterTopologyCheckedException topErr =
- X.cause(err, ClusterTopologyCheckedException.class);
-
- if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
- CachePartialUpdateCheckedException cause =
- X.cause(err, CachePartialUpdateCheckedException.class);
-
- assert cause != null && cause.topologyVersion() != null : err;
-
- remapTopVer =
- new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
-
- err = null;
-
- Collection<Object> failedKeys = cause.failedKeys();
-
- remapKeys = new ArrayList<>(failedKeys.size());
-
- for (Object key : failedKeys)
- remapKeys.add(cctx.toCacheKeyObject(key));
-
- updVer = null;
- }
- }
- }
-
- if (remapTopVer == null) {
- err0 = err;
- opRes0 = opRes;
- }
- else {
- fut0 = topCompleteFut;
-
- topCompleteFut = null;
-
- cctx.mvcc().removeAtomicFuture(futVer);
-
- futVer = null;
- topVer = AffinityTopologyVersion.ZERO;
- }
- }
- }
-
- if (res.error() != null && res.failedKeys() == null) {
- onDone(res.error());
-
- return;
- }
-
- if (rcvAll && nearEnabled) {
- if (mappings != null) {
- for (GridNearAtomicUpdateRequest req0 : mappings.values()) {
- GridNearAtomicUpdateResponse res0 = req0.response();
-
- assert res0 != null : req0;
-
- updateNear(req0, res0);
- }
- }
- else if (!nodeErr)
- updateNear(req, res);
- }
-
- if (remapTopVer != null) {
- if (fut0 != null)
- fut0.onDone();
-
- if (!waitTopFut) {
- onDone(new GridCacheTryPutFailedException());
-
- return;
- }
-
- if (topLocked) {
- assert !F.isEmpty(remapKeys) : remapKeys;
-
- CachePartialUpdateCheckedException e =
- new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
-
- ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
- "Failed to update keys, topology changed while execute atomic update inside transaction.");
-
- cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
-
- e.add(remapKeys, cause);
-
- onDone(e);
-
- return;
- }
-
- IgniteInternalFuture<AffinityTopologyVersion> fut =
- cctx.shared().exchange().affinityReadyFuture(remapTopVer);
-
- if (fut == null)
- fut = new GridFinishedFuture<>(remapTopVer);
-
- fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
- cctx.kernalContext().closure().runLocalSafe(new Runnable() {
- @Override public void run() {
- try {
- AffinityTopologyVersion topVer = fut.get();
-
- map(topVer, remapKeys);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
- }
- }
- });
- }
- });
-
- return;
- }
-
- if (rcvAll)
- onDone(opRes0, err0);
- }
-
- /**
- * @param req Request.
- * @param e Error.
- */
- void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) {
- synchronized (this) {
- GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
- req.nodeId(),
- req.futureVersion(),
- cctx.deploymentEnabled());
-
- res.addFailedKeys(req.keys(), e);
-
- onResult(req.nodeId(), res, true);
- }
- }
-
- /**
- * @param topVer Topology version.
- * @param remapKeys Keys to remap.
- */
- void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
- Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
-
- if (F.isEmpty(topNodes)) {
- onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid)."));
-
- return;
- }
-
- Exception err = null;
- GridNearAtomicUpdateRequest singleReq0 = null;
- Map<UUID, GridNearAtomicUpdateRequest> mappings0 = null;
-
- int size = keys.size();
-
- GridCacheVersion futVer = cctx.versions().next(topVer);
-
- GridCacheVersion updVer;
-
- // Assign version on near node in CLOCK ordering mode even if fastMap is false.
- if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
- updVer = this.updVer;
-
- if (updVer == null) {
- updVer = cctx.versions().next(topVer);
-
- if (log.isDebugEnabled())
- log.debug("Assigned fast-map version for update on near node: " + updVer);
- }
- }
- else
- updVer = null;
-
- try {
- if (size == 1 && !fastMap) {
- assert remapKeys == null || remapKeys.size() == 1;
-
- singleReq0 = mapSingleUpdate(topVer, futVer, updVer);
- }
- else {
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = mapUpdate(topNodes,
- topVer,
- futVer,
- updVer,
- remapKeys);
-
- if (pendingMappings.size() == 1)
- singleReq0 = F.firstValue(pendingMappings);
- else {
- if (syncMode == PRIMARY_SYNC) {
- mappings0 = U.newHashMap(pendingMappings.size());
-
- for (GridNearAtomicUpdateRequest req : pendingMappings.values()) {
- if (req.hasPrimary())
- mappings0.put(req.nodeId(), req);
- }
- }
- else
- mappings0 = pendingMappings;
-
- assert !mappings0.isEmpty() || size == 0 : GridNearAtomicSingleUpdateFuture.this;
- }
- }
-
- synchronized (this) {
- assert this.futVer == null : this;
- assert this.topVer == AffinityTopologyVersion.ZERO : this;
-
- this.topVer = topVer;
- this.updVer = updVer;
- this.futVer = futVer;
-
- resCnt = 0;
-
- singleReq = singleReq0;
- mappings = mappings0;
-
- this.remapKeys = null;
- }
- }
- catch (Exception e) {
- err = e;
- }
-
- if (err != null) {
- onDone(err);
-
- return;
- }
-
- if (storeFuture()) {
- if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicSingleUpdateFuture.this)) {
- assert isDone() : GridNearAtomicSingleUpdateFuture.this;
-
- return;
- }
- }
-
- // Optimize mapping for single key.
- if (singleReq0 != null)
- mapSingle(singleReq0.nodeId(), singleReq0);
- else {
- assert mappings0 != null;
-
- if (size == 0)
- onDone(new GridCacheReturn(cctx, true, true, null, true));
- else
- doUpdate(mappings0);
- }
- }
-
- /**
- * @param topVer Topology version.
- * @return Future.
- */
- @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (this.topVer == AffinityTopologyVersion.ZERO)
- return null;
-
- if (this.topVer.compareTo(topVer) < 0) {
- if (topCompleteFut == null)
- topCompleteFut = new GridFutureAdapter<>();
-
- return topCompleteFut;
- }
-
- return null;
- }
-
- /**
- * @return Future version.
- */
- GridCacheVersion onFutureDone() {
- GridCacheVersion ver0;
-
- GridFutureAdapter<Void> fut0;
-
- synchronized (this) {
- fut0 = topCompleteFut;
-
- topCompleteFut = null;
-
- ver0 = futVer;
-
- futVer = null;
- }
-
- if (fut0 != null)
- fut0.onDone();
-
- return ver0;
- }
-
- /**
- * @param topNodes Cache nodes.
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @param remapKeys Keys to remap.
- * @return Mapping.
- * @throws Exception If failed.
- */
- private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
- AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer,
- @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
- Iterator<?> it = null;
-
- if (vals != null)
- it = vals.iterator();
-
- Iterator<GridCacheDrInfo> conflictPutValsIt = null;
-
- if (conflictPutVals != null)
- conflictPutValsIt = conflictPutVals.iterator();
-
- Iterator<GridCacheVersion> conflictRmvValsIt = null;
-
- if (conflictRmvVals != null)
- conflictRmvValsIt = conflictRmvVals.iterator();
-
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
-
- // Create mappings first, then send messages.
- for (Object key : keys) {
- if (key == null)
- throw new NullPointerException("Null key.");
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- val = it.next();
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
-
- if (val == null)
- throw new NullPointerException("Null value.");
- }
- else if (conflictPutVals != null) {
- GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
-
- val = conflictPutVal.valueEx();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- val = null;
- conflictVer = conflictRmvValsIt.next();
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
-
- if (val == null && op != GridCacheOperation.DELETE)
- continue;
-
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
-
- if (remapKeys != null && !remapKeys.contains(cacheKey))
- continue;
-
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
-
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
-
- if (affNodes.isEmpty())
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
-
- int i = 0;
-
- for (ClusterNode affNode : affNodes) {
- if (affNode == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
- "(all partition nodes left the grid).");
-
- UUID nodeId = affNode.id();
-
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
-
- if (mapped == null) {
- mapped = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- keys.size());
-
- pendingMappings.put(nodeId, mapped);
- }
-
- mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
-
- i++;
- }
- }
-
- return pendingMappings;
- }
-
- /**
- * @param topVer Topology version.
- * @param futVer Future version.
- * @param updVer Update version.
- * @return Request.
- * @throws Exception If failed.
- */
- private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
- GridCacheVersion futVer,
- @Nullable GridCacheVersion updVer) throws Exception {
- Object key = F.first(keys);
-
- Object val;
- GridCacheVersion conflictVer;
- long conflictTtl;
- long conflictExpireTime;
-
- if (vals != null) {
- // Regular PUT.
- val = F.first(vals);
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else if (conflictPutVals != null) {
- // Conflict PUT.
- GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
-
- val = conflictPutVal.valueEx();
- conflictVer = conflictPutVal.version();
- conflictTtl = conflictPutVal.ttl();
- conflictExpireTime = conflictPutVal.expireTime();
- }
- else if (conflictRmvVals != null) {
- // Conflict REMOVE.
- val = null;
- conflictVer = F.first(conflictRmvVals);
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
- else {
- // Regular REMOVE.
- val = null;
- conflictVer = null;
- conflictTtl = CU.TTL_NOT_CHANGED;
- conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
- }
-
- // We still can get here if user pass map with single element.
- if (key == null)
- throw new NullPointerException("Null key.");
-
- if (val == null && op != GridCacheOperation.DELETE)
- throw new NullPointerException("Null value.");
-
- KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
-
- if (op != TRANSFORM)
- val = cctx.toCacheObject(val);
-
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
-
- if (primary == null)
- throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
- "left the grid).");
-
- GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
- cctx.cacheId(),
- primary.id(),
- futVer,
- fastMap,
- updVer,
- topVer,
- topLocked,
- syncMode,
- op,
- retval,
- expiryPlc,
- invokeArgs,
- filter,
- subjId,
- taskNameHash,
- skipStore,
- keepBinary,
- cctx.kernalContext().clientNode(),
- cctx.deploymentEnabled(),
- 1);
-
- req.addUpdateEntry(cacheKey,
- val,
- conflictTtl,
- conflictExpireTime,
- conflictVer,
- true);
-
- return req;
- }
-
- /**
- * @param ret Result from single node.
- */
- @SuppressWarnings("unchecked")
- private void addInvokeResults(GridCacheReturn ret) {
- assert op == TRANSFORM : op;
- assert ret.value() == null || ret.value() instanceof Map : ret.value();
-
- if (ret.value() != null) {
- if (opRes != null)
- opRes.mergeEntryProcessResults(ret);
- else
- opRes = ret;
- }
- }
-
- /**
- * @param failedKeys Failed keys.
- * @param topVer Topology version for failed update.
- * @param err Error cause.
- */
- private void addFailedKeys(Collection<KeyCacheObject> failedKeys,
- AffinityTopologyVersion topVer,
- Throwable err) {
- CachePartialUpdateCheckedException err0 = this.err;
-
- if (err0 == null)
- err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
-
- Collection<Object> keys = new ArrayList<>(failedKeys.size());
-
- for (KeyCacheObject key : failedKeys)
- keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
-
- err0.add(keys, err, topVer);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized String toString() {
- return S.toString(UpdateState.class, this);
- }
- }
-
- /** {@inheritDoc} */
- public String toString() {
- return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
- }
-}