You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/22 01:56:16 UTC
incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-104 [created] 4f14522ab
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/4f14522a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/4f14522a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/4f14522a
Branch: refs/heads/ignite-104
Commit: 4f14522ab7b92b38810fd24ec15bcb094f480d08
Parents: 73a2b14
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Tue Jul 21 16:55:32 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Tue Jul 21 16:55:32 2015 -0700
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 21 ++
.../org/apache/ignite/internal/GridTopic.java | 73 +++++++
.../processors/cache/GridCacheIoManager.java | 32 ++-
.../processors/cache/GridCacheUtils.java | 12 +-
.../dht/atomic/GridDhtAtomicCache.java | 60 +++--
.../GridDhtAtomicDeferredUpdateResponse.java | 16 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 129 +++++++++--
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 15 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 17 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 219 ++++++++++++++-----
.../dht/atomic/GridNearAtomicUpdateRequest.java | 15 +-
.../atomic/GridNearAtomicUpdateResponse.java | 14 +-
12 files changed, 519 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 3ad0f01..83847dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -219,6 +219,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Write ordering mode. */
private CacheAtomicWriteOrderMode atomicWriteOrderMode;
+ /** Ordered updates mode. */
+ private boolean atomicOrderedUpdates;
+
/** Number of backups for cache. */
private int backups = DFLT_BACKUPS;
@@ -345,6 +348,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
aff = cc.getAffinity();
affMapper = cc.getAffinityMapper();
atomicityMode = cc.getAtomicityMode();
+ atomicOrderedUpdates = cc.isAtomicOrderedUpdates();
atomicWriteOrderMode = cc.getAtomicWriteOrderMode();
backups = cc.getBackups();
cacheLoaderFactory = cc.getCacheLoaderFactory();
@@ -896,6 +900,23 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
}
/**
+ * Tells whether ordered updates are enabled for this cache configuration.
+ * <p>
+ * The flag is {@code false} unless set explicitly and is carried over by the copying constructor.
+ *
+ * @return {@code True} if ordered updates mode is on.
+ */
+ public boolean isAtomicOrderedUpdates() {
+ return this.atomicOrderedUpdates;
+ }
+
+ /**
+ * Switches ordered updates mode on or off.
+ *
+ * @param atomicOrderedUpdates {@code True} to turn ordered updates mode on.
+ * @return {@code this} for chaining.
+ */
+ public CacheConfiguration<K, V> setAtomicOrderedUpdates(boolean atomicOrderedUpdates) {
+ CacheConfiguration<K, V> self = this;
+
+ self.atomicOrderedUpdates = atomicOrderedUpdates;
+
+ return self;
+ }
+
+ /**
* Gets number of nodes used to back up single partition for {@link CacheMode#PARTITIONED} cache.
* <p>
* If not set, default value is {@link #DFLT_BACKUPS}.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index ba3b8b2..1ed8725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -183,6 +183,15 @@ public enum GridTopic {
}
/**
+ * Builds a topic object that qualifies this topic constant with two integer IDs.
+ *
+ * @param id1 First ID.
+ * @param id2 Second ID.
+ * @return Grid message topic with specified IDs.
+ */
+ public Object topic(int id1, int id2) {
+ Object res = new T9(this, id1, id2);
+
+ return res;
+ }
+
+ /**
*
*/
private static class T1 implements Externalizable {
@@ -756,4 +765,68 @@ public enum GridTopic {
return S.toString(T8.class, this);
}
}
+
+ /**
+ * Composite topic key: a {@link GridTopic} constant qualified by two {@code int} IDs.
+ * <p>
+ * Instances are created by {@link GridTopic#topic(int, int)}.
+ */
+ private static class T9 implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Base topic. */
+ private GridTopic topic;
+
+ /** First ID. */
+ private int id1;
+
+ /** Second ID. */
+ private int id2;
+
+ /**
+ * No-arg constructor needed for {@link Externalizable}.
+ */
+ public T9() {
+ // No-op.
+ }
+
+ /**
+ * @param topic Topic.
+ * @param id1 ID1.
+ * @param id2 ID2.
+ */
+ private T9(GridTopic topic, int id1, int id2) {
+ this.topic = topic;
+ this.id1 = id1;
+ this.id2 = id2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ // Chain the multiplier so that (id1, id2) and (id2, id1) do not always collide.
+ return 31 * (31 * topic.ordinal() + id1) + id2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ // Null check keeps equals(null) returning false instead of throwing NPE.
+ if (obj != null && obj.getClass() == T9.class) {
+ T9 that = (T9)obj;
+
+ return topic == that.topic && id1 == that.id1 && id2 == that.id2;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeByte(topic.ordinal());
+ out.writeInt(id1);
+ out.writeInt(id2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ topic = fromOrdinal(in.readByte());
+
+ // Must mirror writeExternal(): both IDs are written as 4-byte ints, so reading single bytes
+ // would truncate the values and leave the stream misaligned.
+ id1 = in.readInt();
+ id2 = in.readInt();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(T9.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 84e4dc2..dec6aef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -378,7 +378,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
ctx.cacheId(),
- req.futureVersion());
+ req.futureVersion(),
+ req.partition());
res.onError(req.classError());
@@ -393,7 +394,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion());
+ req.futureVersion(),
+ req.partition());
res.error(req.classError());
@@ -745,13 +747,32 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
ClusterNode n = cctx.discovery().node(nodeId);
if (n == null)
- throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" + nodeId +
- ", msg=" + msg + ']');
+ throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" +
+ nodeId + ", msg=" + msg + ']');
send(n, msg, plc);
}
/**
+ * Sends an ordered message to the node with the given ID.
+ * <p>
+ * The node is looked up through discovery and the call is then delegated to the overload
+ * that accepts a {@link ClusterNode}.
+ *
+ * @param nodeId Destination node ID.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc IO policy.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @throws IgniteCheckedException Thrown in case of any errors; in particular a
+ * {@link ClusterTopologyCheckedException} if discovery does not know the node.
+ */
+ public void sendOrderedMessage(UUID nodeId, Object topic, GridCacheMessage msg, byte plc, long timeout)
+ throws IgniteCheckedException {
+ ClusterNode dest = cctx.discovery().node(nodeId);
+
+ if (dest != null) {
+ sendOrderedMessage(dest, topic, msg, plc, timeout);
+
+ return;
+ }
+
+ throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" +
+ nodeId + ", msg=" + msg + ']');
+ }
+
+ /**
* @param node Destination node.
* @param topic Topic to send the message to.
* @param msg Message to send.
@@ -779,7 +800,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
catch (IgniteCheckedException e) {
if (cctx.discovery().node(node.id()) == null)
- throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " + node.id(), e);
+ throw new ClusterTopologyCheckedException("Node left grid while sending ordered message to: " +
+ node.id(), e);
if (cnt == retryCnt)
throw e;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index e16e30d..b0edc3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -57,7 +57,6 @@ import static org.apache.ignite.cache.CacheMode.*;
import static org.apache.ignite.cache.CacheRebalanceMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.IgniteNodeAttributes.*;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
/**
@@ -1750,4 +1749,15 @@ public class GridCacheUtils {
}
};
}
+
+ /**
+ * Builds the message topic for a single partition of a cache.
+ * <p>
+ * The topic is {@code TOPIC_CACHE} qualified by the cache ID and the partition number.
+ *
+ * @param ctx Cache context.
+ * @param part Partition, must not be negative.
+ * @return Per-partition message topic.
+ */
+ public static Object partitionMassageTopic(GridCacheContext ctx, int part) {
+ assert part >= 0;
+
+ int cacheId = ctx.cacheId();
+
+ return TOPIC_CACHE.topic(cacheId, part);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 0a21979..38073f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -181,11 +181,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
- processNearAtomicUpdateRequest(nodeId, req);
+ if (ctx.config().isAtomicOrderedUpdates()) {
+ for (int part = 0; part < ctx.affinity().partitions(); part++) {
+ ctx.io().addOrderedHandler(CU.partitionMassageTopic(ctx, part), new CI2<UUID, GridNearAtomicUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ processNearAtomicUpdateRequest(nodeId, req);
+ }
+ });
}
- });
+ }
+ else {
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ processNearAtomicUpdateRequest(nodeId, req);
+ }
+ });
+ }
ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateResponse.class, new CI2<UUID, GridNearAtomicUpdateResponse>() {
@Override public void apply(UUID nodeId, GridNearAtomicUpdateResponse res) {
@@ -193,11 +204,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
- processDhtAtomicUpdateRequest(nodeId, req);
+ if (ctx.config().isAtomicOrderedUpdates()) {
+ for (int part = 0; part < ctx.affinity().partitions(); part++) {
+ ctx.io().addOrderedHandler(CU.partitionMassageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
+ processDhtAtomicUpdateRequest(nodeId, req);
+ }
+ });
}
- });
+ }
+ else {
+ ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
+ processDhtAtomicUpdateRequest(nodeId, req);
+ }
+ });
+ }
ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateResponse.class, new CI2<UUID, GridDhtAtomicUpdateResponse>() {
@Override public void apply(UUID nodeId, GridDhtAtomicUpdateResponse res) {
@@ -1017,7 +1039,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
- req.futureVersion());
+ req.futureVersion(),
+ req.partition());
List<KeyCacheObject> keys = req.keys();
@@ -2389,7 +2412,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheVersion ver = req.writeVersion();
// Always send update reply.
- GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion());
+ GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(
+ ctx.cacheId(), req.futureVersion(), req.partition());
Boolean replicate = ctx.isDrEnabled();
@@ -2477,7 +2501,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
ctx.io().send(nodeId, res, ctx.ioPolicy());
else {
// No failed keys and sync mode is not FULL_SYNC, thus sending deferred response.
- sendDeferredUpdateResponse(nodeId, req.futureVersion());
+ sendDeferredUpdateResponse(nodeId, req.futureVersion(), req.partition());
}
}
catch (ClusterTopologyCheckedException ignored) {
@@ -2494,7 +2518,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param nodeId Node ID to send message to.
* @param ver Version to ack.
*/
- private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) {
+ private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver, int part) {
while (true) {
DeferredResponseBuffer buf = pendingResponses.get(nodeId);
@@ -2511,7 +2535,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
buf = old;
}
- if (!buf.addResponse(ver))
+ if (!buf.addResponse(ver, part))
// Some thread is sending filled up buffer, we can remove it.
pendingResponses.remove(nodeId, buf);
else
@@ -2551,7 +2575,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridDhtAtomicUpdateFuture updateFut = (GridDhtAtomicUpdateFuture)ctx.mvcc().atomicFuture(ver);
if (updateFut != null)
- updateFut.onResult(nodeId);
+ updateFut.onResult(nodeId, res);
else
U.warn(log, "Failed to find DHT update future for deferred update response [nodeId=" +
nodeId + ", ver=" + ver + ", res=" + res + ']');
@@ -2751,6 +2775,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** Response versions. */
private Collection<GridCacheVersion> respVers = new ConcurrentLinkedDeque8<>();
+ /** Response partitions. */
+ private Collection<Integer> respParts = new GridConcurrentHashSet<>();
+
/** Node ID. */
private final UUID nodeId;
@@ -2805,7 +2832,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
* @param ver Version to send.
* @return {@code True} if response was handled, {@code false} if this buffer is filled and cannot be used.
*/
- public boolean addResponse(GridCacheVersion ver) {
+ public boolean addResponse(GridCacheVersion ver, int part) {
readLock().lock();
boolean snd = false;
@@ -2815,6 +2842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
return false;
respVers.add(ver);
+ respParts.add(part);
if (respVers.size() > DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE && guard.compareAndSet(false, true))
snd = true;
@@ -2845,7 +2873,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
private void finish() {
GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(),
- respVers);
+ respVers, respParts);
try {
ctx.kernalContext().gateway().readLock();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
index 1163761..e203b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java
@@ -41,6 +41,10 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
@GridDirectCollection(GridCacheVersion.class)
private Collection<GridCacheVersion> futVers;
+ /** Partitions. */
+ @GridDirectCollection(int.class)
+ private Collection<Integer> parts;
+
/** {@inheritDoc} */
@Override public int lookupIndex() {
return CACHE_MSG_IDX;
@@ -57,12 +61,15 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
* Constructor.
*
* @param futVers Future versions.
+ * @param parts Partitions.
*/
- public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers) {
+ public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection<GridCacheVersion> futVers,
+ Collection<Integer> parts) {
assert !F.isEmpty(futVers);
this.cacheId = cacheId;
this.futVers = futVers;
+ this.parts = parts;
}
/**
@@ -72,6 +79,13 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem
return futVers;
}
+ /**
+ * Gets the partitions that were passed to the constructor together with the future versions.
+ * <p>
+ * NOTE(review): no matching {@code writeTo}/{@code readFrom} change for this field is visible
+ * in this patch - confirm the collection is actually marshalled.
+ *
+ * @return Partitions.
+ */
+ public Collection<Integer> partitions() {
+ return this.parts;
+ }
+
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 4b1a58f..f7e574d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Mappings. */
@GridToStringInclude
- private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+ private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
/** Entries with readers. */
private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
@@ -135,7 +135,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** {@inheritDoc} */
@Override public Collection<? extends ClusterNode> nodes() {
- return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+ return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
+ @Override public ClusterNode apply(MappingKey mappingKey) {
+ return cctx.kernalContext().discovery().node(mappingKey.nodeId);
+ }
+ }), F.notNull());
}
/** {@inheritDoc} */
@@ -143,11 +147,16 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
if (log.isDebugEnabled())
log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
- GridDhtAtomicUpdateRequest req = mappings.get(nodeId);
+ Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
+
+ for (MappingKey mappingKey : mappings.keySet()) {
+ if (mappingKey.nodeId.equals(nodeId))
+ mappingKeys.add(mappingKey);
+ }
- if (req != null) {
- // Remove only after added keys to failed set.
- mappings.remove(nodeId);
+ if (!mappingKeys.isEmpty()) {
+ for (MappingKey mappingKey : mappingKeys)
+ mappings.remove(mappingKey);
checkComplete();
@@ -201,7 +210,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
@Nullable GridCacheVersion conflictVer) {
AffinityTopologyVersion topVer = updateReq.topologyVersion();
- Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer);
+ int part = entry.partition();
+
+ Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer);
+
+ if (!cctx.config().isAtomicOrderedUpdates())
+ part = -1;
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']');
@@ -213,8 +227,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
for (ClusterNode node : dhtNodes) {
UUID nodeId = node.id();
+ MappingKey mappingKey = new MappingKey(nodeId, part);
+
if (!nodeId.equals(cctx.localNodeId())) {
- GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+ GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
if (updateReq == null) {
updateReq = new GridDhtAtomicUpdateRequest(
@@ -227,9 +243,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
forceTransformBackups,
this.updateReq.subjectId(),
this.updateReq.taskNameHash(),
- forceTransformBackups ? this.updateReq.invokeArguments() : null);
+ forceTransformBackups ? this.updateReq.invokeArguments() : null,
+ part);
- mappings.put(nodeId, updateReq);
+ mappings.put(mappingKey, updateReq);
}
updateReq.addWriteValue(entry.key(),
@@ -262,8 +279,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
AffinityTopologyVersion topVer = updateReq.topologyVersion();
+ int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
+
for (UUID nodeId : readers) {
- GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId);
+ MappingKey mappingKey = new MappingKey(nodeId, part);
+
+ GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
if (updateReq == null) {
ClusterNode node = cctx.discovery().node(nodeId);
@@ -282,9 +303,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
forceTransformBackups,
this.updateReq.subjectId(),
this.updateReq.taskNameHash(),
- forceTransformBackups ? this.updateReq.invokeArguments() : null);
+ forceTransformBackups ? this.updateReq.invokeArguments() : null,
+ part);
- mappings.put(nodeId, updateReq);
+ mappings.put(mappingKey, updateReq);
}
if (nearReadersEntries == null)
@@ -319,24 +341,36 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
*/
public void map() {
if (!mappings.isEmpty()) {
- for (GridDhtAtomicUpdateRequest req : mappings.values()) {
+ for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
+ MappingKey mappingKey = e.getKey();
+ GridDhtAtomicUpdateRequest req = e.getValue();
+
try {
if (log.isDebugEnabled())
log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ if (mappingKey.part >= 0) {
+ Object topic = CU.partitionMassageTopic(cctx, mappingKey.part);
+
+ cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), 0);
+ }
+ else {
+ assert mappingKey.part == -1;
+
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ }
}
catch (ClusterTopologyCheckedException ignored) {
U.warn(log, "Failed to send update request to backup node because it left grid: " +
req.nodeId());
- mappings.remove(req.nodeId());
+ mappings.remove(mappingKey);
}
- catch (IgniteCheckedException e) {
+ catch (IgniteCheckedException ex) {
U.error(log, "Failed to send update request to backup node (did node leave the grid?): "
- + req.nodeId(), e);
+ + req.nodeId(), ex);
- mappings.remove(req.nodeId());
+ mappings.remove(mappingKey);
}
}
}
@@ -376,7 +410,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
}
}
- mappings.remove(nodeId);
+ mappings.remove(new MappingKey(nodeId, updateRes.partition()));
checkComplete();
}
@@ -385,12 +419,14 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
* Deferred update response.
*
* @param nodeId Backup node ID.
+ * @param res Response.
*/
- public void onResult(UUID nodeId) {
+ public void onResult(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) {
if (log.isDebugEnabled())
log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
- mappings.remove(nodeId);
+ for (Integer part : res.partitions())
+ mappings.remove(new MappingKey(nodeId, part));
checkComplete();
}
@@ -412,4 +448,53 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
@Override public String toString() {
return S.toString(GridDhtAtomicUpdateFuture.class, this);
}
+
+ /**
+ * Key of the mappings map: a target node ID paired with a partition.
+ */
+ private static class MappingKey {
+ /** ID of the node the request is mapped to. */
+ private final UUID nodeId;
+
+ /** Partition, or {@code -1} when requests are not split per partition. */
+ private final int part;
+
+ /**
+ * @param nodeId Node ID, must not be {@code null}.
+ * @param part Partition, {@code -1} or greater.
+ */
+ private MappingKey(UUID nodeId, int part) {
+ assert nodeId != null;
+ assert part >= -1 : part;
+
+ this.part = part;
+ this.nodeId = nodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || o.getClass() != getClass())
+ return false;
+
+ MappingKey other = (MappingKey)o;
+
+ return part == other.part && nodeId.equals(other.nodeId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return 31 * nodeId.hashCode() + part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MappingKey.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index f83b8fa..031edb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -127,6 +127,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
/** Task name hash. */
private int taskNameHash;
+ /** Partition. */
+ private int part;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -147,6 +150,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
* @param forceTransformBackups Force transform backups flag.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
+ * @param part Partition.
*/
public GridDhtAtomicUpdateRequest(
int cacheId,
@@ -158,7 +162,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
boolean forceTransformBackups,
UUID subjId,
int taskNameHash,
- Object[] invokeArgs
+ Object[] invokeArgs,
+ int part
) {
assert invokeArgs == null || forceTransformBackups;
@@ -172,6 +177,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
this.subjId = subjId;
this.taskNameHash = taskNameHash;
this.invokeArgs = invokeArgs;
+ this.part = part;
keys = new ArrayList<>();
@@ -318,6 +324,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
}
/**
+ * Gets the partition this request was created for, as passed to the constructor.
+ * <p>
+ * NOTE(review): no matching {@code writeTo}/{@code readFrom} change for this field is visible
+ * in this patch - confirm the value survives marshalling.
+ *
+ * @return Partition.
+ */
+ public int partition() {
+ return this.part;
+ }
+
+ /**
* @return Node ID.
*/
public UUID nodeId() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index c5b5a37..509a918 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -59,6 +59,9 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
@GridDirectCollection(KeyCacheObject.class)
private List<KeyCacheObject> nearEvicted;
+ /** Partition. */
+ private int part;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -69,10 +72,12 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* @param cacheId Cache ID.
* @param futVer Future version.
+ * @param part Partition.
*/
- public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer) {
+ public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, int part) {
this.cacheId = cacheId;
this.futVer = futVer;
+ this.part = part;
}
/** {@inheritDoc} */
@@ -89,7 +94,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
/**
* Sets update error.
- * @param err
+ *
+ * @param err Error.
*/
public void onError(IgniteCheckedException err){
this.err = err;
@@ -110,6 +116,13 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
}
/**
+ * Gets the partition this response refers to, as passed to the constructor.
+ * <p>
+ * NOTE(review): no matching {@code writeTo}/{@code readFrom} change for this field is visible
+ * in this patch - confirm the value survives marshalling.
+ *
+ * @return Partition.
+ */
+ public int partition() {
+ return this.part;
+ }
+
+ /**
* Adds key to collection of failed keys.
*
* @param key Key to add.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 4c8a161..63818f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Mappings. */
@GridToStringInclude
- private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
+ private ConcurrentMap<MappingKey, GridNearAtomicUpdateRequest> mappings;
/** Error. */
private volatile CachePartialUpdateCheckedException err;
@@ -246,7 +246,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** {@inheritDoc} */
@Override public Collection<? extends ClusterNode> nodes() {
- return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+ return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
+ @Override public ClusterNode apply(MappingKey mappingKey) {
+ return cctx.kernalContext().discovery().node(mappingKey.nodeId);
+ }
+ }), F.notNull());
}
/**
@@ -283,13 +287,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return false;
}
- GridNearAtomicUpdateRequest req = mappings.get(nodeId);
+ Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
+ Collection<KeyCacheObject> failedKeys = new ArrayList<>();
+
+ for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+ if (e.getKey().nodeId.equals(nodeId)) {
+ mappingKeys.add(e.getKey());
- if (req != null) {
- addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
- "received: " + nodeId));
+ failedKeys.addAll(e.getValue().keys());
+ }
+ }
- mappings.remove(nodeId);
+ if (!mappingKeys.isEmpty()) {
+ if (!failedKeys.isEmpty())
+ addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " +
+ "response is received: " + nodeId));
+
+ for (MappingKey key : mappingKeys)
+ mappings.remove(key);
checkComplete();
@@ -529,7 +544,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
}
else {
- GridNearAtomicUpdateRequest req = mappings.get(nodeId);
+ MappingKey mappingKey = new MappingKey(nodeId, res.partition());
+
+ GridNearAtomicUpdateRequest req = mappings.get(mappingKey);
if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
updateNear(req, res);
@@ -547,7 +564,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
opRes = ret;
}
- mappings.remove(nodeId);
+ mappings.remove(mappingKey);
}
checkComplete();
@@ -763,7 +780,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+ int part = cctx.affinity().partition(cacheKey);
+ ClusterNode primary = cctx.affinity().primary(part, topVer);
+
+ if (!ccfg.isAtomicOrderedUpdates())
+ part = -1;
if (primary == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
@@ -789,7 +810,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
subjId,
taskNameHash,
skipStore,
- cctx.kernalContext().clientNode());
+ cctx.kernalContext().clientNode(),
+ part);
req.addUpdateEntry(cacheKey,
val,
@@ -805,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
// Optimize mapping for single key.
- mapSingle(primary.id(), req);
+ mapSingle(new MappingKey(primary.id(), part), req);
return;
}
@@ -825,13 +847,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (conflictRmvVals != null)
conflictRmvValsIt = conflictRmvVals.iterator();
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+ Map<MappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
// Must do this in synchronized block because we need to atomically remove and add mapping.
// Otherwise checkComplete() may see empty intermediate state.
synchronized (this) {
- if (oldNodeId != null)
- removeMapping(oldNodeId);
+ if (oldNodeId != null) {
+ // TODO: IGNITE-104 - Try to avoid iteration.
+ for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+ if (e.getKey().nodeId.equals(oldNodeId))
+ mappings.remove(e.getKey());
+ }
+ }
// For fastMap mode wait for all responses before remapping.
if (remap && fastMap && !mappings.isEmpty()) {
@@ -901,7 +928,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+ T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap);
+
+ int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
+ Collection<ClusterNode> affNodes = t.get2();
if (affNodes.isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -922,7 +952,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
UUID nodeId = affNode.id();
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+ MappingKey mappingKey = new MappingKey(nodeId, part);
+
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey);
if (mapped == null) {
mapped = new GridNearAtomicUpdateRequest(
@@ -942,11 +974,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
subjId,
taskNameHash,
skipStore,
- cctx.kernalContext().clientNode());
+ cctx.kernalContext().clientNode(),
+ part);
- pendingMappings.put(nodeId, mapped);
+ pendingMappings.put(mappingKey, mapped);
- GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
+ GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped);
assert old == null || (old != null && remap) :
"Invalid mapping state [old=" + old + ", remap=" + remap + ']';
@@ -964,7 +997,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
if ((single == null || single) && pendingMappings.size() == 1) {
- Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
+ Map.Entry<MappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
single = true;
@@ -987,31 +1020,35 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
- * @return Collection of nodes to which key is mapped.
+ * @return Tuple of the key's partition and the collection of nodes to which the key is mapped.
*/
- private Collection<ClusterNode> mapKey(
+ private T2<Integer, Collection<ClusterNode>> mapKey(
KeyCacheObject key,
AffinityTopologyVersion topVer,
boolean fastMap
) {
GridCacheAffinityManager affMgr = cctx.affinity();
+ int part = affMgr.partition(key);
+
// If we can send updates in parallel - do it.
- return fastMap ?
- cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primary(key, topVer));
+ Collection<ClusterNode> nodes = fastMap ?
+ cctx.topology().nodes(part, topVer) :
+ Collections.singletonList(affMgr.primary(part, topVer));
+
+ return new T2<>(part, nodes);
}
/**
* Maps future to single node.
*
- * @param nodeId Node ID.
+ * @param mappingKey Mapping key.
* @param req Request.
*/
- private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
- singleNodeId = nodeId;
+ private void mapSingle(MappingKey mappingKey, GridNearAtomicUpdateRequest req) {
+ singleNodeId = mappingKey.nodeId;
singleReq = req;
- if (cctx.localNodeId().equals(nodeId)) {
- cache.updateAllAsyncInternal(nodeId, req,
+ if (cctx.localNodeId().equals(mappingKey.nodeId)) {
+ cache.updateAllAsyncInternal(mappingKey.nodeId, req,
new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
@Override public void apply(GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res) {
@@ -1026,7 +1063,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ sendRequest(mappingKey, req);
if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
onDone(new GridCacheReturn(cctx, true, null, true));
@@ -1042,34 +1079,37 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
*
* @param mappings Mappings to send.
*/
- private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+ private void doUpdate(Map<MappingKey, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicUpdateRequest locUpdate = null;
+ Collection<GridNearAtomicUpdateRequest> locUpdates = null;
// Send messages to remote nodes first, then run local update.
- for (GridNearAtomicUpdateRequest req : mappings.values()) {
+ for (Map.Entry<MappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+ MappingKey mappingKey = e.getKey();
+ GridNearAtomicUpdateRequest req = e.getValue();
+
if (locNodeId.equals(req.nodeId())) {
- assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
- ", req=" + req + ']';
+ if (locUpdates == null)
+ locUpdates = new ArrayList<>(mappings.size());
- locUpdate = req;
+ locUpdates.add(req);
}
else {
try {
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ sendRequest(mappingKey, req);
}
- catch (IgniteCheckedException e) {
- addFailedKeys(req.keys(), e);
+ catch (IgniteCheckedException ex) {
+ addFailedKeys(req.keys(), ex);
- removeMapping(req.nodeId());
+ removeMapping(mappingKey);
}
if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
- removeMapping(req.nodeId());
+ removeMapping(mappingKey);
}
}
@@ -1077,28 +1117,50 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// In FULL_ASYNC mode always return (null, true).
opRes = new GridCacheReturn(cctx, true, null, true);
- if (locUpdate != null) {
- cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res) {
- assert res.futureVersion().equals(futVer) : futVer;
+ if (locUpdates != null) {
+ for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
+ cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+ new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateResponse res) {
+ assert res.futureVersion().equals(futVer) : futVer;
- onResult(res.nodeId(), res);
- }
- });
+ onResult(res.nodeId(), res);
+ }
+ });
+ }
}
checkComplete();
}
/**
+ * Sends request: ordered, on the topic from {@code CU.partitionMassageTopic}, if {@code part >= 0}; plain otherwise.
+ *
+ * @param mappingKey Mapping key; its {@code part} is either a partition ({@code >= 0}) or {@code -1}.
+ * @param req Update request. NOTE(review): plain path targets {@code req.nodeId()}, ordered path the key's node.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void sendRequest(MappingKey mappingKey, GridNearAtomicUpdateRequest req) throws IgniteCheckedException {
+ if (mappingKey.part >= 0) {
+ Object topic = CU.partitionMassageTopic(cctx, mappingKey.part);
+
+ cctx.io().sendOrderedMessage(mappingKey.nodeId, topic, req, cctx.ioPolicy(), 0);
+ }
+ else {
+ assert mappingKey.part == -1;
+
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ }
+ }
+
+ /**
* Removes mapping from future mappings map.
*
- * @param nodeId Node ID to remove mapping for.
+ * @param mappingKey Mapping key.
*/
- private void removeMapping(UUID nodeId) {
- mappings.remove(nodeId);
+ private void removeMapping(MappingKey mappingKey) {
+ mappings.remove(mappingKey);
}
/**
@@ -1142,4 +1204,53 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
public String toString() {
return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
}
+
+ /** Key of a request mapping: target node ID plus partition ({@code -1} when ordered updates are not used).
+ */
+ private static class MappingKey {
+ /** ID of the node the request is mapped to. */
+ private final UUID nodeId;
+
+ /** Partition, or {@code -1} if the request is not bound to a partition. */
+ private final int part;
+
+ /**
+ * @param nodeId Node ID, must not be {@code null}.
+ * @param part Partition, {@code -1} or greater.
+ */
+ private MappingKey(UUID nodeId, int part) {
+ assert nodeId != null;
+ assert part >= -1 : part;
+
+ this.nodeId = nodeId;
+ this.part = part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ MappingKey key = (MappingKey)o;
+
+ return nodeId.equals(key.nodeId) && part == key.part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+
+ res = 31 * res + part;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MappingKey.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 86c5ab8..93429c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -135,6 +135,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** */
private boolean clientReq;
+ /** Partition. */
+ private int part;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param clientReq Client node request flag.
+ * @param part Partition.
*/
public GridNearAtomicUpdateRequest(
int cacheId,
@@ -180,7 +184,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
@Nullable UUID subjId,
int taskNameHash,
boolean skipStore,
- boolean clientReq
+ boolean clientReq,
+ int part
) {
this.cacheId = cacheId;
this.nodeId = nodeId;
@@ -200,6 +205,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
this.clientReq = clientReq;
+ this.part = part;
keys = new ArrayList<>();
}
@@ -315,6 +321,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
/**
+ * @return Partition set by the constructor. NOTE(review): no writeTo/readFrom change for it is visible - confirm.
+ */
+ public int partition() {
+ return part;
+ }
+
+ /**
* @param key Key to add.
* @param val Optional update value.
* @param conflictTtl Conflict TTL (optional).
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/4f14522a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 330e43c..404670a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -92,6 +92,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Near expire times. */
private GridLongList nearExpireTimes;
+ /** Partition. */
+ private int part;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -103,11 +106,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param cacheId Cache ID.
* @param nodeId Node ID this reply should be sent to.
* @param futVer Future version.
+ * @param part Partition.
*/
- public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+ public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) {
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
+ this.part = part;
}
/** {@inheritDoc} */
@@ -188,6 +193,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
+ * @return Partition; part of the mapping lookup key. NOTE(review): no writeTo/readFrom change visible - confirm.
+ */
+ public int partition() {
+ return part;
+ }
+
+ /**
* Adds value to be put in near cache on originating node.
*
* @param keyIdx Key index.