You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/07/31 22:40:36 UTC

[7/8] incubator-ignite git commit: Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104

Merge branches 'ignite-104' and 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-104

Conflicts:
	modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5cdd2440
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5cdd2440
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5cdd2440

Branch: refs/heads/ignite-104
Commit: 5cdd2440a6b9eb3c5fe0a7620202caf5cb2db441
Parents: 6c1655f 1c10ade
Author: Valentin Kulichenko <vk...@gridgain.com>
Authored: Fri Jul 31 13:38:39 2015 -0700
Committer: Valentin Kulichenko <vk...@gridgain.com>
Committed: Fri Jul 31 13:38:39 2015 -0700

----------------------------------------------------------------------
 .../JettyRestProcessorAbstractSelfTest.java     |  14 +-
 .../managers/communication/GridIoManager.java   | 110 ++++-
 .../GridDhtPartitionsExchangeFuture.java        |  20 +-
 .../handlers/query/QueryCommandHandler.java     |   6 +-
 .../util/nio/GridCommunicationClient.java       |   5 +-
 .../util/nio/GridNioFinishedFuture.java         |  12 +
 .../ignite/internal/util/nio/GridNioFuture.java |  14 +
 .../internal/util/nio/GridNioFutureImpl.java    |  15 +
 .../util/nio/GridNioRecoveryDescriptor.java     |  13 +-
 .../ignite/internal/util/nio/GridNioServer.java |   5 +
 .../util/nio/GridNioSessionMetaKey.java         |   5 +-
 .../util/nio/GridShmemCommunicationClient.java  |   7 +-
 .../util/nio/GridTcpNioCommunicationClient.java |  14 +-
 .../communication/tcp/TcpCommunicationSpi.java  |  84 +++-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  45 +-
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 ...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++
 .../tcp/TcpDiscoveryMultiThreadedTest.java      |   8 +-
 .../IgniteSpiCommunicationSelfTestSuite.java    |   1 +
 .../http/jetty/GridJettyRestHandler.java        |  12 +-
 20 files changed, 779 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5cdd2440/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 765ba65,7e17efc..479d116
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -1059,9 -982,9 +1061,10 @@@ public class GridIoManager extends Grid
          Message msg,
          byte plc,
          boolean ordered,
 +        boolean seq,
          long timeout,
-         boolean skipOnTimeout
+         boolean skipOnTimeout,
+         IgniteInClosure<IgniteException> ackClosure
      ) throws IgniteCheckedException {
          assert node != null;
          assert topic != null;
@@@ -1079,10 -1002,11 +1082,13 @@@
  
              if (ordered)
                  processOrderedMessage(locNodeId, ioMsg, plc, null);
 +            else if (seq)
 +                processSequentialMessage(locNodeId, ioMsg, plc, null);
              else
                  processRegularMessage0(ioMsg, locNodeId);
+ 
+             if (ackClosure != null)
+                 ackClosure.apply(null);
          }
          else {
              if (topicOrd < 0)
@@@ -1132,7 -1059,7 +1141,7 @@@
          if (node == null)
              throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
  
-         send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
 -        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
++        send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
      }
  
      /**
@@@ -1144,7 -1071,7 +1153,7 @@@
       */
      public void send(ClusterNode node, Object topic, Message msg, byte plc)
          throws IgniteCheckedException {
-         send(node, topic, -1, msg, plc, false, false, 0, false);
 -        send(node, topic, -1, msg, plc, false, 0, false, null);
++        send(node, topic, -1, msg, plc, false, false, 0, false, null);
      }
  
      /**
@@@ -1156,7 -1083,7 +1165,7 @@@
       */
      public void send(ClusterNode node, GridTopic topic, Message msg, byte plc)
          throws IgniteCheckedException {
-         send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false);
 -        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null);
++        send(node, topic, topic.ordinal(), msg, plc, false, false, 0, false, null);
      }
  
      /**
@@@ -1178,7 -1105,7 +1187,7 @@@
      ) throws IgniteCheckedException {
          assert timeout > 0 || skipOnTimeout;
  
-         send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
 -        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
++        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null);
      }
  
      /**
@@@ -1205,7 -1132,7 +1214,7 @@@
          if (node == null)
              throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
  
-         send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout);
 -        send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, null);
++        send(node, topic, (byte)-1, msg, plc, true, false, timeout, skipOnTimeout, null);
      }
  
      /**
@@@ -1264,47 -1217,30 +1299,71 @@@
      }
  
      /**
+      * @param node Destination node.
+      * @param topic Topic to send the message to.
+      * @param msg Message to send.
+      * @param plc Type of processing.
+      * @param timeout Timeout to keep a message on receiving queue.
+      * @param skipOnTimeout Whether message can be skipped on timeout.
+      * @param ackClosure Ack closure.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void sendOrderedMessage(
+         ClusterNode node,
+         Object topic,
+         Message msg,
+         byte plc,
+         long timeout,
+         boolean skipOnTimeout,
+         IgniteInClosure<IgniteException> ackClosure
+     ) throws IgniteCheckedException {
+         assert timeout > 0 || skipOnTimeout;
+ 
+         send(node, topic, (byte)-1, msg, plc, true, timeout, skipOnTimeout, ackClosure);
+     }
+ 
+      /**
 +     * Sends sequential message.
 +     *
 +     * @param nodeId Destination node ID.
 +     * @param topic Topic.
 +     * @param msg Message.
 +     * @param plc Policy.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    public void sendSequentialMessage(
 +        UUID nodeId,
 +        Object topic,
 +        Message msg,
 +        byte plc
 +    ) throws IgniteCheckedException {
 +        ClusterNode node = ctx.discovery().node(nodeId);
 +
 +        if (node == null)
 +            throw new IgniteCheckedException("Failed to send message to node (has node left grid?): " + nodeId);
 +
 +        sendSequentialMessage(node, topic, msg, plc);
 +    }
 +
 +    /**
 +     * Sends sequential message.
 +     *
 +     * @param node Destination node.
 +     * @param topic Topic.
 +     * @param msg Message.
 +     * @param plc Policy.
 +     * @throws IgniteCheckedException In case of error.
 +     */
 +    public void sendSequentialMessage(
 +        ClusterNode node,
 +        Object topic,
 +        Message msg,
 +        byte plc
 +    ) throws IgniteCheckedException {
-         send(node, topic, -1, msg, plc, false, true, 0, false);
++        send(node, topic, -1, msg, plc, false, true, 0, false, null);
 +    }
 +
 +    /**
       * Sends a peer deployable user message.
       *
       * @param nodes Destination nodes.
@@@ -1459,7 -1422,7 +1547,7 @@@
              // messages to one node vs. many.
              if (!nodes.isEmpty()) {
                  for (ClusterNode node : nodes)
-                     send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout);
 -                    send(node, topic, topicOrd, msg, plc, ordered, timeout, skipOnTimeout, null);
++                    send(node, topic, topicOrd, msg, plc, ordered, seq, timeout, skipOnTimeout, null);
              }
              else if (log.isDebugEnabled())
                  log.debug("Failed to send message to empty nodes collection [topic=" + topic + ", msg=" +