You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/20 11:33:48 UTC
[5/5] ignite git commit: ignite-4705
ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b979880
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b979880
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b979880
Branch: refs/heads/ignite-4705
Commit: 7b979880cd1f987d9e707627339f3cb8890ceb30
Parents: 743f8b4
Author: sboikov <sb...@gridgain.com>
Authored: Mon Feb 20 12:43:47 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Feb 20 14:33:40 2017 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 2 +
.../apache/ignite/internal/IgniteKernal.java | 1 +
.../communication/GridIoMessageFactory.java | 6 +
.../processors/cache/GridCacheIoManager.java | 3 -
.../processors/cache/GridCacheMessage.java | 2 +-
.../processors/cache/GridCacheMvccManager.java | 9 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 31 ++-
.../dht/atomic/GridDhtAtomicCache.java | 76 ++++++--
.../GridDhtAtomicDeferredUpdateResponse.java | 6 +
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 3 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 3 +-
.../GridNearAtomicAbstractUpdateFuture.java | 6 +-
.../atomic/GridNearAtomicMappingResponse.java | 193 +++++++++++++++++++
.../GridNearAtomicSingleUpdateFuture.java | 34 +++-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 15 +-
15 files changed, 352 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 6636bf2..50876dd 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractSingleUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -196,6 +197,7 @@ public class MessageCodeGenerator {
gen.generateAndWrite(GridNearAtomicSingleUpdateInvokeRequest.class);
gen.generateAndWrite(GridNearAtomicSingleUpdateRequest.class);
gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
+ gen.generateAndWrite(GridNearAtomicMappingResponse.class);
// gen.generateAndWrite(GridMessageCollection.class);
// gen.generateAndWrite(DataStreamerEntry.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 750c316..9f01615 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -1151,6 +1151,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
"Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
" ^-- Node [id=" + id + ", name=" + name() + ", uptime=" + getUpTimeFormatted() + "]" + NL +
" ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
+ " ^-- Futures [futs=" + ctx.cache().context().mvcc().atomicFuturesCount() + "]" + NL +
" ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
" ^-- Heap [used=" + dblFmt.format(heapUsedInMBytes) + "MB, free=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 5ed46ff..908d1d6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDh
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMappingResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest;
@@ -175,6 +176,11 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -47:
+ msg = new GridNearAtomicMappingResponse();
+
+ break;
+
case -46:
msg = new UpdateErrors();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 0f7371d..4d3ab1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -362,9 +362,6 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (depEnabled)
cctx.deploy().ignoreOwnership(true);
- if (!cacheMsg.partitionExchangeMessage())
- log.info("Cache message: " + cacheMsg);
-
unmarshall(nodeId, cacheMsg);
if (cacheMsg.classError() != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 3ec5323..f3acf66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message {
private static final long serialVersionUID = 0L;
/** Maximum number of cache lookup indexes. */
- public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 6;
+ public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 7;
/** Cache message index field name. */
public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX";
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index e8a5c8d..7f9f18c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -107,7 +107,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private final ConcurrentMap<GridCacheVersion, Collection<GridCacheMvccFuture<?>>> mvccFuts = newMap();
/** Pending atomic futures. */
- private final ConcurrentMap<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
+ private final ConcurrentHashMap8<Long, GridCacheAtomicFuture<?>> atomicFuts = new ConcurrentHashMap8<>();
/** Pending data streamer futures. */
private final GridConcurrentHashSet<DataStreamerFuture> dataStreamerFuts = new GridConcurrentHashSet<>();
@@ -453,6 +453,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Collection of pending atomic futures.
+ */
+ public int atomicFuturesCount() {
+ return atomicFuts.size();
+ }
+
+ /**
* @return Collection of pending data streamer futures.
*/
public Collection<DataStreamerFuture> dataStreamerFutures() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index da6616b..b512bdc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -81,7 +82,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
/** Completion callback. */
@GridToStringExclude
- private final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb;
+ private final GridDhtAtomicCache.UpdateReplyClosure completionCb;
/** Update request. */
protected final GridNearAtomicAbstractUpdateRequest updateReq;
@@ -108,7 +109,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
*/
protected GridDhtAtomicAbstractUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes) {
@@ -367,9 +368,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
List<UUID> dhtNodes = null;
if (fullSync) {
- dhtNodes = new ArrayList<>(mappings.size());
+ if (!F.isEmpty(mappings)) {
+ dhtNodes = new ArrayList<>(mappings.size());
- dhtNodes.addAll(mappings.keySet());
+ dhtNodes.addAll(mappings.keySet());
+ }
+ else
+ dhtNodes = Collections.emptyList();
if (primaryReplyToNear)
updateRes.mapping(dhtNodes);
@@ -380,9 +385,25 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte
if (primaryReplyToNear)
completionCb.apply(updateReq, updateRes);
+ else {
+ if (fullSync && GridDhtAtomicCache.IGNITE_ATOMIC_SND_MAPPING_TO_NEAR) {
+ GridNearAtomicMappingResponse mappingRes = new GridNearAtomicMappingResponse(
+ cctx.cacheId(),
+ updateReq.partition(),
+ updateReq.futureId(),
+ dhtNodes);
+
+ try {
+ cctx.io().send(updateRes.nodeId(), mappingRes, cctx.ioPolicy());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(msgLog, "Failed to send mapping response [futId=" + futId +
+ ", writeVer=" + writeVer + ", node=" + updateRes.nodeId() + ']');
+ }
+ }
+ }
}
else {
- // Reply.
completionCb.apply(updateReq, updateRes);
onDone();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d402c86..73a5acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -35,6 +35,7 @@ import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
@@ -110,7 +111,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_AC
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
@@ -137,9 +137,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT =
Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500);
+ /** */
+ static final boolean IGNITE_ATOMIC_SND_MAPPING_TO_NEAR = IgniteSystemProperties.getBoolean("IGNITE_ATOMIC_SND_MAPPING_TO_NEAR", false);
+
/** Update reply closure. */
@GridToStringExclude
- private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
+ private UpdateReplyClosure updateReplyClos;
/** Pending */
private GridDeferredAckMessageSender deferredUpdateMsgSnd;
@@ -208,7 +211,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override protected void init() {
super.init();
- updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+ updateReplyClos = new UpdateReplyClosure() {
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
if (req.writeSynchronizationMode() != FULL_ASYNC)
@@ -231,6 +234,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Override public void start() throws IgniteCheckedException {
super.start();
+ // TODO IGNITE-4705.
+ log.info("Atomic cache start [name=" + name() +
+ ", mode=" + configuration().getWriteSynchronizationMode() +
+ ", IGNITE_ATOMIC_SND_MAPPING_TO_NEAR=" + IGNITE_ATOMIC_SND_MAPPING_TO_NEAR + ']');
+
deferredUpdateMsgSnd = new GridDeferredAckMessageSender<Long>(ctx.time(), ctx.closures()) {
@Override public int getTimeout() {
return DEFERRED_UPDATE_RESPONSE_TIMEOUT;
@@ -423,6 +431,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
+ ctx.io().addHandler(ctx.cacheId(),
+ GridNearAtomicMappingResponse.class,
+ new CI2<UUID, GridNearAtomicMappingResponse>() {
+ @Override public void apply(UUID uuid, GridNearAtomicMappingResponse msg) {
+ processDhtAtomicNearMappingResponse(uuid, msg);
+ }
+
+ @Override public String toString() {
+ return "GridNearAtomicMappingResponse handler " +
+ "[msgIdx=" + GridNearAtomicMappingResponse.CACHE_MSG_IDX + ']';
+ }
+ });
+
if (near == null) {
ctx.io().addHandler(
ctx.cacheId(),
@@ -1697,10 +1718,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param req Update request.
* @param completionCb Completion callback.
*/
- public void updateAllAsyncInternal(
+ void updateAllAsyncInternal(
final UUID nodeId,
final GridNearAtomicAbstractUpdateRequest req,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ final UpdateReplyClosure completionCb
) {
IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion());
@@ -1749,7 +1770,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void onForceKeysError(final UUID nodeId,
final GridNearAtomicAbstractUpdateRequest req,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ final UpdateReplyClosure completionCb,
IgniteCheckedException e
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
@@ -1772,7 +1793,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
private void updateAllAsyncInternal0(
UUID nodeId,
GridNearAtomicAbstractUpdateRequest req,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb
+ UpdateReplyClosure completionCb
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureId(),
ctx.deploymentEnabled());
@@ -1997,7 +2018,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
final List<GridDhtCacheEntry> locked,
final GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ final GridDhtAtomicCache.UpdateReplyClosure completionCb,
final boolean replicate,
final String taskName,
@Nullable final IgniteCacheExpiryPolicy expiry,
@@ -2420,7 +2441,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
List<GridDhtCacheEntry> locked,
GridCacheVersion ver,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
boolean replicate,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
@@ -2651,7 +2672,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
@Nullable final Collection<KeyCacheObject> rmvKeys,
@Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap,
@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
- final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ final GridDhtAtomicCache.UpdateReplyClosure completionCb,
final GridNearAtomicAbstractUpdateRequest req,
final GridNearAtomicUpdateResponse res,
final boolean replicate,
@@ -3133,7 +3154,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
boolean force
) {
if (updateReq.size() == 1)
@@ -3370,6 +3391,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
/**
+ * @param primaryId Primary node ID.
* @param req Request.
* @param nearRes Response to send.
*/
@@ -3424,6 +3446,31 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Node ID.
* @param res Response.
*/
+ private void processDhtAtomicNearMappingResponse(UUID nodeId, GridNearAtomicMappingResponse res) {
+ GridNearAtomicAbstractUpdateFuture updateFut =
+ (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
+
+ if (updateFut != null) {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Received near mapping response [futId=" + res.futureId() +
+ ", node=" + nodeId + ']');
+ }
+
+ updateFut.onMappingReceived(nodeId, res);
+ }
+ else {
+ if (msgLog.isDebugEnabled()) {
+ msgLog.debug("Failed to find future for near mapping response [futId=" + res.futureId() +
+ ", node=" + nodeId +
+ ", res=" + res + ']');
+ }
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param res Response.
+ */
private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) {
GridNearAtomicAbstractUpdateFuture updateFut =
(GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId());
@@ -3663,4 +3710,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return Collections.emptyList();
}
}
+
+ /**
+ *
+ */
+ static interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> {
+ // No-op.
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index bd2bae0..671d516 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -147,4 +148,9 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
@Override public byte fieldsCount() {
return 4;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDhtAtomicDeferredUpdateResponse.class, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 4ee90a0..1e63165 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -28,7 +28,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -58,7 +57,7 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture
*/
GridDhtAtomicSingleUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 20c3d4f..dece1d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -31,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -61,7 +60,7 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture {
*/
GridDhtAtomicUpdateFuture(
GridCacheContext cctx,
- CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
+ GridDhtAtomicCache.UpdateReplyClosure completionCb,
GridCacheVersion writeVer,
GridNearAtomicAbstractUpdateRequest updateReq,
GridNearAtomicUpdateResponse updateRes
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 82f171d..15075c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -197,7 +197,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/**
* Performs future mapping.
*/
- public void map() {
+ public final void map() {
AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null);
if (topVer == null)
@@ -257,7 +257,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) {
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
- new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() {
+ new GridDhtAtomicCache.UpdateReplyClosure() {
@Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res, false);
}
@@ -300,6 +300,8 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
public abstract void onResult(UUID nodeId, GridDhtAtomicNearResponse res);
+ public abstract void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res);
+
/**
* @param req Request.
* @param e Error.
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
new file mode 100644
index 0000000..be1f0f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicMappingResponse.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class GridNearAtomicMappingResponse extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message index. */
+ public static final int CACHE_MSG_IDX = nextIndexId();
+
+ /** */
+ private int part;
+
+ /** */
+ @GridDirectCollection(UUID.class)
+ private List<UUID> mapping;
+
+ /** */
+ private long futId;
+
+ /**
+ *
+ */
+ public GridNearAtomicMappingResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param part Partition.
+ * @param futId Future ID.
+ * @param mapping Mapping.
+ */
+ GridNearAtomicMappingResponse(int cacheId, int part, long futId, List<UUID> mapping) {
+ assert part >= 0 : part;
+
+ this.cacheId = cacheId;
+ this.part = part;
+ this.futId = futId;
+ this.mapping = mapping;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return part;
+ }
+
+ /**
+ * @return Mapping.
+ */
+ public List<UUID> mapping() {
+ return mapping;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public long futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int lookupIndex() {
+ return CACHE_MSG_IDX;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -47;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 6;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeCollection("mapping", mapping, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ mapping = reader.readCollection("mapping", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(GridNearAtomicMappingResponse.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearAtomicMappingResponse.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 2016c98..8bfbe72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -217,6 +217,29 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (cctx.discovery().node(dhtNodeId) != null)
mapping.add(dhtNodeId);
}
+
+ if (rcvd != null)
+ mapping.removeAll(rcvd);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
+ GridCacheReturn opRes0 = null;
+
+ synchronized (mux) {
+ if (futId == null || futId != res.futureId())
+ return;
+
+ if (mapping == null) {
+ initMapping(res.mapping());
+
+ if (mapping.isEmpty() && opRes != null)
+ opRes0 = opRes;
+ }
+ }
+
+ if (opRes0 != null)
+ onDone(opRes0);
}
/** {@inheritDoc} */
@@ -251,11 +274,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
if (opRes == null && res.hasResult())
opRes = res.result();
- if (mapping.isEmpty() && opRes != null) {
+ if (mapping.isEmpty() && opRes != null)
opRes0 = opRes;
-
- futId = null;
- }
}
if (opRes0 != null)
@@ -324,12 +344,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
else
opRes = ret;
- if (res.mapping() != null) {
+ if (res.mapping() != null)
initMapping(res.mapping());
-
- if (rcvd != null)
- mapping.removeAll(rcvd);
- }
else
mapping = Collections.emptySet();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7b979880/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 7b573b1..e135000 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -256,6 +256,11 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
+ @Override public void onMappingReceived(UUID nodeId, GridNearAtomicMappingResponse res) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridDhtAtomicNearResponse res) {
throw new UnsupportedOperationException();
}
@@ -591,8 +596,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
if (locUpdate != null) {
cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicFullUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicFullUpdateRequest req, GridNearAtomicUpdateResponse res) {
+ new GridDhtAtomicCache.UpdateReplyClosure() {
+ @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) {
onResult(res.nodeId(), res, false);
}
});
@@ -615,6 +620,12 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
void map(AffinityTopologyVersion topVer,
Long futId,
@Nullable Collection<KeyCacheObject> remapKeys) {
+ if (true) {
+ onDone(new IgniteCheckedException("Failed"));
+
+ return;
+ }
+
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {