You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/03/22 13:31:04 UTC
[1/2] ignite git commit: Restore clock mode.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0-clock [created] 0015962a4
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFastMapUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFastMapUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFastMapUpdateFuture.java
new file mode 100644
index 0000000..9048e85
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFastMapUpdateFuture.java
@@ -0,0 +1,1026 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.processors.cache.GridCacheReturn;
+import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
+
+/**
+ * DHT atomic cache near update future.
+ */
+public class GridNearAtomicFastMapUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
+ /** Completion future for a particular topology version. */
+ private GridFutureAdapter<Void> topCompleteFut;
+
+ /** */
+ private final boolean fastMap = true;
+
+ /** Topology version when got mapping error. */
+ private AffinityTopologyVersion mapErrTopVer;
+
+ /** Keys */
+ private Collection<?> keys;
+
+ /** Values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<?> vals;
+
+ /** Conflict put values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<GridCacheDrInfo> conflictPutVals;
+
+ /** Conflict remove values. */
+ @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
+ private Collection<GridCacheVersion> conflictRmvVals;
+
+ /** Mappings if operations is mapped to more than one node. */
+ @GridToStringInclude
+ private Map<UUID, GridNearAtomicFullUpdateRequest> mappings;
+
+ /** Keys to remap. */
+ private Collection<KeyCacheObject> remapKeys;
+
+ /** Not null is operation is mapped to single node. */
+ private GridNearAtomicFullUpdateRequest singleReq;
+
+ /** */
+ private int resCnt;
+
+ /**
+ * @param cctx Cache context.
+ * @param cache Cache instance.
+ * @param syncMode Write synchronization mode.
+ * @param op Update operation.
+ * @param keys Keys to update.
+ * @param vals Values or transform closure.
+ * @param invokeArgs Optional arguments for entry processor.
+ * @param conflictPutVals Conflict put values (optional).
+ * @param conflictRmvVals Conflict remove values (optional).
+ * @param retval Return value require flag.
+ * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
+ * @param expiryPlc Expiry policy explicitly specified for cache operation.
+ * @param filter Entry filter.
+ * @param subjId Subject ID.
+ * @param taskNameHash Task name hash code.
+ * @param skipStore Skip store flag.
+ * @param keepBinary Keep binary flag.
+ * @param remapCnt Maximum number of retries.
+ * @param waitTopFut If {@code false} does not wait for affinity change future.
+ */
+ GridNearAtomicFastMapUpdateFuture(
+ GridCacheContext cctx,
+ GridDhtAtomicCache cache,
+ CacheWriteSynchronizationMode syncMode,
+ GridCacheOperation op,
+ Collection<?> keys,
+ @Nullable Collection<?> vals,
+ @Nullable Object[] invokeArgs,
+ @Nullable Collection<GridCacheDrInfo> conflictPutVals,
+ @Nullable Collection<GridCacheVersion> conflictRmvVals,
+ final boolean retval,
+ final boolean rawRetval,
+ @Nullable ExpiryPolicy expiryPlc,
+ final CacheEntryPredicate[] filter,
+ UUID subjId,
+ int taskNameHash,
+ boolean skipStore,
+ boolean keepBinary,
+ int remapCnt,
+ boolean waitTopFut
+ ) {
+ super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
+ skipStore, keepBinary, remapCnt, waitTopFut);
+
+ assert vals == null || vals.size() == keys.size();
+ assert conflictPutVals == null || conflictPutVals.size() == keys.size();
+ assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
+ assert subjId != null;
+ assert cache.isFastMap(filter, op);
+
+ this.keys = keys;
+ this.vals = vals;
+ this.conflictPutVals = conflictPutVals;
+ this.conflictRmvVals = conflictRmvVals;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long id() {
+ synchronized (mux) {
+ return futId;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean fastMap() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onNodeLeft(UUID nodeId) {
+ GridNearAtomicUpdateResponse res = null;
+
+ GridNearAtomicFullUpdateRequest req;
+
+ synchronized (mux) {
+ if (singleReq != null)
+ req = singleReq.nodeId().equals(nodeId) ? singleReq : null;
+ else
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.response() == null) {
+ res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
+ nodeId,
+ req.futureId(),
+ req.partition(),
+ false,
+ cctx.deploymentEnabled());
+
+ ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
+ "before response is received: " + nodeId);
+
+ e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));
+
+ res.addFailedKeys(req.keys(), e);
+ }
+ }
+
+ if (res != null) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near update fut, node left [futId=" + req.futureId() +
+ ", writeVer=" + req.updateVersion() +
+ ", node=" + nodeId + ']');
+ }
+
+ onPrimaryResponse(nodeId, res, true);
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ // Wait fast-map near atomic update futures in CLOCK mode.
+ if (fastMap) {
+ GridFutureAdapter<Void> fut;
+
+ synchronized (mux) {
+ if (this.topVer != AffinityTopologyVersion.ZERO && this.topVer.compareTo(topVer) < 0) {
+ if (topCompleteFut == null)
+ topCompleteFut = new GridFutureAdapter<>();
+
+ fut = topCompleteFut;
+ }
+ else
+ fut = null;
+ }
+
+ if (fut != null && isDone()) {
+ fut.onDone();
+
+ return null;
+ }
+
+ return fut;
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("ConstantConditions")
+ @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+ assert res == null || res instanceof GridCacheReturn;
+
+ GridCacheReturn ret = (GridCacheReturn)res;
+
+ Object retval =
+ res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
+ cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();
+
+ if (op == TRANSFORM && retval == null)
+ retval = Collections.emptyMap();
+
+ if (super.onDone(retval, err)) {
+ Long futId = onFutureDone();
+
+ if (futId != null)
+ cctx.mvcc().removeAtomicFuture(futId);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDhtResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+ @Override public void onPrimaryResponse(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
+ GridNearAtomicFullUpdateRequest req;
+
+ AffinityTopologyVersion remapTopVer = null;
+
+ GridCacheReturn opRes0 = null;
+ CachePartialUpdateCheckedException err0 = null;
+
+ boolean rcvAll;
+
+ GridFutureAdapter<?> fut0 = null;
+
+ synchronized (mux) {
+ if (futId == null || futId != res.futureId())
+ return;
+
+ if (singleReq != null) {
+ if (!singleReq.nodeId().equals(nodeId))
+ return;
+
+ req = singleReq;
+
+ singleReq = null;
+
+ rcvAll = true;
+ }
+ else {
+ req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null && req.onResponse(res)) {
+ resCnt++;
+
+ rcvAll = mappings.size() == resCnt;
+ }
+ else
+ return;
+ }
+
+ assert req != null && req.topologyVersion().equals(topVer) : req;
+
+ if (res.remapTopologyVersion() != null) {
+ assert !fastMap || cctx.kernalContext().clientNode();
+
+ if (remapKeys == null)
+ remapKeys = U.newHashSet(req.size());
+
+ remapKeys.addAll(req.keys());
+
+ if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
+ mapErrTopVer = req.topologyVersion();
+ }
+ else if (res.error() != null) {
+ if (res.failedKeys() != null) {
+ if (err == null)
+ err = new CachePartialUpdateCheckedException(
+ "Failed to update keys (retry update if possible).");
+
+ Collection<Object> keys = new ArrayList<>(res.failedKeys().size());
+
+ for (KeyCacheObject key : res.failedKeys())
+ keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false));
+
+ err.add(keys, res.error(), req.topologyVersion());
+ }
+ }
+ else {
+ if (!req.fastMap() || req.hasPrimary()) {
+ GridCacheReturn ret = res.returnValue();
+
+ if (op == TRANSFORM) {
+ if (ret != null) {
+ assert ret.value() == null || ret.value() instanceof Map : ret.value();
+
+ if (ret.value() != null) {
+ if (opRes != null)
+ opRes.mergeEntryProcessResults(ret);
+ else
+ opRes = ret;
+ }
+ }
+ }
+ else
+ opRes = ret;
+ }
+ }
+
+ if (rcvAll) {
+ if (remapKeys != null) {
+ assert mapErrTopVer != null;
+
+ remapTopVer = cctx.shared().exchange().topologyVersion();
+ }
+ else {
+ if (err != null &&
+ X.hasCause(err, CachePartialUpdateCheckedException.class) &&
+ X.hasCause(err, ClusterTopologyCheckedException.class) &&
+ storeFuture() &&
+ --remapCnt > 0) {
+ ClusterTopologyCheckedException topErr =
+ X.cause(err, ClusterTopologyCheckedException.class);
+
+ if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ CachePartialUpdateCheckedException cause =
+ X.cause(err, CachePartialUpdateCheckedException.class);
+
+ assert cause != null && cause.topologyVersion() != null : err;
+
+ remapTopVer =
+ new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);
+
+ err = null;
+
+ Collection<Object> failedKeys = cause.failedKeys();
+
+ remapKeys = new ArrayList<>(failedKeys.size());
+
+ for (Object key : failedKeys)
+ remapKeys.add(cctx.toCacheKeyObject(key));
+
+ updVer = null;
+ }
+ }
+ }
+
+ if (remapTopVer == null) {
+ err0 = err;
+ opRes0 = opRes;
+ }
+ else {
+ fut0 = topCompleteFut;
+
+ topCompleteFut = null;
+
+ cctx.mvcc().removeAtomicFuture(futId);
+
+ futId = null;
+ topVer = AffinityTopologyVersion.ZERO;
+ }
+ }
+ }
+
+ if (res.error() != null && res.failedKeys() == null) {
+ onDone(res.error());
+
+ return;
+ }
+
+ if (rcvAll && nearEnabled) {
+ if (mappings != null) {
+ for (GridNearAtomicFullUpdateRequest req0 : mappings.values()) {
+ GridNearAtomicUpdateResponse res0 = req0.response();
+
+ assert res0 != null : req0;
+
+ updateNear(req0, res0);
+ }
+ }
+ else if (!nodeErr)
+ updateNear(req, res);
+ }
+
+ if (remapTopVer != null) {
+ if (fut0 != null)
+ fut0.onDone();
+
+ if (!waitTopFut) {
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
+ }
+
+ if (topLocked) {
+ assert !F.isEmpty(remapKeys) : remapKeys;
+
+ CachePartialUpdateCheckedException e =
+ new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
+
+ ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
+ "Failed to update keys, topology changed while execute atomic update inside transaction.");
+
+ cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));
+
+ e.add(remapKeys, cause);
+
+ onDone(e);
+
+ return;
+ }
+
+ IgniteInternalFuture<AffinityTopologyVersion> fut =
+ cctx.shared().exchange().affinityReadyFuture(remapTopVer);
+
+ if (fut == null)
+ fut = new GridFinishedFuture<>(remapTopVer);
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+
+ return;
+ }
+
+ if (rcvAll)
+ onDone(opRes0, err0);
+ }
+
+ /**
+ * Updates near cache.
+ *
+ * @param req Update request.
+ * @param res Update response.
+ */
+ private void updateNear(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ assert nearEnabled;
+
+ if (res.remapTopologyVersion() != null || !req.hasPrimary())
+ return;
+
+ GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
+
+ near.processNearAtomicUpdateResponse(req, res);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void mapOnTopology() {
+ AffinityTopologyVersion topVer;
+ Long futId;
+
+ cache.topology().readLock();
+
+ try {
+ if (cache.topology().stopping()) {
+ onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ cache.name()));
+
+ return;
+ }
+
+ GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();
+
+ if (fut.isDone()) {
+ Throwable err = fut.validateCache(cctx);
+
+ if (err != null) {
+ onDone(err);
+
+ return;
+ }
+
+ topVer = fut.topologyVersion();
+
+ futId = addAtomicFuture(topVer);
+ }
+ else {
+ if (waitTopFut) {
+ assert !topLocked : this;
+
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ mapOnTopology();
+ }
+ });
+ }
+ });
+ }
+ else
+ onDone(new GridCacheTryPutFailedException());
+
+ return;
+ }
+ }
+ finally {
+ cache.topology().readUnlock();
+ }
+
+ if (futId != null)
+ map(topVer, futId, remapKeys);
+ }
+
+ /**
+ * Sends messages to remote nodes and updates local cache.
+ *
+ * @param mappings Mappings to send.
+ */
+ private void doUpdate(Map<UUID, GridNearAtomicFullUpdateRequest> mappings) {
+ UUID locNodeId = cctx.localNodeId();
+
+ GridNearAtomicFullUpdateRequest locUpdate = null;
+
+ // Send messages to remote nodes first, then run local update.
+ for (GridNearAtomicFullUpdateRequest req : mappings.values()) {
+ if (locNodeId.equals(req.nodeId())) {
+ assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
+ ", req=" + req + ']';
+
+ locUpdate = req;
+ }
+ else {
+ try {
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near update fut, sent request [futId=" + req.futureId() +
+ ", writeVer=" + req.updateVersion() +
+ ", node=" + req.nodeId() + ']');
+ }
+ }
+ catch (IgniteCheckedException e) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Near update fut, failed to send request [futId=" + req.futureId() +
+ ", writeVer=" + req.updateVersion() +
+ ", node=" + req.nodeId() +
+ ", err=" + e + ']');
+ }
+
+ onSendError(req, e);
+ }
+ }
+ }
+
+ if (locUpdate != null) {
+ cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+ new GridDhtAtomicCache.UpdateReplyClosure() {
+ @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ onPrimaryResponse(res.nodeId(), res, false);
+ }
+ });
+ }
+
+ if (syncMode == FULL_ASYNC)
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void map(AffinityTopologyVersion topVer, Long futVer) {
+ map(topVer, futVer, null);
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param futId Future ID.
+ * @param remapKeys Keys to remap.
+ */
+ void map(AffinityTopologyVersion topVer,
+ Long futId,
+ @Nullable Collection<KeyCacheObject> remapKeys) {
+ Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
+
+ if (F.isEmpty(topNodes)) {
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid)."));
+
+ return;
+ }
+
+ GridCacheVersion updVer;
+
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ updVer = this.updVer;
+
+ if (updVer == null) {
+ updVer = cctx.versions().next(topVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
+ }
+ }
+ else
+ updVer = null;
+
+ Exception err = null;
+ GridNearAtomicFullUpdateRequest singleReq0 = null;
+ Map<UUID, GridNearAtomicFullUpdateRequest> mappings0 = null;
+
+ int size = keys.size();
+
+ try {
+ if (size == 1 && !fastMap) {
+ assert remapKeys == null || remapKeys.size() == 1;
+
+ singleReq0 = mapSingleUpdate(topVer, futId, updVer);
+ }
+ else {
+ Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = mapUpdate(topNodes,
+ topVer,
+ futId,
+ updVer,
+ remapKeys);
+
+ if (pendingMappings.size() == 1)
+ singleReq0 = F.firstValue(pendingMappings);
+ else {
+ if (syncMode == PRIMARY_SYNC) {
+ mappings0 = U.newHashMap(pendingMappings.size());
+
+ for (GridNearAtomicFullUpdateRequest req : pendingMappings.values()) {
+ if (req.hasPrimary())
+ mappings0.put(req.nodeId(), req);
+ }
+ }
+ else
+ mappings0 = pendingMappings;
+
+ assert !mappings0.isEmpty() || size == 0 : this;
+ }
+ }
+
+ synchronized (mux) {
+ assert futId.equals(this.futId) || (this.isDone() && this.error() != null);
+ assert this.topVer == topVer;
+
+ this.updVer = updVer;
+
+ resCnt = 0;
+
+ singleReq = singleReq0;
+ mappings = mappings0;
+
+ this.remapKeys = null;
+ }
+ }
+ catch (Exception e) {
+ err = e;
+ }
+
+ if (err != null) {
+ onDone(err);
+
+ return;
+ }
+
+ // Optimize mapping for single key.
+ if (singleReq0 != null)
+ sendSingleRequest(singleReq0.nodeId(), singleReq0);
+ else {
+ assert mappings0 != null;
+
+ if (size == 0)
+ onDone(new GridCacheReturn(cctx, true, true, null, true));
+ else
+ doUpdate(mappings0);
+ }
+ }
+
+ /**
+ * @return Future version.
+ */
+ private Long onFutureDone() {
+ Long futId0;
+
+ GridFutureAdapter<Void> fut0;
+
+ synchronized (mux) {
+ fut0 = topCompleteFut;
+
+ topCompleteFut = null;
+
+ futId0 = futId;
+
+ futId = null;
+ }
+
+ if (fut0 != null)
+ fut0.onDone();
+
+ return futId0;
+ }
+
+ /**
+ * @param topNodes Cache nodes.
+ * @param topVer Topology version.
+ * @param futId Future ID.
+ * @param updVer Update version.
+ * @param remapKeys Keys to remap.
+ * @return Mapping.
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private Map<UUID, GridNearAtomicFullUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes,
+ AffinityTopologyVersion topVer,
+ Long futId,
+ @Nullable GridCacheVersion updVer,
+ @Nullable Collection<KeyCacheObject> remapKeys) throws Exception {
+ Iterator<?> it = null;
+
+ if (vals != null)
+ it = vals.iterator();
+
+ Iterator<GridCacheDrInfo> conflictPutValsIt = null;
+
+ if (conflictPutVals != null)
+ conflictPutValsIt = conflictPutVals.iterator();
+
+ Iterator<GridCacheVersion> conflictRmvValsIt = null;
+
+ if (conflictRmvVals != null)
+ conflictRmvValsIt = conflictRmvVals.iterator();
+
+ Map<UUID, GridNearAtomicFullUpdateRequest> pendingMappings = U.newHashMap(topNodes.size());
+
+ // Create mappings first, then send messages.
+ for (Object key : keys) {
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ val = it.next();
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+
+ if (val == null)
+ throw new NullPointerException("Null value.");
+ }
+ else if (conflictPutVals != null) {
+ GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
+
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ val = null;
+ conflictVer = conflictRmvValsIt.next();
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ if (val == null && op != GridCacheOperation.DELETE)
+ continue;
+
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+ if (remapKeys != null && !remapKeys.contains(cacheKey))
+ continue;
+
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
+ else
+ val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
+
+ List<ClusterNode> affNodes = mapKey(cacheKey, topVer);
+
+ if (affNodes.isEmpty())
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ int i = 0;
+
+ for (int n = 0; n < affNodes.size(); n++) {
+ ClusterNode affNode = affNodes.get(n);
+
+ if (affNode == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
+ "(all partition nodes left the grid).");
+
+ UUID nodeId = affNode.id();
+
+ GridNearAtomicFullUpdateRequest mapped = pendingMappings.get(nodeId);
+
+ if (mapped == null) {
+ mapped = new GridNearAtomicFullUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ /*fastMap*/true,
+ cctx.kernalContext().clientNode(),
+ /*needPrimaryRes*/false,
+ skipStore,
+ keepBinary,
+ cctx.deploymentEnabled(),
+ keys.size());
+
+ mapped.updateVersion(updVer);
+
+ pendingMappings.put(nodeId, mapped);
+ }
+
+ mapped.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ i == 0);
+
+ i++;
+ }
+ }
+
+ return pendingMappings;
+ }
+
+ /**
+ * @param topVer Topology version.
+ * @param futId Future ID.
+ * @param updVer Update version.
+ * @return Request.
+ * @throws Exception If failed.
+ */
+ private GridNearAtomicFullUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
+ Long futId,
+ @Nullable GridCacheVersion updVer) throws Exception {
+ Object key = F.first(keys);
+
+ Object val;
+ GridCacheVersion conflictVer;
+ long conflictTtl;
+ long conflictExpireTime;
+
+ if (vals != null) {
+ // Regular PUT.
+ val = F.first(vals);
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else if (conflictPutVals != null) {
+ // Conflict PUT.
+ GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
+
+ val = conflictPutVal.valueEx();
+ conflictVer = conflictPutVal.version();
+ conflictTtl = conflictPutVal.ttl();
+ conflictExpireTime = conflictPutVal.expireTime();
+ }
+ else if (conflictRmvVals != null) {
+ // Conflict REMOVE.
+ val = null;
+ conflictVer = F.first(conflictRmvVals);
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+ else {
+ // Regular REMOVE.
+ val = null;
+ conflictVer = null;
+ conflictTtl = CU.TTL_NOT_CHANGED;
+ conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
+ }
+
+ // We still can get here if user pass map with single element.
+ if (key == null)
+ throw new NullPointerException("Null key.");
+
+ if (val == null && op != GridCacheOperation.DELETE)
+ throw new NullPointerException("Null value.");
+
+ KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
+
+ if (op != TRANSFORM)
+ val = cctx.toCacheObject(val);
+ else
+ val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);
+
+ ClusterNode primary = cctx.affinity().primaryByPartition(cacheKey.partition(), topVer);
+
+ if (primary == null)
+ throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
+ "left the grid).");
+
+ GridNearAtomicFullUpdateRequest req = new GridNearAtomicFullUpdateRequest(
+ cctx.cacheId(),
+ primary.id(),
+ futId,
+ topVer,
+ topLocked,
+ syncMode,
+ op,
+ retval,
+ expiryPlc,
+ invokeArgs,
+ filter,
+ subjId,
+ taskNameHash,
+ /*fastMap*/true,
+ cctx.kernalContext().clientNode(),
+ /*needPrimaryRes*/false,
+ skipStore,
+ keepBinary,
+ cctx.deploymentEnabled(),
+ 1);
+
+ req.updateVersion(updVer);
+
+ req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
+
+ return req;
+ }
+
+ /**
+ * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
+ * node and send updates in parallel to all participating nodes.
+ *
+ * @param key Key to map.
+ * @param topVer Topology version to map.
+ * @return Collection of nodes to which key is mapped.
+ */
+ private List<ClusterNode> mapKey(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ GridCacheAffinityManager affMgr = cctx.affinity();
+
+ // If we can send updates in parallel - do it.
+ return fastMap ? cctx.topology().nodes(affMgr.partition(key), topVer) :
+ Collections.singletonList(affMgr.primaryByKey(key, topVer));
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ synchronized (mux) {
+ return S.toString(GridNearAtomicFastMapUpdateFuture.class, this, super.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index c381333..ede1a05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -151,6 +151,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
+ boolean fastMap,
+ boolean clientReq,
boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
@@ -167,6 +169,8 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
retval,
subjId,
taskNameHash,
+ fastMap,
+ clientReq,
needPrimaryRes,
skipStore,
keepBinary,
@@ -189,7 +193,11 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean primary) {
+ if (primary)
+ hasPrimary(true);
+
EntryProcessor<Object, Object, Object> entryProcessor = null;
if (op == TRANSFORM) {
@@ -445,55 +453,55 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
}
switch (writer.state()) {
- case 10:
+ case 11:
if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes))
return false;
writer.incrementState();
- case 11:
+ case 12:
if (!writer.writeMessage("conflictTtls", conflictTtls))
return false;
writer.incrementState();
- case 12:
+ case 13:
if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 13:
+ case 14:
if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 14:
+ case 15:
if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes))
return false;
writer.incrementState();
- case 15:
+ case 16:
if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 16:
+ case 17:
if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 17:
+ case 18:
if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 18:
+ case 19:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -515,7 +523,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
return false;
switch (reader.state()) {
- case 10:
+ case 11:
conflictExpireTimes = reader.readMessage("conflictExpireTimes");
if (!reader.isLastRead())
@@ -523,7 +531,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 11:
+ case 12:
conflictTtls = reader.readMessage("conflictTtls");
if (!reader.isLastRead())
@@ -531,7 +539,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 12:
+ case 13:
conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -539,7 +547,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 13:
+ case 14:
entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -547,7 +555,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 14:
+ case 15:
expiryPlcBytes = reader.readByteArray("expiryPlcBytes");
if (!reader.isLastRead())
@@ -555,7 +563,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 15:
+ case 16:
filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
@@ -563,7 +571,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 16:
+ case 17:
invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
@@ -571,7 +579,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 17:
+ case 18:
keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -579,7 +587,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
reader.incrementState();
- case 18:
+ case 19:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -611,7 +619,7 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 19;
+ return 20;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index 78582b0..ab55cf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -83,6 +83,8 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
@Nullable CacheEntryPredicate[] filter,
@Nullable UUID subjId,
int taskNameHash,
+ boolean fastMap,
+ boolean clientReq,
boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
@@ -99,6 +101,8 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
retval,
subjId,
taskNameHash,
+ fastMap,
+ clientReq,
needPrimaryRes,
skipStore,
keepBinary,
@@ -166,7 +170,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
}
switch (writer.state()) {
- case 12:
+ case 13:
if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
@@ -188,7 +192,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
return false;
switch (reader.state()) {
- case 12:
+ case 13:
filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
@@ -208,7 +212,7 @@ public class GridNearAtomicSingleUpdateFilterRequest extends GridNearAtomicSingl
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 14;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 930c4af..da590ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -119,6 +119,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
waitTopFut);
assert subjId != null;
+ assert !cache.isFastMap(filter, op);
this.key = key;
this.val = val;
@@ -344,6 +345,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ updVer = null;
+
CachePartialUpdateCheckedException cause =
X.cause(err, CachePartialUpdateCheckedException.class);
@@ -474,11 +477,11 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
return;
}
- map(topVer);
+ map(topVer, null);
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer) {
+ @Override protected void map(AffinityTopologyVersion topVer, Long ignore) {
long futId = cctx.mvcc().atomicFutureId();
Exception err = null;
@@ -550,9 +553,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
err0 = err;
remapTopVer0 = onAllReceived();
}
- else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY){
+ else if (res == DhtLeftResult.ALL_RCVD_CHECK_PRIMARY)
checkReq = new GridNearAtomicCheckUpdateRequest(reqState.req);
- }
else
return true;
}
@@ -632,6 +634,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
invokeArgs,
subjId,
taskNameHash,
+ /*fastMap*/false,
+ cctx.kernalContext().clientNode(),
needPrimaryRes,
skipStore,
keepBinary,
@@ -650,6 +654,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
retval,
subjId,
taskNameHash,
+ /*fastMap*/false,
+ cctx.kernalContext().clientNode(),
needPrimaryRes,
skipStore,
keepBinary,
@@ -668,6 +674,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
filter,
subjId,
taskNameHash,
+ /*fastMap*/false,
+ cctx.kernalContext().clientNode(),
needPrimaryRes,
skipStore,
keepBinary,
@@ -690,6 +698,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
filter,
subjId,
taskNameHash,
+ /*fastMap*/false,
+ cctx.kernalContext().clientNode(),
needPrimaryRes,
skipStore,
keepBinary,
@@ -697,11 +707,14 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
1);
}
+ initUpdateVersion(req, topVer);
+
req.addUpdateEntry(cacheKey,
val,
CU.TTL_NOT_CHANGED,
CU.EXPIRE_TIME_CALCULATE,
- null);
+ null,
+ true);
return new PrimaryRequestState(req, nodes, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
index f8b3984..614d6cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
@@ -101,6 +101,8 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
@Nullable Object[] invokeArgs,
@Nullable UUID subjId,
int taskNameHash,
+ boolean fastMap,
+ boolean clientReq,
boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
@@ -117,6 +119,8 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
retval,
subjId,
taskNameHash,
+ fastMap,
+ clientReq,
needPrimaryRes,
skipStore,
keepBinary,
@@ -139,7 +143,8 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean primary) {
assert conflictTtl < 0 : conflictTtl;
assert conflictExpireTime < 0 : conflictExpireTime;
assert conflictVer == null : conflictVer;
@@ -235,13 +240,13 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
}
switch (writer.state()) {
- case 12:
+ case 13:
if (!writer.writeByteArray("entryProcessorBytes", entryProcessorBytes))
return false;
writer.incrementState();
- case 13:
+ case 14:
if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
return false;
@@ -263,7 +268,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
return false;
switch (reader.state()) {
- case 12:
+ case 13:
entryProcessorBytes = reader.readByteArray("entryProcessorBytes");
if (!reader.isLastRead())
@@ -271,7 +276,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
reader.incrementState();
- case 13:
+ case 14:
invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
@@ -286,7 +291,7 @@ public class GridNearAtomicSingleUpdateInvokeRequest extends GridNearAtomicSingl
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 15;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index b9a1fc6..1db6c74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -92,6 +92,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean fastMap,
+ boolean clientReq,
boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
@@ -107,6 +109,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
retval,
subjId,
taskNameHash,
+ fastMap,
+ clientReq,
needPrimaryRes,
skipStore,
keepBinary,
@@ -120,18 +124,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
return key.partition();
}
- /**
- * @param key Key to add.
- * @param val Optional update value.
- * @param conflictTtl Conflict TTL (optional).
- * @param conflictExpireTime Conflict expire time (optional).
- * @param conflictVer Conflict version (optional).
- */
+ /** {@inheritDoc} */
@Override public void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer) {
+ @Nullable GridCacheVersion conflictVer,
+ boolean primary) {
assert op != TRANSFORM;
assert val != null || op == DELETE;
assert conflictTtl < 0 : conflictTtl;
@@ -257,13 +256,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
}
switch (writer.state()) {
- case 10:
+ case 11:
if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
- case 11:
+ case 12:
if (!writer.writeMessage("val", val))
return false;
@@ -285,7 +284,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
return false;
switch (reader.state()) {
- case 10:
+ case 11:
key = reader.readMessage("key");
if (!reader.isLastRead())
@@ -293,7 +292,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
reader.incrementState();
- case 11:
+ case 12:
val = reader.readMessage("val");
if (!reader.isLastRead())
@@ -321,7 +320,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractSin
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 12;
+ return 13;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a44ccf9..5f51770 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -142,6 +142,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
assert subjId != null;
+ assert !cache.isFastMap(filter, op);
this.keys = keys;
this.vals = vals;
@@ -552,6 +553,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
ClusterTopologyCheckedException topErr = X.cause(err, ClusterTopologyCheckedException.class);
if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
+ updVer = null;
+
CachePartialUpdateCheckedException cause =
X.cause(err, CachePartialUpdateCheckedException.class);
@@ -682,7 +685,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
return;
}
- map(topVer, remapKeys);
+ doMap(topVer, remapKeys);
}
/**
@@ -749,15 +752,15 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- @Override protected void map(AffinityTopologyVersion topVer) {
- map(topVer, null);
+ @Override protected void map(AffinityTopologyVersion topVer, Long ignore) {
+ doMap(topVer, null);
}
/**
* @param topVer Topology version.
* @param remapKeys Keys to remap.
*/
- void map(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
+ private void doMap(AffinityTopologyVersion topVer, @Nullable Collection<KeyCacheObject> remapKeys) {
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
@@ -1059,12 +1062,16 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
filter,
subjId,
taskNameHash,
+ /*fastMap*/false,
+ cctx.kernalContext().clientNode(),
needPrimaryRes,
skipStore,
keepBinary,
cctx.deploymentEnabled(),
keys.size());
+ initUpdateVersion(req, topVer);
+
mapped = new PrimaryRequestState(req, nodes, false);
pendingMappings.put(nodeId, mapped);
@@ -1073,7 +1080,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (mapped.req.initMappingLocally())
mapped.addMapping(nodes);
- mapped.req.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer);
+ mapped.req.addUpdateEntry(cacheKey,
+ val,
+ conflictTtl,
+ conflictExpireTime,
+ conflictVer,
+ true);
}
return pendingMappings;
@@ -1164,17 +1176,22 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
filter,
subjId,
taskNameHash,
+ /*fastMap*/false,
+ cctx.kernalContext().clientNode(),
needPrimaryRes,
skipStore,
keepBinary,
cctx.deploymentEnabled(),
1);
+ initUpdateVersion(req, topVer);
+
req.addUpdateEntry(cacheKey,
val,
conflictTtl,
conflictExpireTime,
- conflictVer);
+ conflictVer,
+ true);
return new PrimaryRequestState(req, nodes, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 62aecd1..3b86f96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -142,7 +142,10 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
List<Integer> nearValsIdxs = res.nearValuesIndexes();
List<Integer> skipped = res.skippedIndexes();
- GridCacheVersion ver = res.nearVersion();
+ GridCacheVersion ver = req.updateVersion();
+
+ if (ver == null)
+ ver = res.nearVersion();
assert ver != null : "Failed to find version [req=" + req + ", res=" + res + ']';
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
index a12b6b9..6da34b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java
@@ -94,6 +94,13 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
/**
* @throws Exception If failed.
*/
+ public void testPartitionedClock() throws Exception {
+ checkMessages(false, CLOCK);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testPartitionedPrimary() throws Exception {
checkMessages(false, PRIMARY);
}
@@ -101,6 +108,13 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest
/**
* @throws Exception If failed.
*/
+ public void testClientClock() throws Exception {
+ checkMessages(true, CLOCK);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testClientPrimary() throws Exception {
checkMessages(true, PRIMARY);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 6d90d0e..7927299 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -51,6 +51,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
@@ -834,6 +835,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
CacheWriteSynchronizationMode writeSync) {
CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
ccfg.setName(TEST_CACHE);
ccfg.setAtomicityMode(ATOMIC);
ccfg.setWriteSynchronizationMode(writeSync);
[2/2] ignite git commit: Restore clock mode.
Posted by sb...@apache.org.
Restore clock mode.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0015962a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0015962a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0015962a
Branch: refs/heads/ignite-2.0-clock
Commit: 0015962a43cc48d5b7857503a0f84a43518cbd7f
Parents: 827befb
Author: sboikov <sb...@gridgain.com>
Authored: Wed Mar 22 15:20:14 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Mar 22 16:30:49 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 12 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 62 +-
.../dht/atomic/GridDhtAtomicCache.java | 544 +++++++---
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 6 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 6 +-
...idNearAtomicAbstractSingleUpdateRequest.java | 4 +
.../GridNearAtomicAbstractUpdateFuture.java | 72 +-
.../GridNearAtomicAbstractUpdateRequest.java | 120 +-
.../GridNearAtomicFastMapUpdateFuture.java | 1026 ++++++++++++++++++
.../atomic/GridNearAtomicFullUpdateRequest.java | 48 +-
...GridNearAtomicSingleUpdateFilterRequest.java | 10 +-
.../GridNearAtomicSingleUpdateFuture.java | 23 +-
...GridNearAtomicSingleUpdateInvokeRequest.java | 17 +-
.../GridNearAtomicSingleUpdateRequest.java | 25 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 29 +-
.../distributed/near/GridNearAtomicCache.java | 5 +-
.../GridCacheAtomicMessageCountSelfTest.java | 14 +
.../atomic/IgniteCacheAtomicProtocolTest.java | 2 +
18 files changed, 1745 insertions(+), 280 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 54b4ed7..2237e22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -2170,6 +2170,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
assert conflictCtx != null;
+ boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
// Use old value?
if (conflictCtx.isUseOld()) {
GridCacheVersion newConflictVer = conflictVer != null ? conflictVer : newVer;
@@ -2178,7 +2180,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (!isNew() && // Not initial value,
verCheck && // and atomic version check,
oldConflictVer.dataCenterId() == newConflictVer.dataCenterId() && // and data centers are equal,
- ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, true) == 0 && // and both versions are equal,
+ ATOMIC_VER_COMPARATOR.compare(oldConflictVer, newConflictVer, ignoreTime) == 0 && // and both versions are equal,
cctx.writeThrough() && // and store is enabled,
primary) // and we are primary.
{
@@ -2224,11 +2226,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
conflictVer = null;
}
+ boolean ignoreTime = cctx.config().getAtomicWriteOrderMode() == CacheAtomicWriteOrderMode.PRIMARY;
+
// Perform version check only in case there was no explicit conflict resolution.
if (conflictCtx == null) {
if (verCheck) {
- if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) >= 0) {
- if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) == 0 && cctx.writeThrough() && primary) {
+ if (!isNew() && ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) >= 0) {
+ if (ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) == 0 && cctx.writeThrough() && primary) {
if (log.isDebugEnabled())
log.debug("Received entry update with same version as current (will update store) " +
"[entry=" + this + ", newVer=" + newVer + ']');
@@ -2303,7 +2307,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else
- assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, true) <= 0 :
+ assert isNew() || ATOMIC_VER_COMPARATOR.compare(ver, newVer, ignoreTime) <= 0 :
"Invalid version for inner update [isNew=" + isNew() + ", entry=" + this + ", newVer=" + newVer + ']';
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 5ff5aa4..219b04a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -85,6 +85,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** Update request. */
final GridNearAtomicAbstractUpdateRequest updateReq;
+ /** Update response. */
+ @GridToStringExclude
+ private final GridNearAtomicUpdateResponse updateRes;
+
+ /** Completion callback. */
+ @GridToStringExclude
+ private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
+
/** Mappings. */
@GridToStringExclude
protected Map<UUID, GridDhtAtomicAbstractUpdateRequest> mappings;
@@ -99,16 +107,22 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
* @param cctx Cache context.
* @param writeVer Write version.
* @param updateReq Update request.
+ * @param updateRes Response.
+ * @param completionCb Callback to invoke to send response to near node.
*/
protected GridDhtAtomicAbstractUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
- ) {
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb)
+ {
this.cctx = cctx;
- this.updateReq = updateReq;
this.writeVer = writeVer;
+ this.updateReq = updateReq;
+ this.updateRes = updateRes;
+ this.completionCb = completionCb;
futId = cctx.mvcc().atomicFutureId();
@@ -354,13 +368,8 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*
* @param nearNode Near node.
* @param ret Cache operation return value.
- * @param updateRes Response.
- * @param completionCb Callback to invoke to send response to near node.
*/
- final void map(ClusterNode nearNode,
- GridCacheReturn ret,
- GridNearAtomicUpdateResponse updateRes,
- GridDhtAtomicCache.UpdateReplyClosure completionCb) {
+ final void map(ClusterNode nearNode, GridCacheReturn ret) {
if (F.isEmpty(mappings)) {
updateRes.dhtNodes(Collections.<UUID>emptyList());
@@ -371,23 +380,27 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
return;
}
- boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
- !ret.emptyResult() ||
- updateRes.nearVersion() != null ||
- cctx.localNodeId().equals(nearNode.id());
+ if (!updateReq.fastMap()) {
+ boolean needReplyToNear = updateReq.writeSynchronizationMode() == PRIMARY_SYNC ||
+ !ret.emptyResult() ||
+ updateRes.nearVersion() != null ||
+ cctx.localNodeId().equals(nearNode.id());
- boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
+ boolean needMapping = updateReq.fullSync() && (updateReq.needPrimaryResponse() || !sendAllToDht());
- if (needMapping) {
- initMapping(updateRes);
+ if (needMapping) {
+ initMapping(updateRes);
- needReplyToNear = true;
- }
+ needReplyToNear = true;
+ }
- sendDhtRequests(nearNode, ret);
+ sendDhtRequests(nearNode, ret);
- if (needReplyToNear)
- completionCb.apply(updateReq, updateRes);
+ if (needReplyToNear)
+ completionCb.apply(updateReq, updateRes);
+ }
+ else
+ sendDhtRequests(nearNode, ret);
}
/**
@@ -416,14 +429,14 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
try {
assert !cctx.localNodeId().equals(req.nodeId()) : req;
- if (updateReq.fullSync()) {
+ if (!updateReq.fastMap() && updateReq.fullSync()) {
req.nearReplyInfo(nearNode.id(), updateReq.futureId());
if (ret.emptyResult())
req.hasResult(true);
}
- if (cntQryClsrs != null)
+ if (cntQryClsrs != null || updateReq.fastMap())
req.replyWithoutDelay(true);
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
@@ -519,6 +532,9 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
clsr.apply(suc);
}
+ if (updateReq.fastMap())
+ completionCb.apply(updateReq, updateRes);
+
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c20ed48..3735824 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -92,6 +92,7 @@ import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.CX1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -106,7 +107,9 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE;
@@ -1115,27 +1118,54 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
int taskNameHash = ctx.kernalContext().job().currentTaskNameHash();
- final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
- ctx,
- this,
- ctx.config().getWriteSynchronizationMode(),
- op,
- map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
- conflictPutMap.keySet() : conflictRmvMap.keySet(),
- map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
- invokeArgs,
- (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
- conflictRmvMap != null ? conflictRmvMap.values() : null,
- retval,
- rawRetval,
- opCtx != null ? opCtx.expiry() : null,
- CU.filterArray(null),
- subjId,
- taskNameHash,
- opCtx != null && opCtx.skipStore(),
- opCtx != null && opCtx.isKeepBinary(),
- opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
- waitTopFut);
+ final GridNearAtomicAbstractUpdateFuture updateFut;
+
+ if (isFastMap(null, op)) {
+ updateFut = new GridNearAtomicFastMapUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ op,
+ map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
+ conflictPutMap.keySet() : conflictRmvMap.keySet(),
+ map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
+ invokeArgs,
+ (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
+ conflictRmvMap != null ? conflictRmvMap.values() : null,
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ CU.filterArray(null),
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+ }
+ else {
+ updateFut = new GridNearAtomicUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ op,
+ map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ?
+ conflictPutMap.keySet() : conflictRmvMap.keySet(),
+ map != null ? map.values() : invokeMap != null ? invokeMap.values() : null,
+ invokeArgs,
+ (Collection)(conflictPutMap != null ? conflictPutMap.values() : null),
+ conflictRmvMap != null ? conflictRmvMap.values() : null,
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ CU.filterArray(null),
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+ }
if (async) {
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1306,7 +1336,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
CacheEntryPredicate[] filters = CU.filterArray(filter);
- if (conflictPutVal == null && conflictRmvVer == null) {
+ if (conflictPutVal == null &&
+ conflictRmvVer == null &&
+ !isFastMap(filters, op)) {
return new GridNearAtomicSingleUpdateFuture(
ctx,
this,
@@ -1328,30 +1360,67 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
);
}
else {
- return new GridNearAtomicUpdateFuture(
- ctx,
- this,
- ctx.config().getWriteSynchronizationMode(),
- op,
- Collections.singletonList(key),
- val0 != null ? Collections.singletonList(val0) : null,
- invokeArgs,
- conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
- conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
- retval,
- false,
- opCtx != null ? opCtx.expiry() : null,
- filters,
- ctx.subjectIdPerCall(null, opCtx),
- ctx.kernalContext().job().currentTaskNameHash(),
- opCtx != null && opCtx.skipStore(),
- opCtx != null && opCtx.isKeepBinary(),
- opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
- waitTopFut);
+ if (isFastMap(filters, op)) {
+ return new GridNearAtomicFastMapUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ op,
+ Collections.singletonList(key),
+ val0 != null ? Collections.singletonList(val0) : null,
+ invokeArgs,
+ conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
+ conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
+ retval,
+ false,
+ opCtx != null ? opCtx.expiry() : null,
+ filters,
+ ctx.subjectIdPerCall(null, opCtx),
+ ctx.kernalContext().job().currentTaskNameHash(),
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+ }
+ else {
+ return new GridNearAtomicUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ op,
+ Collections.singletonList(key),
+ val0 != null ? Collections.singletonList(val0) : null,
+ invokeArgs,
+ conflictPutVal != null ? Collections.singleton(conflictPutVal) : null,
+ conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null,
+ retval,
+ false,
+ opCtx != null ? opCtx.expiry() : null,
+ filters,
+ ctx.subjectIdPerCall(null, opCtx),
+ ctx.kernalContext().job().currentTaskNameHash(),
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ waitTopFut);
+ }
}
}
/**
+ * Whether this is fast-map operation.
+ *
+ * @param filters Filters.
+ * @param op Operation.
+ * @return {@code True} if fast-map.
+ */
+ public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) {
+ return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
+ ctx.config().getAtomicWriteOrderMode() == CLOCK &&
+ !(ctx.writeThrough() && ctx.config().getInterceptor() != null);
+ }
+
+ /**
* Entry point for all public API remove methods.
*
* @param keys Keys to remove.
@@ -1394,26 +1463,52 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
});
}
- final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
- ctx,
- this,
- ctx.config().getWriteSynchronizationMode(),
- DELETE,
- keys != null ? keys : conflictMap.keySet(),
- null,
- null,
- null,
- drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
- retval,
- rawRetval,
- opCtx != null ? opCtx.expiry() : null,
- CU.filterArray(null),
- subjId,
- taskNameHash,
- opCtx != null && opCtx.skipStore(),
- opCtx != null && opCtx.isKeepBinary(),
- opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
- true);
+ final GridNearAtomicAbstractUpdateFuture updateFut;
+
+ if (isFastMap(null, DELETE)) {
+ updateFut = new GridNearAtomicFastMapUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ DELETE,
+ keys != null ? keys : conflictMap.keySet(),
+ null,
+ null,
+ null,
+ drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ CU.filterArray(null),
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ true);
+ }
+ else {
+ updateFut = new GridNearAtomicUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ DELETE,
+ keys != null ? keys : conflictMap.keySet(),
+ null,
+ null,
+ null,
+ drVers != null ? drVers : (keys != null ? null : conflictMap.values()),
+ retval,
+ rawRetval,
+ opCtx != null ? opCtx.expiry() : null,
+ CU.filterArray(null),
+ subjId,
+ taskNameHash,
+ opCtx != null && opCtx.skipStore(),
+ opCtx != null && opCtx.isKeepBinary(),
+ opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES,
+ true);
+ }
if (async) {
return asyncOp(new CO<IgniteInternalFuture<Object>>() {
@@ -1773,29 +1868,36 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return;
}
- // Do not check topology version if topology was locked on near node by
+ // Do not check topology version for CLOCK versioning since
+ // partition exchange will wait for near update future (if future is on server node).
+ // Also do not check topology version if topology was locked on near node by
// external transaction or explicit lock.
- if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) {
+ if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() ||
+ !needRemap(req.topologyVersion(), top.topologyVersion())) {
locked = lockEntries(req, req.topologyVersion());
boolean hasNear = ctx.discovery().cacheNearNode(node, name());
- // Assign next version for update inside entries lock.
- GridCacheVersion ver = ctx.versions().next(top.topologyVersion());
+ GridCacheVersion ver = req.updateVersion();
+
+ if (ver == null) {
+ // Assign next version for update inside entries lock.
+ ver = ctx.versions().next(top.topologyVersion());
- if (hasNear)
- res.nearVersion(ver);
+ if (hasNear)
+ res.nearVersion(ver);
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("Assigned update version [futId=" + req.futureId() +
- ", writeVer=" + ver + ']');
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Assigned update version [futId=" + req.futureId() +
+ ", writeVer=" + ver + ']');
+ }
}
assert ver != null : "Got null version for update request: " + req;
boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion());
- dhtFut = createDhtFuture(ver, req);
+ dhtFut = req.fastMap() ? null : createDhtFuture(ver, req, res, completionCb);
expiry = expiryPolicy(req.expiry());
@@ -1812,6 +1914,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
hasNear,
req,
res,
+ completionCb,
locked,
ver,
dhtFut,
@@ -1831,6 +1934,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
hasNear,
req,
res,
+ completionCb,
locked,
ver,
dhtFut,
@@ -1910,9 +2014,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
completionCb.apply(req, res);
}
- else
+ else {
if (dhtFut != null)
- dhtFut.map(node, res.returnValue(), res, completionCb);
+ dhtFut.map(node, res.returnValue());
+ else {
+ assert req.fastMap() : req;
+
+ completionCb.apply(req, res);
+ }
+ }
if (req.writeSynchronizationMode() != FULL_ASYNC)
req.cleanup(!node.isLocal());
@@ -1943,6 +2053,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final boolean hasNear,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
final List<GridDhtCacheEntry> locked,
final GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
@@ -1990,6 +2101,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
for (int i = 0; i < locked.size(); i++) {
GridDhtCacheEntry entry = locked.get(i);
+ if (entry == null)
+ continue;
+
try {
if (!checkFilter(entry, req, res)) {
if (expiry != null && entry.hasValue()) {
@@ -2095,6 +2209,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut,
req,
res,
+ completionCb,
replicate,
updRes,
taskName,
@@ -2143,6 +2258,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut,
req,
res,
+ completionCb,
replicate,
updRes,
taskName,
@@ -2269,6 +2385,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut,
req,
res,
+ completionCb,
replicate,
updRes,
taskName,
@@ -2358,6 +2475,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean hasNear,
GridNearAtomicAbstractUpdateRequest req,
GridNearAtomicUpdateResponse res,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
@@ -2373,6 +2491,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
boolean checkReaders = hasNear || ctx.discovery().hasNearCache(ctx.cacheId(), topVer);
+ boolean readersOnly = false;
+
boolean intercept = ctx.config().getInterceptor() != null;
AffinityAssignment affAssignment = ctx.affinity().assignment(topVer);
@@ -2388,12 +2508,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
try {
GridDhtCacheEntry entry = locked.get(i);
+ if (entry == null)
+ continue;
+
GridCacheVersion newConflictVer = req.conflictVersion(i);
long newConflictTtl = req.conflictTtl(i);
long newConflictExpireTime = req.conflictExpireTime(i);
assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;
+ boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(), entry.partition(),
+ req.topologyVersion());
+
Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);
Collection<UUID> readers = null;
@@ -2411,18 +2537,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op,
writeVal,
req.invokeArguments(),
- writeThrough() && !req.skipStore(),
+ (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
+ && writeThrough() && !req.skipStore(),
!req.skipStore(),
sndPrevVal || req.returnValue(),
req.keepBinary(),
expiry,
/*event*/true,
/*metrics*/true,
- /*primary*/true,
- /*verCheck*/false,
+ primary,
+ ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
topVer,
req.filter(),
- replicate ? DR_PRIMARY : DR_NONE,
+ replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
newConflictTtl,
newConflictExpireTime,
newConflictVer,
@@ -2434,6 +2561,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*updateCntr*/null,
dhtFut);
+ if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+ dhtFut = createDhtFuture(ver, req, res, completionCb);
+
+ readersOnly = true;
+ }
+
if (dhtFut != null) {
if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2445,17 +2578,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
EntryProcessor<Object, Object, Object> entryProcessor = null;
- dhtFut.addWriteEntry(
- affAssignment,
- entry,
- updRes.newValue(),
- entryProcessor,
- updRes.newTtl(),
- updRes.conflictExpireTime(),
- newConflictVer,
- sndPrevVal,
- updRes.oldValue(),
- updRes.updateCounter());
+ if (!readersOnly) {
+ dhtFut.addWriteEntry(
+ affAssignment,
+ entry,
+ updRes.newValue(),
+ entryProcessor,
+ updRes.newTtl(),
+ updRes.conflictExpireTime(),
+ newConflictVer,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
+ }
if (!F.isEmpty(filteredReaders))
dhtFut.addNearWriteEntries(
@@ -2474,7 +2609,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
if (hasNear) {
- if (updRes.sendToDht()) {
+ if (primary && updRes.sendToDht()) {
if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
// If put the same value as in request then do not need to send it back.
if (op == TRANSFORM || writeVal != updRes.newValue()) {
@@ -2580,6 +2715,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
final boolean replicate,
final UpdateBatchResult batchRes,
final String taskName,
@@ -2600,8 +2736,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheOperation op;
if (putMap != null) {
+ // If fast mapping, filter primary keys for write to store.
+ Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ?
+ F.view(putMap, new P1<CacheObject>() {
+ @Override public boolean apply(CacheObject key) {
+ return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
+ }
+ }) :
+ putMap;
+
try {
- ctx.store().putAll(null, F.viewReadOnly(putMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
+ ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() {
@Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) {
return F.t(v, ver);
}
@@ -2614,8 +2759,17 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
op = UPDATE;
}
else {
+ // If fast mapping, filter primary keys for write to store.
+ Collection<KeyCacheObject> storeKeys = req.fastMap() ?
+ F.view(rmvKeys, new P1<Object>() {
+ @Override public boolean apply(Object key) {
+ return ctx.affinity().primaryByKey(ctx.localNode(), key, req.topologyVersion());
+ }
+ }) :
+ rmvKeys;
+
try {
- ctx.store().removeAll(null, rmvKeys);
+ ctx.store().removeAll(null, storeKeys);
}
catch (CacheStorePartialUpdateException e) {
storeErr = e;
@@ -2650,6 +2804,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert writeVal != null || op == DELETE : "null write value found.";
+ boolean primary = !req.fastMap() || ctx.affinity().primaryByPartition(ctx.localNode(),
+ entry.partition(),
+ req.topologyVersion());
+
Collection<UUID> readers = null;
Collection<UUID> filteredReaders = null;
@@ -2672,11 +2830,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
expiry,
/*event*/true,
/*metrics*/true,
- /*primary*/true,
- /*verCheck*/false,
+ primary,
+ ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
topVer,
null,
- replicate ? DR_PRIMARY : DR_NONE,
+ replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
CU.TTL_NOT_CHANGED,
CU.EXPIRE_TIME_CALCULATE,
null,
@@ -2710,23 +2868,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
batchRes.addDeleted(entry, updRes, entries);
+ if (dhtFut == null && !F.isEmpty(filteredReaders)) {
+ dhtFut = createDhtFuture(ver, req, res, completionCb);
+
+ batchRes.readersOnly(true);
+ }
+
if (dhtFut != null) {
EntryProcessor<Object, Object, Object> entryProcessor =
entryProcessorMap == null ? null : entryProcessorMap.get(entry.key());
- dhtFut.addWriteEntry(
- affAssignment,
- entry,
- writeVal,
- entryProcessor,
- updRes.newTtl(),
- CU.EXPIRE_TIME_CALCULATE,
- null,
- sndPrevVal,
- updRes.oldValue(),
- updRes.updateCounter());
+ if (!batchRes.readersOnly()) {
+ dhtFut.addWriteEntry(
+ affAssignment,
+ entry,
+ writeVal,
+ entryProcessor,
+ updRes.newTtl(),
+ CU.EXPIRE_TIME_CALCULATE,
+ null,
+ sndPrevVal,
+ updRes.oldValue(),
+ updRes.updateCounter());
+ }
- if (!F.isEmpty(filteredReaders))
+ if (!F.isEmpty(filteredReaders)) {
dhtFut.addNearWriteEntries(
filteredReaders,
entry,
@@ -2734,29 +2900,34 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
entryProcessor,
updRes.newTtl(),
CU.EXPIRE_TIME_CALCULATE);
+ }
}
if (hasNear) {
- if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
- int idx = firstEntryIdx + i;
-
- if (req.operation() == TRANSFORM) {
- res.addNearValue(idx,
- writeVal,
- updRes.newTtl(),
- CU.EXPIRE_TIME_CALCULATE);
- }
- else
- res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
+ if (primary) {
+ if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) {
+ int idx = firstEntryIdx + i;
+
+ if (req.operation() == TRANSFORM) {
+ res.addNearValue(idx,
+ writeVal,
+ updRes.newTtl(),
+ CU.EXPIRE_TIME_CALCULATE);
+ }
+ else
+ res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
- if (writeVal != null || entry.hasValue()) {
- IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
+ if (writeVal != null || entry.hasValue()) {
+ IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer);
- assert f == null : f;
+ assert f == null : f;
+ }
}
+ else if (readers.contains(nearNode.id())) // Reader became primary or backup.
+ entry.removeReader(nearNode.id(), req.messageId());
+ else
+ res.addSkippedIndex(firstEntryIdx + i);
}
- else if (readers.contains(nearNode.id())) // Reader became primary or backup.
- entry.removeReader(nearNode.id(), req.messageId());
else
res.addSkippedIndex(firstEntryIdx + i);
}
@@ -2800,14 +2971,23 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
KeyCacheObject key = req.key(0);
while (true) {
- GridDhtCacheEntry entry = entryExx(key, topVer);
+ try {
+ GridDhtCacheEntry entry = entryExx(key, topVer);
- GridUnsafe.monitorEnter(entry);
+ GridUnsafe.monitorEnter(entry);
- if (entry.obsolete())
- GridUnsafe.monitorExit(entry);
- else
- return Collections.singletonList(entry);
+ if (entry.obsolete())
+ GridUnsafe.monitorExit(entry);
+ else
+ return Collections.singletonList(entry);
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ // Ignore invalid partition exception in CLOCK ordering mode.
+ if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+ return Collections.singletonList(null);
+ else
+ throw e;
+ }
}
}
else {
@@ -2815,9 +2995,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
while (true) {
for (int i = 0; i < req.size(); i++) {
- GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
+ try {
+ GridDhtCacheEntry entry = entryExx(req.key(i), topVer);
- locked.add(entry);
+ locked.add(entry);
+ }
+ catch (GridDhtInvalidPartitionException e) {
+ // Ignore invalid partition exception in CLOCK ordering mode.
+ if (ctx.config().getAtomicWriteOrderMode() == CLOCK)
+ locked.add(null);
+ else
+ throw e;
+ }
}
boolean retry = false;
@@ -2970,28 +3159,54 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
drPutVals = null;
}
- GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
- ctx,
- this,
- ctx.config().getWriteSynchronizationMode(),
- req.operation(),
- req.keys(),
- vals,
- req.invokeArguments(),
- drPutVals,
- drRmvVals,
- req.returnValue(),
- false,
- req.expiry(),
- req.filter(),
- req.subjectId(),
- req.taskNameHash(),
- req.skipStore(),
- req.keepBinary(),
- MAX_RETRIES,
- true);
+ if (isFastMap(req.filter(), req.operation())) {
+ GridNearAtomicFastMapUpdateFuture updateFut = new GridNearAtomicFastMapUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ req.operation(),
+ req.keys(),
+ vals,
+ req.invokeArguments(),
+ drPutVals,
+ drRmvVals,
+ req.returnValue(),
+ false,
+ req.expiry(),
+ req.filter(),
+ req.subjectId(),
+ req.taskNameHash(),
+ req.skipStore(),
+ req.keepBinary(),
+ MAX_RETRIES,
+ true);
- updateFut.map();
+ updateFut.map();
+ }
+ else {
+ GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture(
+ ctx,
+ this,
+ ctx.config().getWriteSynchronizationMode(),
+ req.operation(),
+ req.keys(),
+ vals,
+ req.invokeArguments(),
+ drPutVals,
+ drRmvVals,
+ req.returnValue(),
+ false,
+ req.expiry(),
+ req.filter(),
+ req.subjectId(),
+ req.taskNameHash(),
+ req.skipStore(),
+ req.keepBinary(),
+ MAX_RETRIES,
+ true);
+
+ updateFut.map();
+ }
}
/**
@@ -3003,12 +3218,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
if (updateReq.size() == 1)
- return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq);
+ return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
else
- return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq);
+ return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq, updateRes, completionCb);
}
/**
@@ -3520,6 +3737,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private GridDhtAtomicAbstractUpdateFuture dhtFut;
/** */
+ private boolean readersOnly;
+
+ /** */
private GridCacheReturn invokeRes;
/**
@@ -3572,6 +3792,20 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
this.dhtFut = dhtFut;
}
+
+ /**
+ * @return {@code True} if only readers (not backups) should be updated.
+ */
+ private boolean readersOnly() {
+ return readersOnly;
+ }
+
+ /**
+ * @param readersOnly {@code True} if only readers (not backups) should be updated.
+ */
+ private void readersOnly(boolean readersOnly) {
+ this.readersOnly = readersOnly;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 8ebe9c3..8ad6496 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -50,9 +50,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
GridDhtAtomicSingleUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
- super(cctx, writeVer, updateReq);
+ super(cctx, writeVer, updateReq, updateRes, completionCb);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 5d5ddf0..6de08c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -49,9 +49,11 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
GridCacheVersion writeVer,
- GridNearAtomicAbstractUpdateRequest updateReq
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb
) {
- super(cctx, writeVer, updateReq);
+ super(cctx, writeVer, updateReq, updateRes, completionCb);
mappings = U.newHashMap(updateReq.size());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
index 6811236..10f368e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
@@ -73,6 +73,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean fastMap,
+ boolean clientReq,
boolean mappingKnown,
boolean skipStore,
boolean keepBinary,
@@ -88,6 +90,8 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear
retval,
subjId,
taskNameHash,
+ fastMap,
+ clientReq,
mappingKnown,
skipStore,
keepBinary,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 39abb73..59f3e76 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
@@ -140,6 +142,9 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
@GridToStringInclude
protected Long futId;
+ /** */
+ protected GridCacheVersion updVer;
+
/** Operation result. */
protected GridCacheReturn opRes;
@@ -208,12 +213,32 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/** {@inheritDoc} */
- @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
return null;
}
/**
* @param req Request.
+ * @param topVer Topology version.
+ */
+ final void initUpdateVersion(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) {
+ // Assign version on near node in CLOCK ordering mode even if fastMap is false.
+ if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
+ GridCacheVersion updVer = this.updVer;
+
+ if (updVer == null) {
+ this.updVer = updVer = cctx.versions().next(topVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Assigned fast-map version for update on near node: " + updVer);
+ }
+
+ req.updateVersion(updVer);
+ }
+ }
+
+ /**
+ * @param req Request.
*/
void sendCheckUpdateRequest(GridNearAtomicCheckUpdateRequest req) {
try {
@@ -241,14 +266,29 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
// Cannot remap.
remapCnt = 1;
- map(topVer);
+ if (fastMap()) {
+ Long futId = addAtomicFuture(topVer);
+
+ if (futId != null)
+ map(topVer, futId);
+ }
+ else
+ map(topVer, futId);
}
}
/**
+ * @return {@code True} for fast map update mode.
+ */
+ protected boolean fastMap() {
+ return false;
+ }
+
+ /**
* @param topVer Topology version.
+ * @param futId Future ID.
*/
- protected abstract void map(AffinityTopologyVersion topVer);
+ protected abstract void map(AffinityTopologyVersion topVer, Long futId);
/**
* Maps future on ready topology.
@@ -274,7 +314,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
* @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}.
*/
final boolean storeFuture() {
- return syncMode != FULL_ASYNC;
+ return syncMode != FULL_ASYNC || cctx.config().getAtomicWriteOrderMode() == CLOCK;
}
/**
@@ -416,6 +456,30 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
}
/**
+ * Adds future prevents topology change before operation complete.
+ * Should be invoked before topology lock released.
+ *
+ * @param topVer Topology version.
+ * @return Future version in case future added.
+ */
+ protected final Long addAtomicFuture(AffinityTopologyVersion topVer) {
+ Long futId = cctx.mvcc().atomicFutureId();
+
+ synchronized (mux) {
+ assert this.futId == null : this;
+ assert this.topVer == AffinityTopologyVersion.ZERO : this;
+
+ this.topVer = topVer;
+ this.futId = futId;
+ }
+
+ if (storeFuture() && !cctx.mvcc().addAtomicFuture(futId, this))
+ return null;
+
+ return futId;
+ }
+
+ /**
*
*/
static class PrimaryRequestState {
http://git-wip-us.apache.org/repos/asf/ignite/blob/0015962a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index a43bfb0..bdf2678 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -48,7 +48,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/** Message index. */
public static final int CACHE_MSG_IDX = nextIndexId();
- /** . */
+ /** */
private static final int NEED_PRIMARY_RES_FLAG_MASK = 0x01;
/** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
@@ -63,6 +63,15 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/** Return value flag. */
private static final int RET_VAL_FLAG_MASK = 0x10;
+ /** Fast map update flag. */
+ private static final int FAST_MAP_FLAG_MASK = 0x20;
+
+ /** Client node request flag. */
+ private static final int CLIENT_REQ_FLAG_MASK = 0x40;
+
+ /** Client node request flag. */
+ private static final int HAS_PRIMARY_FLAG_MASK = 0x80;
+
/** Target node ID. */
@GridDirectTransient
protected UUID nodeId;
@@ -93,6 +102,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
@GridDirectTransient
private GridNearAtomicUpdateResponse res;
+ /** Update version. Set to non-null if fastMap is {@code true}. */
+ private GridCacheVersion updateVer;
+
/**
*
*/
@@ -129,6 +141,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
boolean retval,
@Nullable UUID subjId,
int taskNameHash,
+ boolean fastMap,
+ boolean clientReq,
boolean needPrimaryRes,
boolean skipStore,
boolean keepBinary,
@@ -144,16 +158,62 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
this.taskNameHash = taskNameHash;
this.addDepInfo = addDepInfo;
+ if (fastMap)
+ setFlag(true, FAST_MAP_FLAG_MASK);
+ if (clientReq)
+ setFlag(true, CLIENT_REQ_FLAG_MASK);
if (needPrimaryRes)
- needPrimaryResponse(true);
+ setFlag(true, NEED_PRIMARY_RES_FLAG_MASK);
if (topLocked)
- topologyLocked(true);
+ setFlag(true, TOP_LOCKED_FLAG_MASK);
if (retval)
- returnValue(true);
+ setFlag(true, RET_VAL_FLAG_MASK);
if (skipStore)
- skipStore(true);
+ setFlag(true, SKIP_STORE_FLAG_MASK);
if (keepBinary)
- keepBinary(true);
+ setFlag(true, KEEP_BINARY_FLAG_MASK);
+ }
+
+ /**
+ * @return Flag indicating whether this request contains primary keys.
+ */
+ boolean hasPrimary() {
+ return isFlag(HAS_PRIMARY_FLAG_MASK);
+ }
+
+ /**
+ * @param val Flag indicating whether this request contains primary keys.
+ */
+ void hasPrimary(boolean val) {
+ setFlag(val, HAS_PRIMARY_FLAG_MASK);
+ }
+
+ /**
+ * @return Flag indicating whether this is fast-map update.
+ */
+ boolean fastMap() {
+ return isFlag(FAST_MAP_FLAG_MASK);
+ }
+
+ /**
+ * @return {@code True} if request sent from client node.
+ */
+ boolean clientRequest() {
+ return isFlag(CLIENT_REQ_FLAG_MASK);
+ }
+
+ /**
+ * @return Update version for fast-map request.
+ */
+ @Nullable public final GridCacheVersion updateVersion() {
+ return updateVer;
+ }
+
+ /**
+ * @param updateVer Update version for fast-map request.
+ */
+ final void updateVersion(GridCacheVersion updateVer) {
+ this.updateVer = updateVer;
}
/** {@inheritDoc} */
@@ -291,13 +351,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param val {@code True} if topology is locked on near node.
- */
- private void topologyLocked(boolean val) {
- setFlag(val, TOP_LOCKED_FLAG_MASK);
- }
-
- /**
* @return Return value flag.
*/
public final boolean returnValue() {
@@ -305,13 +358,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param val Return value flag.
- */
- public final void returnValue(boolean val) {
- setFlag(val, RET_VAL_FLAG_MASK);
- }
-
- /**
* @return Skip write-through to a persistent storage.
*/
public final boolean skipStore() {
@@ -319,13 +365,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param val Skip store flag.
- */
- public void skipStore(boolean val) {
- setFlag(val, SKIP_STORE_FLAG_MASK);
- }
-
- /**
* @return Keep binary flag.
*/
public final boolean keepBinary() {
@@ -333,13 +372,6 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
}
/**
- * @param val Keep binary flag.
- */
- public void keepBinary(boolean val) {
- setFlag(val, KEEP_BINARY_FLAG_MASK);
- }
-
- /**
* Sets flag mask.
*
* @param flag Set or clear.
@@ -380,12 +412,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
* @param conflictTtl Conflict TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
+ * @param primary If given key is primary on this mapping.
*/
abstract void addUpdateEntry(KeyCacheObject key,
@Nullable Object val,
long conflictTtl,
long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer);
+ @Nullable GridCacheVersion conflictVer,
+ boolean primary);
/**
* @return Keys for this update request.
@@ -458,7 +492,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 11;
}
/** {@inheritDoc} */
@@ -518,6 +552,12 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
writer.incrementState();
+ case 10:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -598,6 +638,14 @@ public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessa
reader.incrementState();
+ case 10:
+ updateVer = reader.readMessage("updateVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridNearAtomicAbstractUpdateRequest.class);