You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/31 02:26:10 UTC
[3/5] incubator-ignite git commit: IGNITE-104 - Ordered ATOMIC updates
IGNITE-104 - Ordered ATOMIC updates
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2d16d99f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2d16d99f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2d16d99f
Branch: refs/heads/ignite-104
Commit: 2d16d99f64fdfbff591124abcb4c5d42ac29d8bf
Parents: dad4691
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Thu Jul 30 16:48:30 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Thu Jul 30 16:48:30 2015 -0700
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 2 +
.../org/apache/ignite/internal/GridTopic.java | 83 --------
.../managers/communication/GridIoManager.java | 199 ++++++++++++++++++-
.../managers/communication/GridIoMessage.java | 48 +++--
.../processors/cache/GridCacheIoManager.java | 99 +++++++--
.../processors/cache/GridCacheUtils.java | 11 -
.../dht/atomic/GridAtomicMappingKey.java | 86 ++++++++
.../dht/atomic/GridAtomicRequestTopic.java | 96 +++++++++
.../dht/atomic/GridDhtAtomicCache.java | 33 ++-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 89 ++-------
.../dht/atomic/GridNearAtomicUpdateFuture.java | 172 +++++++++++-----
.../dht/atomic/GridNearAtomicUpdateRequest.java | 63 ++++--
.../atomic/GridNearAtomicUpdateResponse.java | 36 +++-
.../preloader/GridDhtPartitionDemandPool.java | 15 +-
.../query/GridCacheDistributedQueryManager.java | 8 +-
.../resources/META-INF/classnames.properties | 2 +-
16 files changed, 730 insertions(+), 312 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index c560118..2510d65 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -142,6 +142,8 @@ public class MessageCodeGenerator {
MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
+// gen.generateAndWrite(GridIoMessage.class);
+
// gen.generateAndWrite(GridNearAtomicUpdateRequest.class);
// gen.generateAndWrite(GridNearAtomicUpdateResponse.class);
// gen.generateAndWrite(GridDhtAtomicUpdateRequest.class);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index e9da40c..56aea1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -183,15 +183,6 @@ public enum GridTopic {
}
/**
- * @param id1 ID1.
- * @param id2 ID2.
- * @return Grid message topic with specified IDs.
- */
- public Object topic(int id1, int id2) {
- return new T9(this, id1, id2);
- }
-
- /**
*
*/
private static class T1 implements Externalizable {
@@ -765,78 +756,4 @@ public enum GridTopic {
return S.toString(T8.class, this);
}
}
-
- /**
- */
- private static class T9 implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private GridTopic topic;
-
- /** */
- private int id1;
-
- /** */
- private int id2;
-
- /**
- * No-arg constructor needed for {@link Serializable}.
- */
- public T9() {
- // No-op.
- }
-
- /**
- * @param topic Topic.
- * @param id1 ID1.
- * @param id2 ID2.
- */
- private T9(GridTopic topic, int id1, int id2) {
- this.topic = topic;
- this.id1 = id1;
- this.id2 = id2;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = topic.ordinal();
-
- res += 31 * res + id1;
- res += 31 * res + id2;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- if (obj.getClass() == T9.class) {
- T9 that = (T9)obj;
-
- return topic == that.topic && id1 == that.id1 && id2 == that.id2;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeByte(topic.ordinal());
- out.writeInt(id1);
- out.writeInt(id2);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- topic = fromOrdinal(in.readByte());
- id1 = in.readInt();
- id2 = in.readInt();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(T9.class, this);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index c1fb79a..765ba65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -100,6 +100,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private final ConcurrentMap<Object, ConcurrentMap<UUID, GridCommunicationMessageSet>> msgSetMap =
new ConcurrentHashMap8<>();
+ /** Sequential message sets, keyed by message topic. */
+ private final ConcurrentMap<Object, SequentialMessageSet> seqMsgs = new ConcurrentHashMap8<>();
+
/** Local node ID. */
private final UUID locNodeId;
@@ -576,6 +579,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
{
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
+ else if (msg.isSequential())
+ processSequentialMessage(nodeId, msg, plc, msgC);
else
processRegularMessage(nodeId, msg, plc, msgC);
@@ -591,6 +596,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
+ else if (msg.isSequential())
+ processSequentialMessage(nodeId, msg, plc, msgC);
else
processRegularMessage(nodeId, msg, plc, msgC);
}
@@ -963,6 +970,78 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
/**
+ * @param nodeId Node ID.
+ * @param msg Message.
+ * @param plc Execution policy.
+ * @param msgC Closure to call when message processing finished.
+ */
+ private void processSequentialMessage(
+ final UUID nodeId,
+ final GridIoMessage msg,
+ byte plc,
+ final IgniteRunnable msgC
+ ) throws IgniteCheckedException {
+ final GridMessageListener lsnr = lsnrMap.get(msg.topic());
+
+ if (lsnr == null) {
+ if (log.isDebugEnabled())
+ log.debug("Ignoring message because listener is not found: " + msg);
+
+ if (msgC != null)
+ msgC.run();
+
+ return;
+ }
+
+ SequentialMessageSet msgSet = seqMsgs.get(msg.topic());
+
+ if (msgSet == null) {
+ SequentialMessageSet old = seqMsgs.putIfAbsent(msg.topic(), msgSet = new SequentialMessageSet());
+
+ if (old != null)
+ msgSet = old;
+ }
+
+ msgSet.add(nodeId, msg, msgC);
+
+ if (msgC == null) {
+ assert locNodeId.equals(nodeId);
+
+ msgSet.unwind(lsnr);
+ }
+ else {
+ assert !locNodeId.equals(nodeId);
+
+ final SequentialMessageSet msgSet0 = msgSet;
+
+ Runnable c = new Runnable() {
+ @Override public void run() {
+ try {
+ threadProcessingMessage(true);
+
+ msgSet0.unwind(lsnr);
+ }
+ finally {
+ threadProcessingMessage(false);
+ }
+ }
+ };
+
+ try {
+ pool(plc).execute(c);
+ }
+ catch (RejectedExecutionException e) {
+ U.error(log, "Failed to process sequential message due to execution rejection. " +
+ "Increase the upper bound on executor service provided by corresponding " +
+ "configuration property. Will attempt to process message in the listener " +
+ "thread instead [msgPlc=" + plc + ']', e);
+
+ c.run();
+ }
+ }
+ }
+
+ /**
* @param node Destination node.
* @param topic Topic to send the message to.
* @param topicOrd GridTopic enumeration ordinal.
@@ -980,6 +1059,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Message msg,
byte plc,
boolean ordered,
+ boolean seq,
long timeout,
boolean skipOnTimeout
) throws IgniteCheckedException {
@@ -987,7 +1067,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
assert topic != null;
assert msg != null;
- GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, timeout, skipOnTimeout);
+ GridIoMessage ioMsg = new GridIoMessage(plc, topic, topicOrd, msg, ordered, seq, timeout, skipOnTimeout);
if (locNodeId.equals(node.id())) {
assert plc != P2P_POOL;
@@ -999,6 +1079,8 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
+ else if (seq)
+ processSequentialMessage(locNodeId, ioMsg, plc, null);
else
processRegularMessage0(ioMsg, locNodeId);
}
@@ -1050,7 +1132,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+ send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
}
/**
@@ -1062,7 +1144,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, 0, false);
+ send(node, topic, -1, msg, plc, false, false, 0, false);
}
/**
@@ -1074,7 +1156,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false);
+ send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
}
/**
@@ -1096,7 +1178,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
}
/**
@@ -1123,7 +1205,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout);
+ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
}
/**
@@ -1146,7 +1228,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(nodes, topic, -1, msg, plc, true, timeout, skipOnTimeout);
+ send(nodes, topic, -1, msg, plc, true, false, timeout, skipOnTimeout);
}
/**
@@ -1162,7 +1244,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Message msg,
byte plc
) throws IgniteCheckedException {
- send(nodes, topic, -1, msg, plc, false, 0, false);
+ send(nodes, topic, -1, msg, plc, false, false, 0, false);
}
/**
@@ -1178,7 +1260,48 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Message msg,
byte plc
) throws IgniteCheckedException {
- send(nodes, topic, topic.ordinal(), msg, plc, false, 0, false);
+ send(nodes, topic, topic.ordinal(), msg, plc, false, false, 0, false);
+ }
+
+ /**
+ * Sends sequential message.
+ *
+ * @param nodeId Destination node ID.
+ * @param topic Topic.
+ * @param msg Message.
+ * @param plc Policy.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void sendSequentialMessage(
+ UUID nodeId,
+ Object topic,
+ Message msg,
+ byte plc
+ ) throws IgniteCheckedException {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+ sendSequentialMessage(node, topic, msg, plc);
+ }
+
+ /**
+ * Sends sequential message.
+ *
+ * @param node Destination node.
+ * @param topic Topic.
+ * @param msg Message.
+ * @param plc Policy.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void sendSequentialMessage(
+ ClusterNode node,
+ Object topic,
+ Message msg,
+ byte plc
+ ) throws IgniteCheckedException {
+ send(node, topic, -1, msg, plc, false, true, 0, false);
}
/**
@@ -1307,6 +1430,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
* @param msg Message to send.
* @param plc Type of processing.
* @param ordered Ordered flag.
+ * @param seq Sequential message flag.
* @param timeout Message timeout.
* @param skipOnTimeout Whether message can be skipped in timeout.
* @throws IgniteCheckedException Thrown in case of any errors.
@@ -1318,6 +1442,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
Message msg,
byte plc,
boolean ordered,
+ boolean seq,
long timeout,
boolean skipOnTimeout
) throws IgniteCheckedException {
@@ -1334,7 +1459,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
// messages to one node vs. many.
if (!nodes.isEmpty()) {
for (ClusterNode node : nodes)
- send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout);
+ send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout);
}
else if (log.isDebugEnabled())
log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +
@@ -2216,4 +2341,58 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return S.toString(DelayedMessage.class, this, super.toString());
}
}
+
+ /**
+ */
+ private static class SequentialMessageSet {
+ /** Pending messages as (sender node ID, message, completion closure) tuples. */
+ private final Queue<GridTuple3<UUID, GridIoMessage, IgniteRunnable>> queue = new ConcurrentLinkedDeque8<>();
+
+ /** Reservation flag: {@code true} while some thread is draining the queue in {@code unwind}. */
+ private final AtomicBoolean reserve = new AtomicBoolean();
+
+ /**
+ * @param nodeId Node ID.
+ * @param msg Message.
+ * @param msgC Closure to call when message processing finished.
+ */
+ void add(UUID nodeId, GridIoMessage msg, IgniteRunnable msgC) {
+ queue.add(F.t(nodeId, msg, msgC));
+ }
+
+ /**
+ * @param lsnr Message listener.
+ */
+ void unwind(GridMessageListener lsnr) {
+ assert lsnr != null;
+
+ while (true) {
+ if (reserve.compareAndSet(false, true)) {
+ try {
+ GridTuple3<UUID, GridIoMessage, IgniteRunnable> t;
+
+ while ((t = queue.poll()) != null) {
+ try {
+ lsnr.onMessage(t.get1(), t.get2().message());
+ }
+ finally {
+ IgniteRunnable msgC = t.get3();
+
+ if (msgC != null)
+ msgC.run();
+ }
+ }
+ }
+ finally {
+ reserve.set(false);
+ }
+
+ if (queue.isEmpty())
+ return;
+ }
+ else
+ return;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 6cf1ae5..d729f75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -49,6 +49,9 @@ public class GridIoMessage implements Message {
/** Message ordered flag. */
private boolean ordered;
+ /** Sequential message flag. */
+ private boolean seq;
+
/** Message timeout. */
private long timeout;
@@ -72,6 +75,7 @@ public class GridIoMessage implements Message {
* @param topicOrd Topic ordinal value.
* @param msg Message.
* @param ordered Message ordered flag.
+ * @param seq Sequential message flag.
* @param timeout Timeout.
* @param skipOnTimeout Whether message can be skipped on timeout.
*/
@@ -81,18 +85,21 @@ public class GridIoMessage implements Message {
int topicOrd,
Message msg,
boolean ordered,
+ boolean seq,
long timeout,
boolean skipOnTimeout
) {
assert topic != null;
assert topicOrd <= Byte.MAX_VALUE;
assert msg != null;
+ assert !ordered || !seq; // Message can't be ordered and sequential at the same time.
this.plc = plc;
this.msg = msg;
this.topic = topic;
this.topicOrd = topicOrd;
this.ordered = ordered;
+ this.seq = seq;
this.timeout = timeout;
this.skipOnTimeout = skipOnTimeout;
}
@@ -167,6 +174,13 @@ public class GridIoMessage implements Message {
return ordered;
}
+ /**
+ * @return Sequential message flag.
+ */
+ boolean isSequential() {
+ return seq;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
throw new AssertionError();
@@ -208,24 +222,30 @@ public class GridIoMessage implements Message {
writer.incrementState();
case 3:
- if (!writer.writeBoolean("skipOnTimeout", skipOnTimeout))
+ if (!writer.writeBoolean("seq", seq))
return false;
writer.incrementState();
case 4:
- if (!writer.writeLong("timeout", timeout))
+ if (!writer.writeBoolean("skipOnTimeout", skipOnTimeout))
return false;
writer.incrementState();
case 5:
- if (!writer.writeByteArray("topicBytes", topicBytes))
+ if (!writer.writeLong("timeout", timeout))
return false;
writer.incrementState();
case 6:
+ if (!writer.writeByteArray("topicBytes", topicBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeInt("topicOrd", topicOrd))
return false;
@@ -261,19 +281,15 @@ public class GridIoMessage implements Message {
reader.incrementState();
case 2:
- byte plc0;
-
- plc0 = reader.readByte("plc");
+ plc = reader.readByte("plc");
if (!reader.isLastRead())
return false;
- plc = plc0;
-
reader.incrementState();
case 3:
- skipOnTimeout = reader.readBoolean("skipOnTimeout");
+ seq = reader.readBoolean("seq");
if (!reader.isLastRead())
return false;
@@ -281,7 +297,7 @@ public class GridIoMessage implements Message {
reader.incrementState();
case 4:
- timeout = reader.readLong("timeout");
+ skipOnTimeout = reader.readBoolean("skipOnTimeout");
if (!reader.isLastRead())
return false;
@@ -289,7 +305,7 @@ public class GridIoMessage implements Message {
reader.incrementState();
case 5:
- topicBytes = reader.readByteArray("topicBytes");
+ timeout = reader.readLong("timeout");
if (!reader.isLastRead())
return false;
@@ -297,6 +313,14 @@ public class GridIoMessage implements Message {
reader.incrementState();
case 6:
+ topicBytes = reader.readByteArray("topicBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
topicOrd = reader.readInt("topicOrd");
if (!reader.isLastRead())
@@ -316,7 +340,7 @@ public class GridIoMessage implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 5858424..490a5d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -62,8 +62,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
private ConcurrentMap<ListenerKey, IgniteBiInClosure<UUID, GridCacheMessage>>
clsHandlers = new ConcurrentHashMap8<>();
- /** Ordered handler registry. */
- private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> orderedHandlers =
+ /** Per topic handler registry. */
+ private ConcurrentMap<Object, IgniteBiInClosure<UUID, ? extends GridCacheMessage>> perTopicHandlers =
new ConcurrentHashMap8<>();
/** Stopping flag. */
@@ -173,7 +173,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@Override protected void onKernalStop0(boolean cancel) {
cctx.gridIO().removeMessageListener(TOPIC_CACHE);
- for (Object ordTopic : orderedHandlers.keySet())
+ for (Object ordTopic : perTopicHandlers.keySet())
cctx.gridIO().removeMessageListener(ordTopic);
boolean interrupted = false;
@@ -394,7 +394,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(
ctx.cacheId(),
nodeId,
- req.futureVersion());
+ req.futureVersion(),
+ req.partition());
res.error(req.classError());
@@ -813,6 +814,64 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param nodeId Destination node ID.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc IO policy.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendSequentialMessage(UUID nodeId, Object topic, GridCacheMessage msg, byte plc)
+ throws IgniteCheckedException {
+ ClusterNode n = cctx.discovery().node(nodeId);
+
+ if (n == null)
+ throw new ClusterTopologyCheckedException("Failed to send message because node left grid [nodeId=" +
+ nodeId + ", msg=" + msg + ']');
+
+ sendSequentialMessage(n, topic, msg, plc);
+ }
+
+ /**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc IO policy.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendSequentialMessage(ClusterNode node, Object topic, GridCacheMessage msg, byte plc)
+ throws IgniteCheckedException {
+ onSend(msg, node.id());
+
+ int cnt = 0;
+
+ while (cnt <= retryCnt) {
+ try {
+ cnt++;
+
+ cctx.gridIO().sendSequentialMessage(node, topic, msg, plc);
+
+ if (log.isDebugEnabled())
+ log.debug("Sent sequential cache message [topic=" + topic + ", msg=" + msg +
+ ", nodeId=" + node.id() + ']');
+
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ if (cctx.discovery().node(node.id()) == null)
+ throw new ClusterTopologyCheckedException("Node left grid while sending sequential message [" +
+ "nodeId=" + node.id() + ", msg=" + msg + ']', e);
+
+ if (cnt == retryCnt)
+ throw e;
+ else if (log.isDebugEnabled())
+ log.debug("Failed to send message to node (will retry): " + node.id());
+ }
+
+ U.sleep(retryDelay);
+ }
+ }
+
+ /**
* @return ID that auto-grows based on local counter and counters received
* from other nodes.
*/
@@ -940,39 +999,39 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
}
/**
- * Adds ordered message handler.
+ * Adds per topic message handler.
*
* @param topic Topic.
* @param c Handler.
*/
@SuppressWarnings({"unchecked"})
- public void addOrderedHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
- if (orderedHandlers.putIfAbsent(topic, c) == null) {
- cctx.gridIO().addMessageListener(topic, new OrderedMessageListener(
+ public void addPerTopicHandler(Object topic, IgniteBiInClosure<UUID, ? extends GridCacheMessage> c) {
+ if (perTopicHandlers.putIfAbsent(topic, c) == null) {
+ cctx.gridIO().addMessageListener(topic, new PerTopicMessageListener(
(IgniteBiInClosure<UUID, GridCacheMessage>)c));
if (log != null && log.isDebugEnabled())
- log.debug("Registered ordered cache communication handler [topic=" + topic + ", handler=" + c + ']');
+ log.debug("Registered per topic cache communication handler [topic=" + topic + ", handler=" + c + ']');
}
else if (log != null)
- U.warn(log, "Failed to register ordered cache communication handler because it is already " +
+ U.warn(log, "Failed to register per topic cache communication handler because it is already " +
"registered for this topic [topic=" + topic + ", handler=" + c + ']');
}
/**
- * Removed ordered message handler.
+ * Removes per topic message handler.
*
* @param topic Topic.
*/
- public void removeOrderedHandler(Object topic) {
- if (orderedHandlers.remove(topic) != null) {
+ public void removePerTopicHandler(Object topic) {
+ if (perTopicHandlers.remove(topic) != null) {
cctx.gridIO().removeMessageListener(topic);
if (log != null && log.isDebugEnabled())
- log.debug("Unregistered ordered cache communication handler for topic:" + topic);
+ log.debug("Unregistered per topic cache communication handler for topic:" + topic);
}
else if (log != null)
- U.warn(log, "Failed to unregister ordered cache communication handler because it was not found " +
+ U.warn(log, "Failed to unregister per topic cache communication handler because it was not found " +
"for topic: " + topic);
}
@@ -1019,20 +1078,20 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
X.println(">>> ");
X.println(">>> Cache IO manager memory stats [grid=" + cctx.gridName() + ']');
X.println(">>> clsHandlersSize: " + clsHandlers.size());
- X.println(">>> orderedHandlersSize: " + orderedHandlers.size());
+ X.println(">>> perTopicHandlersSize: " + perTopicHandlers.size());
}
/**
- * Ordered message listener.
+ * Per topic message listener.
*/
- private class OrderedMessageListener implements GridMessageListener {
+ private class PerTopicMessageListener implements GridMessageListener {
/** */
private final IgniteBiInClosure<UUID, GridCacheMessage> c;
/**
* @param c Handler closure.
*/
- OrderedMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) {
+ PerTopicMessageListener(IgniteBiInClosure<UUID, GridCacheMessage> c) {
this.c = c;
}
@@ -1040,7 +1099,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
@SuppressWarnings({"CatchGenericClass", "unchecked"})
@Override public void onMessage(final UUID nodeId, Object msg) {
if (log.isDebugEnabled())
- log.debug("Received cache ordered message [nodeId=" + nodeId + ", msg=" + msg + ']');
+ log.debug("Received per topic cache message [nodeId=" + nodeId + ", msg=" + msg + ']');
final GridCacheMessage cacheMsg = (GridCacheMessage)msg;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index d82acca..a313e3d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1733,15 +1733,4 @@ public class GridCacheUtils {
}
};
}
-
- /**
- * @param ctx Cache context.
- * @param part Partition.
- * @return Per-partition message topic.
- */
- public static Object partitionMessageTopic(GridCacheContext ctx, int part) {
- assert part >= 0;
-
- return TOPIC_CACHE.topic(ctx.cacheId(), part);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
new file mode 100644
index 0000000..52e3c7f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicMappingKey.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.util.*;
+
+/**
+ * Mapping key: a (node ID, partition) pair.
+ */
+class GridAtomicMappingKey {
+ /** Node ID. */
+ private final UUID nodeId;
+
+ /** Partition. */
+ private final int part;
+
+ /**
+ * @param nodeId Node ID.
+ * @param part Partition.
+ */
+ GridAtomicMappingKey(UUID nodeId, int part) {
+ assert nodeId != null;
+ assert part >= -1 : part;
+
+ this.nodeId = nodeId;
+ this.part = part;
+ }
+
+ /**
+ * @return Node ID.
+ */
+ UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * @return Partition.
+ */
+ int partition() {
+ return part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ GridAtomicMappingKey key = (GridAtomicMappingKey)o;
+
+ return nodeId.equals(key.nodeId) && part == key.part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = nodeId.hashCode();
+
+ res = 31 * res + part;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridAtomicMappingKey.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
new file mode 100644
index 0000000..9feb409
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridAtomicRequestTopic.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/** Message topic for atomic update requests, identified by cache ID, partition and near flag; {@link Externalizable}.
+ */
+class GridAtomicRequestTopic implements Externalizable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache ID. */
+ private int cacheId;
+
+ /** Partition. */
+ private int part;
+
+ /** Near flag: this change uses {@code true} for near update request topics and {@code false} for DHT ones. */
+ private boolean near;
+
+ /**
+ * For {@link Externalizable}.
+ */
+ public GridAtomicRequestTopic() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param part Partition.
+ * @param near Near flag.
+ */
+ GridAtomicRequestTopic(int cacheId, int part, boolean near) {
+ this.cacheId = cacheId;
+ this.part = part;
+ this.near = near;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ GridAtomicRequestTopic topic = (GridAtomicRequestTopic)o;
+
+ return cacheId == topic.cacheId && part == topic.part && near == topic.near; // All three fields must match.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = cacheId;
+
+ res = 31 * res + part;
+ res = 31 * res + (near ? 1 : 0); // Same three fields as equals().
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(cacheId); // Write order (cacheId, part, near) must match the read order in readExternal().
+ out.writeInt(part);
+ out.writeBoolean(near);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ cacheId = in.readInt(); // Fields are read in the same order writeExternal() writes them.
+ part = in.readInt();
+ near = in.readBoolean();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridAtomicRequestTopic.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index be35d00..a010baa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -181,15 +181,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
});
- ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
- @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
- processNearAtomicUpdateRequest(nodeId, req);
- }
- });
-
if (ctx.config().isAtomicOrderedUpdates()) {
for (int part = 0; part < ctx.affinity().partitions(); part++) {
- ctx.io().addOrderedHandler(CU.partitionMessageTopic(ctx, part), new CI2<UUID, GridDhtAtomicUpdateRequest>() {
+ Object nearTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, true);
+
+ ctx.io().addPerTopicHandler(nearTopic, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ processNearAtomicUpdateRequest(nodeId, req);
+ }
+ });
+
+ Object dhtTopic = new GridAtomicRequestTopic(ctx.cacheId(), part, false);
+
+ ctx.io().addPerTopicHandler(dhtTopic, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
@Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
processDhtAtomicUpdateRequest(nodeId, req);
}
@@ -197,6 +201,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
}
else {
+ ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() {
+ @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) {
+ processNearAtomicUpdateRequest(nodeId, req);
+ }
+ });
+
ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicUpdateRequest.class, new CI2<UUID, GridDhtAtomicUpdateRequest>() {
@Override public void apply(UUID nodeId, GridDhtAtomicUpdateRequest req) {
processDhtAtomicUpdateRequest(nodeId, req);
@@ -238,8 +248,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
buf.finish();
if (ctx.config().isAtomicOrderedUpdates()) {
- for (int part = 0; part < ctx.affinity().partitions(); part++)
- ctx.io().removeOrderedHandler(CU.partitionMessageTopic(ctx, part));
+ for (int part = 0; part < ctx.affinity().partitions(); part++) {
+ ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, true));
+ ctx.io().removePerTopicHandler(new GridAtomicRequestTopic(ctx.cacheId(), part, false));
+ }
}
}
@@ -1033,7 +1045,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
) {
GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(),
nodeId,
- req.futureVersion());
+ req.futureVersion(),
+ req.partition());
List<KeyCacheObject> keys = req.keys();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 93c20da..c05f4c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -73,7 +73,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** Mappings. */
@GridToStringInclude
- private ConcurrentMap<MappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
+ private ConcurrentMap<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>();
/** Entries with readers. */
private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries;
@@ -142,8 +142,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
/** {@inheritDoc} */
@Override public Collection<? extends ClusterNode> nodes() {
- return F.view(F.viewReadOnly(mappings.keySet(), new C1<MappingKey, ClusterNode>() {
- @Override public ClusterNode apply(MappingKey mappingKey) {
+ return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
+ @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
return cctx.kernalContext().discovery().node(mappingKey.nodeId());
}
}), F.notNull());
@@ -154,15 +154,15 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
if (log.isDebugEnabled())
log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']');
- Collection<MappingKey> mappingKeys = new ArrayList<>(mappings.size());
+ Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
- for (MappingKey mappingKey : mappings.keySet()) {
+ for (GridAtomicMappingKey mappingKey : mappings.keySet()) {
if (mappingKey.nodeId().equals(nodeId))
mappingKeys.add(mappingKey);
}
if (!mappingKeys.isEmpty()) {
- for (MappingKey mappingKey : mappingKeys)
+ for (GridAtomicMappingKey mappingKey : mappingKeys)
mappings.remove(mappingKey);
checkComplete();
@@ -234,7 +234,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
for (ClusterNode node : dhtNodes) {
UUID nodeId = node.id();
- MappingKey mappingKey = new MappingKey(nodeId, part);
+ GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
if (!nodeId.equals(cctx.localNodeId())) {
GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
@@ -287,7 +287,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
int part = cctx.config().isAtomicOrderedUpdates() ? entry.partition() : -1;
for (UUID nodeId : readers) {
- MappingKey mappingKey = new MappingKey(nodeId, part);
+ GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
GridDhtAtomicUpdateRequest updateReq = mappings.get(mappingKey);
@@ -345,8 +345,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
*/
public void map() {
if (!mappings.isEmpty()) {
- for (Map.Entry<MappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
- MappingKey mappingKey = e.getKey();
+ for (Map.Entry<GridAtomicMappingKey, GridDhtAtomicUpdateRequest> e : mappings.entrySet()) {
+ GridAtomicMappingKey mappingKey = e.getKey();
GridDhtAtomicUpdateRequest req = e.getValue();
UUID nodeId = mappingKey.nodeId();
@@ -359,7 +359,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
log.debug("Sending DHT atomic update request [nodeId=" + nodeId + ", req=" + req + ']');
if (part >= 0) {
- Object topic = CU.partitionMessageTopic(cctx, part);
+ Object topic = new GridAtomicRequestTopic(cctx.cacheId(), part, false);
cctx.io().sendOrderedMessage(nodeId, topic, req, cctx.ioPolicy(),
2 * cctx.gridConfig().getNetworkTimeout());
@@ -429,7 +429,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
}
}
- mappings.remove(new MappingKey(nodeId, updateRes.partition()));
+ mappings.remove(new GridAtomicMappingKey(nodeId, updateRes.partition()));
checkComplete();
}
@@ -445,7 +445,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']');
for (Integer part : res.partitions())
- mappings.remove(new MappingKey(nodeId, part));
+ mappings.remove(new GridAtomicMappingKey(nodeId, part));
checkComplete();
}
@@ -468,67 +468,4 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
return S.toString(GridDhtAtomicUpdateFuture.class, this);
}
- /**
- * Mapping Key.
- */
- private static class MappingKey {
- /** Node ID. */
- private final UUID nodeId;
-
- /** Partition. */
- private final int part;
-
- /**
- * @param nodeId Node ID.
- * @param part Partition.
- */
- MappingKey(UUID nodeId, int part) {
- assert nodeId != null;
- assert part >= -1 : part;
-
- this.nodeId = nodeId;
- this.part = part;
- }
-
- /**
- * @return Node ID.
- */
- UUID nodeId() {
- return nodeId;
- }
-
- /**
- * @return Partition.
- */
- int partition() {
- return part;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- MappingKey key = (MappingKey)o;
-
- return nodeId.equals(key.nodeId) && part == key.part;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = nodeId.hashCode();
-
- res = 31 * res + part;
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MappingKey.class, this);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 4c8a161..4642b1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -90,7 +90,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Mappings. */
@GridToStringInclude
- private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
+ private ConcurrentMap<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings;
/** Error. */
private volatile CachePartialUpdateCheckedException err;
@@ -246,7 +246,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** {@inheritDoc} */
@Override public Collection<? extends ClusterNode> nodes() {
- return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
+ return F.view(F.viewReadOnly(mappings.keySet(), new C1<GridAtomicMappingKey, ClusterNode>() {
+ @Override public ClusterNode apply(GridAtomicMappingKey mappingKey) {
+ return cctx.kernalContext().discovery().node(mappingKey.nodeId());
+ }
+ }), F.notNull());
}
/**
@@ -283,13 +287,24 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return false;
}
- GridNearAtomicUpdateRequest req = mappings.get(nodeId);
+ Collection<GridAtomicMappingKey> mappingKeys = new ArrayList<>(mappings.size());
+ Collection<KeyCacheObject> failedKeys = new ArrayList<>();
+
+ for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+ if (e.getKey().nodeId().equals(nodeId)) {
+ mappingKeys.add(e.getKey());
+
+ failedKeys.addAll(e.getValue().keys());
+ }
+ }
- if (req != null) {
- addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
- "received: " + nodeId));
+ if (!mappingKeys.isEmpty()) {
+ if (!failedKeys.isEmpty())
+ addFailedKeys(failedKeys, new ClusterTopologyCheckedException("Primary node left grid before " +
+ "response is received: " + nodeId));
- mappings.remove(nodeId);
+ for (GridAtomicMappingKey key : mappingKeys)
+ mappings.remove(key);
checkComplete();
@@ -529,7 +544,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
}
else {
- GridNearAtomicUpdateRequest req = mappings.get(nodeId);
+ GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, res.partition());
+
+ GridNearAtomicUpdateRequest req = mappings.get(mappingKey);
if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
updateNear(req, res);
@@ -547,7 +564,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
opRes = ret;
}
- mappings.remove(nodeId);
+ mappings.remove(mappingKey);
}
checkComplete();
@@ -763,7 +780,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
+ int part = cctx.affinity().partition(cacheKey);
+ ClusterNode primary = cctx.affinity().primary(part, topVer);
+
+ if (!ccfg.isAtomicOrderedUpdates())
+ part = -1;
if (primary == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
@@ -789,7 +810,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
subjId,
taskNameHash,
skipStore,
- cctx.kernalContext().clientNode());
+ cctx.kernalContext().clientNode(),
+ part);
req.addUpdateEntry(cacheKey,
val,
@@ -805,7 +827,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
// Optimize mapping for single key.
- mapSingle(primary.id(), req);
+ mapSingle(new GridAtomicMappingKey(primary.id(), part), req);
return;
}
@@ -825,13 +847,18 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (conflictRmvVals != null)
conflictRmvValsIt = conflictRmvVals.iterator();
- Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
+ Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
// Must do this in synchronized block because we need to atomically remove and add mapping.
// Otherwise checkComplete() may see empty intermediate state.
synchronized (this) {
- if (oldNodeId != null)
- removeMapping(oldNodeId);
+ if (oldNodeId != null) {
+ // TODO: IGNITE-104 - Try to avoid iteration.
+ for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+ if (e.getKey().nodeId().equals(oldNodeId))
+ mappings.remove(e.getKey());
+ }
+ }
// For fastMap mode wait for all responses before remapping.
if (remap && fastMap && !mappings.isEmpty()) {
@@ -901,7 +928,10 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
- Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
+ T2<Integer, Collection<ClusterNode>> t = mapKey(cacheKey, topVer, fastMap);
+
+ int part = ccfg.isAtomicOrderedUpdates() ? t.get1() : -1;
+ Collection<ClusterNode> affNodes = t.get2();
if (affNodes.isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -922,7 +952,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
UUID nodeId = affNode.id();
- GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
+ GridAtomicMappingKey mappingKey = new GridAtomicMappingKey(nodeId, part);
+
+ GridNearAtomicUpdateRequest mapped = pendingMappings.get(mappingKey);
if (mapped == null) {
mapped = new GridNearAtomicUpdateRequest(
@@ -942,11 +974,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
subjId,
taskNameHash,
skipStore,
- cctx.kernalContext().clientNode());
+ cctx.kernalContext().clientNode(),
+ part);
- pendingMappings.put(nodeId, mapped);
+ pendingMappings.put(mappingKey, mapped);
- GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
+ GridNearAtomicUpdateRequest old = mappings.put(mappingKey, mapped);
assert old == null || (old != null && remap) :
"Invalid mapping state [old=" + old + ", remap=" + remap + ']';
@@ -964,7 +997,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
if ((single == null || single) && pendingMappings.size() == 1) {
- Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
+ Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
single = true;
@@ -987,31 +1020,35 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
* @return Collection of nodes to which key is mapped.
*/
- private Collection<ClusterNode> mapKey(
+ private T2<Integer, Collection<ClusterNode>> mapKey(
KeyCacheObject key,
AffinityTopologyVersion topVer,
boolean fastMap
) {
GridCacheAffinityManager affMgr = cctx.affinity();
+ int part = affMgr.partition(key);
+
// If we can send updates in parallel - do it.
- return fastMap ?
- cctx.topology().nodes(affMgr.partition(key), topVer) :
- Collections.singletonList(affMgr.primary(key, topVer));
+ Collection<ClusterNode> nodes = fastMap ?
+ cctx.topology().nodes(part, topVer) :
+ Collections.singletonList(affMgr.primary(part, topVer));
+
+ return new T2<>(part, nodes);
}
/**
* Maps future to single node.
*
- * @param nodeId Node ID.
+ * @param mappingKey Mapping key.
* @param req Request.
*/
- private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
- singleNodeId = nodeId;
+ private void mapSingle(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req) {
+ singleNodeId = mappingKey.nodeId();
singleReq = req;
- if (cctx.localNodeId().equals(nodeId)) {
- cache.updateAllAsyncInternal(nodeId, req,
+ if (cctx.localNodeId().equals(mappingKey.nodeId())) {
+ cache.updateAllAsyncInternal(mappingKey.nodeId(), req,
new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
@Override public void apply(GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res) {
@@ -1026,7 +1063,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ sendRequest(mappingKey, req);
if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
onDone(new GridCacheReturn(cctx, true, null, true));
@@ -1042,34 +1079,37 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
*
* @param mappings Mappings to send.
*/
- private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
+ private void doUpdate(Map<GridAtomicMappingKey, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
- GridNearAtomicUpdateRequest locUpdate = null;
+ Collection<GridNearAtomicUpdateRequest> locUpdates = null;
// Send messages to remote nodes first, then run local update.
- for (GridNearAtomicUpdateRequest req : mappings.values()) {
+ for (Map.Entry<GridAtomicMappingKey, GridNearAtomicUpdateRequest> e : mappings.entrySet()) {
+ GridAtomicMappingKey mappingKey = e.getKey();
+ GridNearAtomicUpdateRequest req = e.getValue();
+
if (locNodeId.equals(req.nodeId())) {
- assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
- ", req=" + req + ']';
+ if (locUpdates == null)
+ locUpdates = new ArrayList<>(mappings.size());
- locUpdate = req;
+ locUpdates.add(req);
}
else {
try {
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
- cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
+ sendRequest(mappingKey, req);
}
- catch (IgniteCheckedException e) {
- addFailedKeys(req.keys(), e);
+ catch (IgniteCheckedException ex) {
+ addFailedKeys(req.keys(), ex);
- removeMapping(req.nodeId());
+ removeMapping(mappingKey);
}
if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
- removeMapping(req.nodeId());
+ removeMapping(mappingKey);
}
}
@@ -1077,28 +1117,52 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
// In FULL_ASYNC mode always return (null, true).
opRes = new GridCacheReturn(cctx, true, null, true);
- if (locUpdate != null) {
- cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
- new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
- @Override public void apply(GridNearAtomicUpdateRequest req,
- GridNearAtomicUpdateResponse res) {
- assert res.futureVersion().equals(futVer) : futVer;
+ if (locUpdates != null) {
+ for (GridNearAtomicUpdateRequest locUpdate : locUpdates) {
+ cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
+ new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
+ @Override public void apply(GridNearAtomicUpdateRequest req,
+ GridNearAtomicUpdateResponse res) {
+ assert res.futureVersion().equals(futVer) : futVer;
- onResult(res.nodeId(), res);
- }
- });
+ onResult(res.nodeId(), res);
+ }
+ });
+ }
}
checkComplete();
}
 /**
+ * Sends request: as an ordered message on a per-partition topic if the mapping key has a partition, as a regular message otherwise.
+ *
+ * @param mappingKey Mapping key; its partition selects the send path ({@code >= 0} ordered, {@code -1} regular).
+ * @param req Update request.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void sendRequest(GridAtomicMappingKey mappingKey, GridNearAtomicUpdateRequest req)
+ throws IgniteCheckedException {
+ if (mappingKey.partition() >= 0) { // Partition is set: use the ordered path.
+ Object topic = new GridAtomicRequestTopic(cctx.cacheId(), mappingKey.partition(), true); // near = true: near request topic.
+
+ cctx.io().sendOrderedMessage(mappingKey.nodeId(), topic, req, cctx.ioPolicy(),
+ 2 * cctx.gridConfig().getNetworkTimeout()); // Twice the configured network timeout (presumably the ordered-message timeout).
+ }
+ else {
+ assert mappingKey.partition() == -1; // The only other value the mapping key is expected to carry.
+
+ cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); // NOTE(review): uses req.nodeId() while the ordered branch uses mappingKey.nodeId() - presumably equal; confirm.
+ }
+ }
+
+ /**
* Removes mapping from future mappings map.
*
- * @param nodeId Node ID to remove mapping for.
+ * @param mappingKey Mapping key.
*/
- private void removeMapping(UUID nodeId) {
- mappings.remove(nodeId);
+ private void removeMapping(GridAtomicMappingKey mappingKey) {
+ mappings.remove(mappingKey);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index 86c5ab8..b3075c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -135,6 +135,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** */
private boolean clientReq;
+ /** Partition. */
+ private int part;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
* @param taskNameHash Task name hash code.
* @param skipStore Skip write-through to a persistent storage.
* @param clientReq Client node request flag.
+ * @param part Partition.
*/
public GridNearAtomicUpdateRequest(
int cacheId,
@@ -180,7 +184,8 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
@Nullable UUID subjId,
int taskNameHash,
boolean skipStore,
- boolean clientReq
+ boolean clientReq,
+ int part
) {
this.cacheId = cacheId;
this.nodeId = nodeId;
@@ -200,6 +205,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
this.clientReq = clientReq;
+ this.part = part;
keys = new ArrayList<>();
}
@@ -315,6 +321,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
/**
+ * @return Partition.
+ */
+ public int partition() {
+ return part;
+ }
+
+ /**
* @param key Key to add.
* @param val Optional update value.
* @param conflictTtl Conflict TTL (optional).
@@ -666,54 +679,60 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
writer.incrementState();
case 16:
- if (!writer.writeBoolean("retval", retval))
+ if (!writer.writeInt("part", part))
return false;
writer.incrementState();
case 17:
- if (!writer.writeBoolean("skipStore", skipStore))
+ if (!writer.writeBoolean("retval", retval))
return false;
writer.incrementState();
case 18:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeBoolean("skipStore", skipStore))
return false;
writer.incrementState();
case 19:
- if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 20:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
return false;
writer.incrementState();
case 21:
- if (!writer.writeBoolean("topLocked", topLocked))
+ if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
case 22:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeBoolean("topLocked", topLocked))
return false;
writer.incrementState();
case 23:
- if (!writer.writeMessage("updateVer", updateVer))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 24:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
+ case 25:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -844,7 +863,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 16:
- retval = reader.readBoolean("retval");
+ part = reader.readInt("part");
if (!reader.isLastRead())
return false;
@@ -852,7 +871,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 17:
- skipStore = reader.readBoolean("skipStore");
+ retval = reader.readBoolean("retval");
if (!reader.isLastRead())
return false;
@@ -860,7 +879,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 18:
- subjId = reader.readUuid("subjId");
+ skipStore = reader.readBoolean("skipStore");
if (!reader.isLastRead())
return false;
@@ -868,6 +887,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 19:
+ subjId = reader.readUuid("subjId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 20:
byte syncModeOrd;
syncModeOrd = reader.readByte("syncMode");
@@ -879,7 +906,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 20:
+ case 21:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -887,7 +914,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 21:
+ case 22:
topLocked = reader.readBoolean("topLocked");
if (!reader.isLastRead())
@@ -895,7 +922,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 22:
+ case 23:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -903,7 +930,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 23:
+ case 24:
updateVer = reader.readMessage("updateVer");
if (!reader.isLastRead())
@@ -911,7 +938,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
- case 24:
+ case 25:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -931,7 +958,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 25;
+ return 26;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 8e1bee2..e2d33d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -92,6 +92,9 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** Near expire times. */
private GridLongList nearExpireTimes;
+ /** Partition. */
+ private int part;
+
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -103,11 +106,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param cacheId Cache ID.
* @param nodeId Node ID this reply should be sent to.
* @param futVer Future version.
+ * @param part Partition.
*/
- public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer) {
+ public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, int part) {
this.cacheId = cacheId;
this.nodeId = nodeId;
this.futVer = futVer;
+ this.part = part;
}
/** {@inheritDoc} */
@@ -138,7 +143,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/**
* Sets update error.
- * @param err Exception.
+ * @param err Error that occurred during the update.
*/
public void error(IgniteCheckedException err){
this.err = err;
@@ -188,6 +193,13 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
+ * @return Partition.
+ */
+ public int partition() {
+ return part;
+ }
+
+ /**
* Adds value to be put in near cache on originating node.
*
* @param keyIdx Key index.
@@ -485,12 +497,18 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
writer.incrementState();
case 12:
- if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ if (!writer.writeInt("part", part))
return false;
writer.incrementState();
case 13:
+ if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG))
+ return false;
+
+ writer.incrementState();
+
+ case 14:
if (!writer.writeMessage("ret", ret))
return false;
@@ -585,7 +603,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 12:
- remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+ part = reader.readInt("part");
if (!reader.isLastRead())
return false;
@@ -593,6 +611,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
reader.incrementState();
case 13:
+ remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 14:
ret = reader.readMessage("ret");
if (!reader.isLastRead())
@@ -612,7 +638,7 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 14;
+ return 15;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
index a6e6c4d..37824eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemandPool.java
@@ -599,7 +599,7 @@ public class GridDhtPartitionDemandPool {
if (isCancelled() || topologyChanged())
return missed;
- cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ cctx.io().addPerTopicHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
addMessage(new SupplyMessage(nodeId, msg));
}
@@ -641,7 +641,7 @@ public class GridDhtPartitionDemandPool {
growTimeout(timeout);
// Ordered listener was removed if timeout expired.
- cctx.io().removeOrderedHandler(d.topic());
+ cctx.io().removePerTopicHandler(d.topic());
// Must create copy to be able to work with IO manager thread local caches.
d = new GridDhtPartitionDemandMessage(d, remaining);
@@ -650,13 +650,12 @@ public class GridDhtPartitionDemandPool {
d.topic(topic(++cntr));
// Create new ordered listener.
- cctx.io().addOrderedHandler(d.topic(),
- new CI2<UUID, GridDhtPartitionSupplyMessage>() {
- @Override public void apply(UUID nodeId,
- GridDhtPartitionSupplyMessage msg) {
+ cctx.io().addPerTopicHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
+ @Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
addMessage(new SupplyMessage(nodeId, msg));
}
- });
+ }
+ );
// Resend message with larger timeout.
retry = true;
@@ -800,7 +799,7 @@ public class GridDhtPartitionDemandPool {
return missed;
}
finally {
- cctx.io().removeOrderedHandler(d.topic());
+ cctx.io().removePerTopicHandler(d.topic());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
index 316713f..a530a5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryManager.java
@@ -556,11 +556,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
final Object topic = topic(cctx.nodeId(), req.id());
- cctx.io().addOrderedHandler(topic, resHnd);
+ cctx.io().addPerTopicHandler(topic, resHnd);
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.io().removeOrderedHandler(topic);
+ cctx.io().removePerTopicHandler(topic);
}
});
@@ -665,11 +665,11 @@ public class GridCacheDistributedQueryManager<K, V> extends GridCacheQueryManage
final Object topic = topic(cctx.nodeId(), req.id());
- cctx.io().addOrderedHandler(topic, resHnd);
+ cctx.io().addPerTopicHandler(topic, resHnd);
fut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
- cctx.io().removeOrderedHandler(topic);
+ cctx.io().removePerTopicHandler(topic);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2d16d99f/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index df4873a..3ec1d07 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -529,6 +529,7 @@ org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFu
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$1
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$2
org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture$MiniFuture$2$1
+org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridAtomicRequestTopic
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$10
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$11
@@ -806,7 +807,6 @@ org.apache.ignite.internal.processors.closure.GridClosureProcessor$T5
org.apache.ignite.internal.processors.closure.GridClosureProcessor$T6
org.apache.ignite.internal.processors.closure.GridClosureProcessor$T7
org.apache.ignite.internal.processors.closure.GridClosureProcessor$T8
-org.apache.ignite.internal.processors.closure.GridClosureProcessor$T9
org.apache.ignite.internal.processors.closure.GridClosureProcessor$TaskNoReduceAdapter
org.apache.ignite.internal.processors.closure.GridPeerDeployAwareTaskAdapter
org.apache.ignite.internal.processors.continuous.GridContinuousHandler