You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2016/11/21 13:43:02 UTC
[14/15] ignite git commit: IGNITE-3074 Optimize DHT atomic update future
IGNITE-3074 Optimize DHT atomic update future
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88f38ac6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88f38ac6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88f38ac6
Branch: refs/heads/ignite-4242
Commit: 88f38ac6305578946f2881b12d2d557bd561f67d
Parents: a24a394
Author: Konstantin Dudkov <kd...@ya.ru>
Authored: Mon Nov 21 15:11:09 2016 +0300
Committer: Konstantin Dudkov <kd...@ya.ru>
Committed: Mon Nov 21 15:11:09 2016 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheEntryEx.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 4 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 461 +++++++++++++++++++
.../dht/atomic/GridDhtAtomicCache.java | 33 +-
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 121 +++++
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 393 +---------------
.../atomic/GridNearAtomicFullUpdateRequest.java | 24 +-
.../GridNearAtomicSingleUpdateFuture.java | 4 +-
.../continuous/CacheContinuousQueryHandler.java | 4 +-
.../CacheContinuousQueryListener.java | 4 +-
.../continuous/CacheContinuousQueryManager.java | 6 +-
.../processors/cache/GridCacheTestEntryEx.java | 4 +-
12 files changed, 638 insertions(+), 424 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index ef6a244..176fe77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -27,7 +27,7 @@ import org.apache.ignite.cache.eviction.EvictableEntry;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -504,7 +504,7 @@ public interface GridCacheEntryEx {
String taskName,
@Nullable CacheObject prevVal,
@Nullable Long updateCntr,
- @Nullable GridDhtAtomicUpdateFuture fut
+ @Nullable GridDhtAtomicAbstractUpdateFuture fut
) throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 5996672..2bcf360 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -43,7 +43,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
@@ -1951,7 +1951,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
final String taskName,
@Nullable final CacheObject prevVal,
@Nullable final Long updateCntr,
- @Nullable GridDhtAtomicUpdateFuture fut
+ @Nullable GridDhtAtomicAbstractUpdateFuture fut
) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException {
assert cctx.atomic();
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
new file mode 100644
index 0000000..3bbc348
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.processor.EntryProcessor;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ * DHT atomic cache backup update future.
+ */
+public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapter<Void>
+ implements GridCacheAtomicFuture<Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Logger. */
+ protected static IgniteLogger log;
+
+ /** Logger reference. */
+ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+ /** Atomic message logger. */
+ protected static IgniteLogger msgLog;
+
+ /** Write version. */
+ protected final GridCacheVersion writeVer;
+
+ /** Cache context. */
+ protected final GridCacheContext cctx;
+
+ /** Future version. */
+ protected final GridCacheVersion futVer;
+
+ /** Completion callback. */
+ @GridToStringExclude
+ private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+
+ /** Update request. */
+ private final GridNearAtomicAbstractUpdateRequest updateReq;
+
+ /** Update response. */
+ final GridNearAtomicUpdateResponse updateRes;
+
+ /** Force transform backup flag. */
+ private boolean forceTransformBackups;
+
+ /** Mappings. */
+ @GridToStringInclude
+ protected Map<UUID, GridDhtAtomicUpdateRequest> mappings;
+
+ /** Continuous query closures. */
+ private Collection<CI1<Boolean>> cntQryClsrs;
+
+ /** Whether {@link #completeFuture} may return this future for a newer topology version. */
+ private final boolean waitForExchange;
+
+ /** Response count. */
+ private volatile int resCnt;
+
+ /**
+ * @param cctx Cache context.
+ * @param completionCb Callback to invoke when future is completed.
+ * @param writeVer Write version.
+ * @param updateReq Update request.
+ * @param updateRes Update response.
+ */
+ protected GridDhtAtomicAbstractUpdateFuture(
+ GridCacheContext cctx,
+ CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridCacheVersion writeVer,
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes) {
+ this.cctx = cctx;
+
+ futVer = cctx.versions().next(updateReq.topologyVersion());
+ this.updateReq = updateReq;
+ this.completionCb = completionCb;
+ this.updateRes = updateRes;
+ this.writeVer = writeVer;
+
+ waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
+
+ if (log == null) {
+ msgLog = cctx.shared().atomicMessageLogger();
+ log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public final IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
+ if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
+ return this;
+
+ return null;
+ }
+
+ /**
+ * @param clsr Continuous query closure.
+ */
+ public final void addContinuousQueryClosure(CI1<Boolean> clsr) {
+ assert !isDone() : this;
+
+ if (cntQryClsrs == null)
+ cntQryClsrs = new ArrayList<>(10);
+
+ cntQryClsrs.add(clsr);
+ }
+
+ /**
+ * @param entry Entry to map.
+ * @param val Value to write.
+ * @param entryProcessor Entry processor.
+ * @param ttl TTL (optional).
+ * @param conflictExpireTime Conflict expire time (optional).
+ * @param conflictVer Conflict version (optional).
+ * @param addPrevVal If {@code true} sends previous value to backups.
+ * @param prevVal Previous value.
+ * @param updateCntr Partition update counter.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ final void addWriteEntry(GridDhtCacheEntry entry,
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long conflictExpireTime,
+ @Nullable GridCacheVersion conflictVer,
+ boolean addPrevVal,
+ @Nullable CacheObject prevVal,
+ long updateCntr) {
+ AffinityTopologyVersion topVer = updateReq.topologyVersion();
+
+ List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
+
+ CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
+
+ addDhtKey(entry.key(), dhtNodes);
+
+ for (int i = 0; i < dhtNodes.size(); i++) {
+ ClusterNode node = dhtNodes.get(i);
+
+ UUID nodeId = node.id();
+
+ if (!nodeId.equals(cctx.localNodeId())) {
+ GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+
+ if (updateReq == null) {
+ updateReq = new GridDhtAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ writeVer,
+ syncMode,
+ topVer,
+ forceTransformBackups,
+ this.updateReq.subjectId(),
+ this.updateReq.taskNameHash(),
+ forceTransformBackups ? this.updateReq.invokeArguments() : null,
+ cctx.deploymentEnabled(),
+ this.updateReq.keepBinary(),
+ this.updateReq.skipStore());
+
+ mappings.put(nodeId, updateReq);
+ }
+
+ updateReq.addWriteValue(entry.key(),
+ val,
+ entryProcessor,
+ ttl,
+ conflictExpireTime,
+ conflictVer,
+ addPrevVal,
+ entry.partition(),
+ prevVal,
+ updateCntr);
+ }
+ }
+ }
+
+ /**
+ * @param key Key.
+ * @param dhtNodes DHT nodes.
+ */
+ protected abstract void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes);
+
+ /**
+ * @param key Key.
+ * @param readers Near cache readers.
+ */
+ protected abstract void addNearKey(KeyCacheObject key, Collection<UUID> readers);
+
+ /**
+ * @param readers Entry readers.
+ * @param entry Entry.
+ * @param val Value.
+ * @param entryProcessor Entry processor.
+ * @param ttl TTL for near cache update (optional).
+ * @param expireTime Expire time for near cache update (optional).
+ */
+ final void addNearWriteEntries(Collection<UUID> readers,
+ GridDhtCacheEntry entry,
+ @Nullable CacheObject val,
+ EntryProcessor<Object, Object, Object> entryProcessor,
+ long ttl,
+ long expireTime) {
+ CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
+
+ addNearKey(entry.key(), readers);
+
+ AffinityTopologyVersion topVer = updateReq.topologyVersion();
+
+ for (UUID nodeId : readers) {
+ GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+
+ if (updateReq == null) {
+ ClusterNode node = cctx.discovery().node(nodeId);
+
+ // Node left the grid.
+ if (node == null)
+ continue;
+
+ updateReq = new GridDhtAtomicUpdateRequest(
+ cctx.cacheId(),
+ nodeId,
+ futVer,
+ writeVer,
+ syncMode,
+ topVer,
+ forceTransformBackups,
+ this.updateReq.subjectId(),
+ this.updateReq.taskNameHash(),
+ forceTransformBackups ? this.updateReq.invokeArguments() : null,
+ cctx.deploymentEnabled(),
+ this.updateReq.keepBinary(),
+ this.updateReq.skipStore());
+
+ mappings.put(nodeId, updateReq);
+ }
+
+ addNearReaderEntry(entry);
+
+ updateReq.addNearWriteValue(entry.key(),
+ val,
+ entryProcessor,
+ ttl,
+ expireTime);
+ }
+ }
+
+ /**
+ * Adds a new near reader entry.
+ *
+ * @param entry GridDhtCacheEntry.
+ */
+ protected abstract void addNearReaderEntry(GridDhtCacheEntry entry);
+
+ /**
+ * @return Write version.
+ */
+ final GridCacheVersion writeVersion() {
+ return writeVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final IgniteUuid futureId() {
+ return futVer.asGridUuid();
+ }
+
+ /** {@inheritDoc} */
+ @Override public final GridCacheVersion version() {
+ return futVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public final boolean onNodeLeft(UUID nodeId) {
+ boolean res = registerResponse(nodeId);
+
+ if (res && msgLog.isDebugEnabled()) {
+ msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer +
+ ", node=" + nodeId + ']');
+ }
+
+ return res;
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @return {@code True} if a request mapped to this node was found and {@code onResponse()} returned {@code true} for it.
+ */
+ final boolean registerResponse(UUID nodeId) {
+ int resCnt0;
+
+ GridDhtAtomicUpdateRequest req = mappings != null ? mappings.get(nodeId) : null;
+
+ if (req != null) {
+ synchronized (this) {
+ if (req.onResponse()) {
+ resCnt0 = resCnt;
+
+ resCnt0 += 1;
+
+ resCnt = resCnt0;
+ }
+ else
+ return false;
+ }
+
+ if (resCnt0 == mappings.size())
+ onDone();
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Sends requests to remote nodes.
+ */
+ final void map() {
+ if (!F.isEmpty(mappings)) {
+ for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+ try {
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DTH update fut, sent request [futId=" + futVer +
+ ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+ }
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer +
+ ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+ }
+
+ registerResponse(req.nodeId());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(msgLog, "Failed to send request [futId=" + futVer +
+ ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
+
+ registerResponse(req.nodeId());
+ }
+ }
+ }
+ else
+ onDone();
+
+ // Send response right away if no ACKs from backup is required.
+ // Backups will send ACKs anyway, future will be completed after all backups have replied.
+ if (updateReq.writeSynchronizationMode() != FULL_SYNC)
+ completionCb.apply(updateReq, updateRes);
+ }
+
+ /**
+ * Deferred update response.
+ *
+ * @param nodeId Backup node ID.
+ */
+ public final void onResult(UUID nodeId) {
+ if (log.isDebugEnabled())
+ log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
+
+ registerResponse(nodeId);
+ }
+
+ /**
+ * Callback for backup update response.
+ *
+ * @param nodeId Backup node ID.
+ * @param updateRes Update response.
+ */
+ public abstract void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes);
+
+ /**
+ * @param updateRes Response.
+ * @param err Error.
+ */
+ protected abstract void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err);
+
+ /** {@inheritDoc} */
+ @Override public final boolean onDone(@Nullable Void res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ cctx.mvcc().removeAtomicFuture(version());
+
+ boolean suc = err == null;
+
+ if (!suc)
+ addFailedKeys(updateRes, err);
+
+ if (cntQryClsrs != null) {
+ for (CI1<Boolean> clsr : cntQryClsrs)
+ clsr.apply(suc);
+ }
+
+ if (updateReq.writeSynchronizationMode() == FULL_SYNC)
+ completionCb.apply(updateReq, updateRes);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean trackable() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markNotTrackable() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index f7d1973..d7eb062 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1685,7 +1685,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1);
- GridDhtAtomicUpdateFuture dhtFut = null;
+ GridDhtAtomicAbstractUpdateFuture dhtFut = null;
boolean remap = false;
@@ -1908,7 +1908,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final GridNearAtomicUpdateResponse res,
final List<GridDhtCacheEntry> locked,
final GridCacheVersion ver,
- @Nullable GridDhtAtomicUpdateFuture dhtFut,
+ @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
final boolean replicate,
final String taskName,
@@ -2331,7 +2331,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridNearAtomicUpdateResponse res,
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
- @Nullable GridDhtAtomicUpdateFuture dhtFut,
+ @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
boolean replicate,
String taskName,
@@ -2552,7 +2552,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @return Deleted entries.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
- @Nullable private GridDhtAtomicUpdateFuture updatePartialBatch(
+ @Nullable private GridDhtAtomicAbstractUpdateFuture updatePartialBatch(
final boolean hasNear,
final int firstEntryIdx,
final List<GridDhtCacheEntry> entries,
@@ -2562,7 +2562,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable final Map<KeyCacheObject, CacheObject> putMap,
@Nullable final Collection<KeyCacheObject> rmvKeys,
@Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
- @Nullable GridDhtAtomicUpdateFuture dhtFut,
+ @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
@@ -3036,7 +3036,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param force If {@code true} then creates future without optimizations checks.
* @return Backup update future or {@code null} if there are no backups.
*/
- @Nullable private GridDhtAtomicUpdateFuture createDhtFuture(
+ @Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture(
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes,
@@ -3064,7 +3064,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
- return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+ if (updateReq.size() == 1)
+ return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
+ else
+ return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes);
}
/**
@@ -3256,7 +3259,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("unchecked")
private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) {
- GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
+ GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion());
if (updateFut != null) {
if (msgLog.isDebugEnabled()) {
@@ -3279,7 +3282,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@SuppressWarnings("unchecked")
private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
for (GridCacheVersion ver : res.futureVersions()) {
- GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver);
+ GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(ver);
if (updateFut != null) {
if (msgLog.isDebugEnabled()) {
@@ -3335,7 +3338,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted;
/** */
- private final GridDhtAtomicUpdateFuture dhtFut;
+ private final GridDhtAtomicAbstractUpdateFuture dhtFut;
/**
* @param retVal Return value.
@@ -3344,7 +3347,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private UpdateSingleResult(GridCacheReturn retVal,
Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted,
- GridDhtAtomicUpdateFuture dhtFut) {
+ GridDhtAtomicAbstractUpdateFuture dhtFut) {
this.retVal = retVal;
this.deleted = deleted;
this.dhtFut = dhtFut;
@@ -3367,7 +3370,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @return DHT future.
*/
- public GridDhtAtomicUpdateFuture dhtFuture() {
+ public GridDhtAtomicAbstractUpdateFuture dhtFuture() {
return dhtFut;
}
}
@@ -3380,7 +3383,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted;
/** */
- private GridDhtAtomicUpdateFuture dhtFut;
+ private GridDhtAtomicAbstractUpdateFuture dhtFut;
/** */
private boolean readersOnly;
@@ -3414,7 +3417,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @return DHT future.
*/
- public GridDhtAtomicUpdateFuture dhtFuture() {
+ public GridDhtAtomicAbstractUpdateFuture dhtFuture() {
return dhtFut;
}
@@ -3435,7 +3438,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/**
* @param dhtFut DHT future.
*/
- private void dhtFuture(@Nullable GridDhtAtomicUpdateFuture dhtFut) {
+ private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) {
this.dhtFut = dhtFut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
new file mode 100644
index 0000000..f83a7b7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.CI2;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * DHT atomic cache backup update future for a single key.
+ */
+class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Future key. */
+ private KeyCacheObject key;
+
+ /** Entry with near readers. */
+ private GridDhtCacheEntry nearReaderEntry;
+
+ /**
+ * @param cctx Cache context.
+ * @param completionCb Callback to invoke when future is completed.
+ * @param writeVer Write version.
+ * @param updateReq Update request.
+ * @param updateRes Update response.
+ */
+ GridDhtAtomicSingleUpdateFuture(
+ GridCacheContext cctx,
+ CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridCacheVersion writeVer,
+ GridNearAtomicAbstractUpdateRequest updateReq,
+ GridNearAtomicUpdateResponse updateRes
+ ) {
+ super(cctx, completionCb, writeVer, updateReq, updateRes);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
+ assert this.key == null || this.key.equals(key) : this.key;
+
+ if (mappings == null)
+ mappings = U.newHashMap(dhtNodes.size());
+
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) {
+ assert this.key == null || this.key.equals(key) : this.key;
+
+ if (mappings == null)
+ mappings = U.newHashMap(readers.size());
+
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void addNearReaderEntry(GridDhtCacheEntry entry) {
+ nearReaderEntry = entry;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
+ if (log.isDebugEnabled())
+ log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
+
+ if (updateRes.error() != null)
+ this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error());
+
+ if (!F.isEmpty(updateRes.nearEvicted())) {
+ try {
+ assert nearReaderEntry != null;
+
+ nearReaderEntry.removeReader(nodeId, updateRes.messageId());
+ }
+ catch (GridCacheEntryRemovedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Entry with evicted reader was removed [entry=" + nearReaderEntry + ", err=" + e + ']');
+ }
+ }
+
+ registerResponse(nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
+ updateRes.addFailedKey(key, err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicSingleUpdateFuture.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index c2ad8b8..864aadd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -23,92 +23,30 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.cache.processor.EntryProcessor;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteUuid;
-import org.jetbrains.annotations.Nullable;
-
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* DHT atomic cache backup update future.
*/
-public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
- implements GridCacheAtomicFuture<Void> {
+class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
/** */
private static final long serialVersionUID = 0L;
- /** Logger reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Logger. */
- private static IgniteLogger log;
-
- /** Logger. */
- private static IgniteLogger msgLog;
-
- /** Cache context. */
- private final GridCacheContext cctx;
-
- /** Future version. */
- private final GridCacheVersion futVer;
-
- /** Write version. */
- private final GridCacheVersion writeVer;
-
- /** Force transform backup flag. */
- private boolean forceTransformBackups;
-
- /** Completion callback. */
- @GridToStringExclude
- private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
-
- /** Mappings. */
- @GridToStringInclude
- private final Map<UUID, GridDhtAtomicUpdateRequest> mappings;
-
- /** Entries with readers. */
- private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
-
- /** Update request. */
- private final GridNearAtomicAbstractUpdateRequest updateReq;
-
- /** Update response. */
- private final GridNearAtomicUpdateResponse updateRes;
-
/** Future keys. */
private final Collection<KeyCacheObject> keys;
- /** Continuous query closures. */
- private Collection<CI1<Boolean>> cntQryClsrs;
-
- /** */
- private final boolean waitForExchange;
+ /** Entries with readers. */
+ private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
- /** Response count. */
- private volatile int resCnt;
/**
* @param cctx Cache context.
@@ -117,328 +55,39 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* @param updateReq Update request.
* @param updateRes Update response.
*/
- public GridDhtAtomicUpdateFuture(
+ GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes
) {
- this.cctx = cctx;
- this.writeVer = writeVer;
-
- futVer = cctx.versions().next(updateReq.topologyVersion());
- this.updateReq = updateReq;
- this.completionCb = completionCb;
- this.updateRes = updateRes;
-
- if (log == null) {
- msgLog = cctx.shared().atomicMessageLogger();
- log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class);
- }
+ super(cctx, completionCb, writeVer, updateReq, updateRes);
keys = new ArrayList<>(updateReq.size());
mappings = U.newHashMap(updateReq.size());
-
- waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()));
- }
-
- /**
- * @return Write version.
- */
- GridCacheVersion writeVersion() {
- return writeVer;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteUuid futureId() {
- return futVer.asGridUuid();
}
/** {@inheritDoc} */
- @Override public GridCacheVersion version() {
- return futVer;
+ @Override protected void addDhtKey(KeyCacheObject key, List<ClusterNode> dhtNodes) {
+ keys.add(key);
}
/** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- boolean res = registerResponse(nodeId);
-
- if (res && msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, node left [futId=" + futVer + ", writeVer=" + writeVer +
- ", node=" + nodeId + ']');
- }
-
- return res;
- }
-
- /**
- * @param nodeId Node ID.
- * @return {@code True} if request found.
- */
- private boolean registerResponse(UUID nodeId) {
- int resCnt0;
-
- GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
-
- if (req != null) {
- synchronized (this) {
- if (req.onResponse()) {
- resCnt0 = resCnt;
-
- resCnt0 += 1;
-
- resCnt = resCnt0;
- }
- else
- return false;
- }
-
- if (resCnt0 == mappings.size())
- onDone();
-
- return true;
- }
-
- return false;
+ @Override protected void addNearKey(KeyCacheObject key, Collection<UUID> readers) {
+ keys.add(key);
}
/** {@inheritDoc} */
- @Override public boolean trackable() {
- return true;
- }
+ @Override protected void addNearReaderEntry(GridDhtCacheEntry entry) {
+ if (nearReadersEntries == null)
+ nearReadersEntries = new HashMap<>();
- /** {@inheritDoc} */
- @Override public void markNotTrackable() {
- // No-op.
+ nearReadersEntries.put(entry.key(), entry);
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
- if (waitForExchange && updateReq.topologyVersion().compareTo(topVer) < 0)
- return this;
-
- return null;
- }
-
- /**
- * @param entry Entry to map.
- * @param val Value to write.
- * @param entryProcessor Entry processor.
- * @param ttl TTL (optional).
- * @param conflictExpireTime Conflict expire time (optional).
- * @param conflictVer Conflict version (optional).
- * @param addPrevVal If {@code true} sends previous value to backups.
- * @param prevVal Previous value.
- * @param updateCntr Partition update counter.
- */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- public void addWriteEntry(GridDhtCacheEntry entry,
- @Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
- long ttl,
- long conflictExpireTime,
- @Nullable GridCacheVersion conflictVer,
- boolean addPrevVal,
- @Nullable CacheObject prevVal,
- long updateCntr) {
- AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
- List<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
-
- if (log.isDebugEnabled())
- log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
-
- CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
-
- keys.add(entry.key());
-
- for (int i = 0; i < dhtNodes.size(); i++) {
- ClusterNode node = dhtNodes.get(i);
-
- UUID nodeId = node.id();
-
- if (!nodeId.equals(cctx.localNodeId())) {
- GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
-
- if (updateReq == null) {
- updateReq = new GridDhtAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- writeVer,
- syncMode,
- topVer,
- forceTransformBackups,
- this.updateReq.subjectId(),
- this.updateReq.taskNameHash(),
- forceTransformBackups ? this.updateReq.invokeArguments() : null,
- cctx.deploymentEnabled(),
- this.updateReq.keepBinary(),
- this.updateReq.skipStore());
-
- mappings.put(nodeId, updateReq);
- }
-
- updateReq.addWriteValue(entry.key(),
- val,
- entryProcessor,
- ttl,
- conflictExpireTime,
- conflictVer,
- addPrevVal,
- entry.partition(),
- prevVal,
- updateCntr);
- }
- }
- }
-
- /**
- * @param readers Entry readers.
- * @param entry Entry.
- * @param val Value.
- * @param entryProcessor Entry processor..
- * @param ttl TTL for near cache update (optional).
- * @param expireTime Expire time for near cache update (optional).
- */
- public void addNearWriteEntries(Iterable<UUID> readers,
- GridDhtCacheEntry entry,
- @Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
- long ttl,
- long expireTime) {
- CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode();
-
- keys.add(entry.key());
-
- AffinityTopologyVersion topVer = updateReq.topologyVersion();
-
- for (UUID nodeId : readers) {
- GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
-
- if (updateReq == null) {
- ClusterNode node = cctx.discovery().node(nodeId);
-
- // Node left the grid.
- if (node == null)
- continue;
-
- updateReq = new GridDhtAtomicUpdateRequest(
- cctx.cacheId(),
- nodeId,
- futVer,
- writeVer,
- syncMode,
- topVer,
- forceTransformBackups,
- this.updateReq.subjectId(),
- this.updateReq.taskNameHash(),
- forceTransformBackups ? this.updateReq.invokeArguments() : null,
- cctx.deploymentEnabled(),
- this.updateReq.keepBinary(),
- this.updateReq.skipStore());
-
- mappings.put(nodeId, updateReq);
- }
-
- if (nearReadersEntries == null)
- nearReadersEntries = new HashMap<>();
-
- nearReadersEntries.put(entry.key(), entry);
-
- updateReq.addNearWriteValue(entry.key(),
- val,
- entryProcessor,
- ttl,
- expireTime);
- }
- }
-
- /**
- * @param clsr Continuous query closure.
- */
- public void addContinuousQueryClosure(CI1<Boolean> clsr){
- assert !isDone() : this;
-
- if (cntQryClsrs == null)
- cntQryClsrs = new ArrayList<>(10);
-
- cntQryClsrs.add(clsr);
- }
-
- /** {@inheritDoc} */
- @Override public boolean onDone(@Nullable Void res, @Nullable Throwable err) {
- if (super.onDone(res, err)) {
- cctx.mvcc().removeAtomicFuture(version());
-
- boolean suc = err == null;
-
- if (!suc) {
- for (KeyCacheObject key : keys)
- updateRes.addFailedKey(key, err);
- }
-
- if (cntQryClsrs != null) {
- for (CI1<Boolean> clsr : cntQryClsrs)
- clsr.apply(suc);
- }
-
- if (updateReq.writeSynchronizationMode() == FULL_SYNC)
- completionCb.apply(updateReq, updateRes);
-
- return true;
- }
-
- return false;
- }
-
- /**
- * Sends requests to remote nodes.
- */
- public void map() {
- if (!mappings.isEmpty()) {
- for (GridDhtAtomicUpdateRequest req : mappings.values()) {
- try {
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
-
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, sent request [futId=" + futVer +
- ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (msgLog.isDebugEnabled()) {
- msgLog.debug("DTH update fut, failed to send request, node left [futId=" + futVer +
- ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
- }
-
- registerResponse(req.nodeId());
- }
- catch (IgniteCheckedException e) {
- U.error(msgLog, "Failed to send request [futId=" + futVer +
- ", writeVer=" + writeVer + ", node=" + req.nodeId() + ']');
-
- registerResponse(req.nodeId());
- }
- }
- }
- else
- onDone();
-
- // Send response right away if no ACKs from backup is required.
- // Backups will send ACKs anyway, future will be completed after all backups have replied.
- if (updateReq.writeSynchronizationMode() != FULL_SYNC)
- completionCb.apply(updateReq, updateRes);
- }
-
- /**
- * Callback for backup update response.
- *
- * @param nodeId Backup node ID.
- * @param updateRes Update response.
- */
- public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
+ @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) {
if (log.isDebugEnabled())
log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']');
@@ -462,16 +111,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
registerResponse(nodeId);
}
- /**
- * Deferred update response.
- *
- * @param nodeId Backup node ID.
- */
- public void onResult(UUID nodeId) {
- if (log.isDebugEnabled())
- log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
-
- registerResponse(nodeId);
+ /** {@inheritDoc} */
+ @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) {
+ for (KeyCacheObject key : keys)
+ updateRes.addFailedKey(key, err);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index e2314f8..b733d7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -487,44 +487,32 @@ public class GridNearAtomicFullUpdateRequest extends GridNearAtomicAbstractUpdat
return invokeArgs;
}
- /**
- * @return Flag indicating whether this is fast-map udpate.
- */
+ /** {@inheritDoc} */
@Override public boolean fastMap() {
return fastMap;
}
- /**
- * @return Topology locked flag.
- */
+ /** {@inheritDoc} */
@Override public boolean topologyLocked() {
return topLocked;
}
- /**
- * @return {@code True} if request sent from client node.
- */
+ /** {@inheritDoc} */
@Override public boolean clientRequest() {
return clientReq;
}
- /**
- * @return Return value flag.
- */
+ /** {@inheritDoc} */
@Override public boolean returnValue() {
return retval;
}
- /**
- * @return Skip write-through to a persistent storage.
- */
+ /** {@inheritDoc} */
@Override public boolean skipStore() {
return skipStore;
}
- /**
- * @return Keep binary flag.
- */
+ /** {@inheritDoc} */
@Override public boolean keepBinary() {
return keepBinary;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index eaf2f2c..bd231cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -667,9 +667,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @return {@code True} can use 'single' update requests.
*/
private boolean canUseSingleRequest(ClusterNode node) {
- assert node != null;
-
- return expiryPlc == null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0;
+ return expiryPlc == null && node != null && node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 304d031..10784fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -59,7 +59,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryLocalListener;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryManager.JCacheQueryRemoteFilter;
@@ -382,7 +382,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
@Override public void onEntryUpdated(final CacheContinuousQueryEvent<K, V> evt,
boolean primary,
final boolean recordIgniteEvt,
- GridDhtAtomicUpdateFuture fut) {
+ GridDhtAtomicAbstractUpdateFuture fut) {
if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
return ;
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 8eca81c..84b22f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.query.continuous;
import java.util.Map;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.jetbrains.annotations.Nullable;
/**
@@ -41,7 +41,7 @@ public interface CacheContinuousQueryListener<K, V> {
* @param fut Dht atomic future.
*/
public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
- boolean recordIgniteEvt, @Nullable GridDhtAtomicUpdateFuture fut);
+ boolean recordIgniteEvt, @Nullable GridDhtAtomicAbstractUpdateFuture fut);
/**
* Listener unregistered callback.
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 2863f3d..e2fbf52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -56,7 +56,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -245,7 +245,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean primary,
boolean preload,
long updateCntr,
- @Nullable GridDhtAtomicUpdateFuture fut,
+ @Nullable GridDhtAtomicAbstractUpdateFuture fut,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload);
@@ -290,7 +290,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean primary,
boolean preload,
long updateCntr,
- @Nullable GridDhtAtomicUpdateFuture fut,
+ @Nullable GridDhtAtomicAbstractUpdateFuture fut,
AffinityTopologyVersion topVer)
throws IgniteCheckedException
{
http://git-wip-us.apache.org/repos/asf/ignite/blob/88f38ac6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index bf543cb..396629a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -26,7 +26,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.eviction.EvictableEntry;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateFuture;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -542,7 +542,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
String taskName,
@Nullable CacheObject prevVal,
@Nullable Long updateCntr,
- @Nullable GridDhtAtomicUpdateFuture fut) throws IgniteCheckedException,
+ @Nullable GridDhtAtomicAbstractUpdateFuture fut) throws IgniteCheckedException,
GridCacheEntryRemovedException {
assert false;