You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/14 13:11:51 UTC

[32/34] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-3727-2

http://git-wip-us.apache.org/repos/asf/ignite/blob/91e83407/modules/core/src/main/java/org/apache/ignite/internal/managers/GridManagerAdapter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/91e83407/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3d9238a,7ef7bc0..cda1321
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@@ -830,14 -785,36 +784,37 @@@ public class GridIoManager extends Grid
                  finally {
                      threadProcessingMessage(false);
  
 -                    msgC.run();
 +                    if (msgC != null)
 +                        msgC.run();
                  }
              }
+ 
+             @Override public String toString() {
+                 return "Message closure [msg=" + msg + ']';
+             }
          };
  
+         if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
+             IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
+ 
+             if (msg0.processFromNioThread()) {
+                 c.run();
+ 
+                 return;
+             }
+         }
+ 
+         if (ctx.config().getStripedPoolSize() > 0 &&
+             plc == GridIoPolicy.SYSTEM_POOL &&
+             msg.partition() != Integer.MIN_VALUE
+             ) {
+             ctx.getStripedExecutorService().execute(msg.partition(), c);
+ 
+             return;
+         }
+ 
          try {
-             pool(plc).execute(c);
+             pools.poolForPolicy(plc).execute(c);
          }
          catch (RejectedExecutionException e) {
              U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " +

http://git-wip-us.apache.org/repos/asf/ignite/blob/91e83407/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index 3eb7e5f,9e20d2a..688edf7
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@@ -51,10 -53,11 +53,12 @@@ import org.apache.ignite.internal.produ
  import org.apache.ignite.internal.util.nio.IgniteExceptionInNioWorkerSelfTest;
  import org.apache.ignite.internal.util.typedef.internal.U;
  import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest;
+ import org.apache.ignite.marshaller.MarshallerContextSelfTest;
  import org.apache.ignite.messaging.GridMessagingNoPeerClassLoadingSelfTest;
  import org.apache.ignite.messaging.GridMessagingSelfTest;
 +import org.apache.ignite.messaging.IgniteMessagingSendAsyncTest;
  import org.apache.ignite.messaging.IgniteMessagingWithClientTest;
+ import org.apache.ignite.plugin.security.SecurityPermissionSetBuilderTest;
  import org.apache.ignite.spi.GridSpiLocalHostInjectionTest;
  import org.apache.ignite.startup.properties.NotStringSystemPropertyTest;
  import org.apache.ignite.testframework.GridTestUtils;

http://git-wip-us.apache.org/repos/asf/ignite/blob/91e83407/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
----------------------------------------------------------------------
diff --cc modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
index d1b9eaa,8ffea8c..3db68c4
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffle.java
@@@ -117,7 -144,10 +144,10 @@@ public class HadoopShuffle extends Hado
      private void send0(UUID nodeId, Object msg) throws IgniteCheckedException {
          ClusterNode node = ctx.kernalContext().discovery().node(nodeId);
  
-         ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
+         if (msg instanceof Message)
+             ctx.kernalContext().io().send(node, GridTopic.TOPIC_HADOOP_MSG, (Message)msg, GridIoPolicy.PUBLIC_POOL);
+         else
 -            ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0);
++            ctx.kernalContext().io().sendUserMessage(F.asList(node), msg, GridTopic.TOPIC_HADOOP, false, 0, false);
      }
  
      /**