You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/31 22:40:36 UTC
[7/8] incubator-ignite git commit: Merge branches 'ignite-104' and
'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into
ignite-104
Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104
Conflicts:
modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cdd2440
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cdd2440
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cdd2440
Branch: refs/heads/ignite-104
Commit: 5cdd2440a6b9eb3c5fe0a7620202caf5cb2db441
Parents: 6c1655f 1c10ade
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Fri Jul 31 13:38:39 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Fri Jul 31 13:38:39 2015 -0700
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 14 +-
.../managers/communication/GridIoManager.java | 110 ++++-
.../GridDhtPartitionsExchangeFuture.java | 20 +-
.../handlers/query/QueryCommandHandler.java | 6 +-
.../util/nio/GridCommunicationClient.java | 5 +-
.../util/nio/GridNioFinishedFuture.java | 12 +
.../ignite/internal/util/nio/GridNioFuture.java | 14 +
.../internal/util/nio/GridNioFutureImpl.java | 15 +
.../util/nio/GridNioRecoveryDescriptor.java | 13 +-
.../ignite/internal/util/nio/GridNioServer.java | 5 +
.../util/nio/GridNioSessionMetaKey.java | 5 +-
.../util/nio/GridShmemCommunicationClient.java | 7 +-
.../util/nio/GridTcpNioCommunicationClient.java | 14 +-
.../communication/tcp/TcpCommunicationSpi.java | 84 +++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 45 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++
.../tcp/TcpDiscoveryMultiThreadedTest.java | 8 +-
.../IgniteSpiCommunicationSelfTestSuite.java | 1 +
.../http/jetty/GridJettyRestHandler.java | 12 +-
20 files changed, 779 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cdd2440/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 765ba65,7e17efc..479d116
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -1059,9 -982,9 +1061,10 @@@ public class GridIoManager extends Grid
Message msg,
byte plc,
boolean ordered,
+ boolean seq,
long timeout,
- boolean skipOnTimeout
+ boolean skipOnTimeout,
+ IgniteInClosure<IgniteException> ackClosure
) throws IgniteCheckedException {
assert node != null;
assert topic != null;
@@@ -1079,10 -1002,11 +1082,13 @@@
if (ordered)
processOrderedMessage(locNodeId, ioMsg, plc, null);
+ else if (seq)
+ processSequentialMessage(locNodeId, ioMsg, plc, null);
else
processRegularMessage0(ioMsg, locNodeId);
+
+ if (ackClosure != null)
+ ackClosure.apply(null);
}
else {
if (topicOrd < 0)
@@@ -1132,7 -1059,7 +1141,7 @@@
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
++ send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
}
/**
@@@ -1144,7 -1071,7 +1153,7 @@@
*/
public void send(ClusterNode node, Object topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, false, 0, false);
- send(node, topic, -1, msg, plc, false, 0, false, null);
++ send(node, topic, -1, msg, plc, false, false, 0, false, null);
}
/**
@@@ -1156,7 -1083,7 +1165,7 @@@
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
++ send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
}
/**
@@@ -1178,7 -1105,7 +1187,7 @@@
) throws IgniteCheckedException {
assert timeout > 0 || skipOnTimeout;
- send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
++ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null);
}
/**
@@@ -1205,7 -1132,7 +1214,7 @@@
if (node == null)
throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
- send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
++ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null);
}
/**
@@@ -1264,47 -1217,30 +1299,71 @@@
}
/**
+ * @param node Destination node.
+ * @param topic Topic to send the message to.
+ * @param msg Message to send.
+ * @param plc Type of processing.
+ * @param timeout Timeout to keep a message on receiving queue.
+ * @param skipOnTimeout Whether message can be skipped on timeout.
+ * @param ackClosure Ack closure.
+ * @throws IgniteCheckedException Thrown in case of any errors.
+ */
+ public void sendOrderedMessage(
+ ClusterNode node,
+ Object topic,
+ Message msg,
+ byte plc,
+ long timeout,
+ boolean skipOnTimeout,
+ IgniteInClosure<IgniteException> ackClosure
+ ) throws IgniteCheckedException {
+ assert timeout > 0 || skipOnTimeout;
+
- send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
++ send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, ackClosure);
+ }
+
+ /**
+ * Sends sequential message.
+ *
+ * @param nodeId Destination node ID.
+ * @param topic Topic.
+ * @param msg Message.
+ * @param plc Policy.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void sendSequentialMessage(
+ UUID nodeId,
+ Object topic,
+ Message msg,
+ byte plc
+ ) throws IgniteCheckedException {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
+
+ sendSequentialMessage(node, topic, msg, plc);
+ }
+
+ /**
+ * Sends sequential message.
+ *
+ * @param node Destination node.
+ * @param topic Topic.
+ * @param msg Message.
+ * @param plc Policy.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void sendSequentialMessage(
+ ClusterNode node,
+ Object topic,
+ Message msg,
+ byte plc
+ ) throws IgniteCheckedException {
- send(node, topic, -1, msg, plc, false, true, 0, false);
++ send(node, topic, -1, msg, plc, false, true, 0, false, null);
+ }
+
+ /**
* Sends a peer deployable user message.
*
* @param nodes Destination nodes.
@@@ -1459,7 -1422,7 +1547,7 @@@
// messages to one node vs. many.
if (!nodes.isEmpty()) {
for (ClusterNode node : nodes)
- send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout);
- send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
++ send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout, null);
}
else if (log.isDebugEnabled())
log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +