You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/14 13:11:35 UTC

[16/34] ignite git commit: IGNITE-3727 test send sync/async and small fix in code

IGNITE-3727 test send sync/async and small fix in code


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8b416e8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8b416e8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8b416e8b

Branch: refs/heads/ignite-3727-2
Commit: 8b416e8b2b3ea8f0a56eb9e41098df0b96a59cc9
Parents: e68a4fa
Author: DmitriyGovorukhin <dg...@gridgain.com>
Authored: Fri Sep 9 12:02:19 2016 +0300
Committer: DmitriyGovorukhin <dg...@gridgain.com>
Committed: Fri Sep 9 12:02:19 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |   4 +-
 .../messaging/IgniteMessagingSendAsyncTest.java | 140 +++++++++++++++++++
 2 files changed, 142 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/8b416e8b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 1ba649e..6f5421e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1375,7 +1375,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
      */
     public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
         throws IgniteCheckedException {
-        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
+        send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async);
     }
 
     /**
@@ -1614,7 +1614,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
         if (ordered)
             sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
         else if (loc)
-            send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+            send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
         else {
             ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/8b416e8b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
new file mode 100644
index 0000000..6f1188e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -0,0 +1,140 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.messaging;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Created by dgovorukhin on 09.09.2016.
+ */
+public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest {
+    /**
+     * Ignite instance for test.
+     */
+    private Ignite ignite;
+
+    /**
+     * Topic name.
+     */
+    private final String TOPIC = "topic";
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        ignite = startGrid(0);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopGrid(0);
+    }
+
+    /**
+     * Test for check, that if use default mode, local listeners execute
+     * in the same thread.
+     */
+    public void testSendDefaultMode() throws InterruptedException {
+        final String msgStr = "message";
+
+        send(ignite.message(), msgStr, new ProcedureApply() {
+            @Override
+            public void apply(String msg, Thread thread) {
+                Assert.assertEquals(Thread.currentThread(), thread);
+                Assert.assertEquals(msgStr, msg);
+            }
+        });
+
+    }
+
+    /**
+     * Test for check, that if use async mode, local listeners execute
+     * in another thread(through pool).
+     */
+    public void testSendAsyncMode() throws InterruptedException {
+        final String msgStr = "message";
+
+        send(ignite.message().withAsync(), msgStr, new ProcedureApply() {
+            @Override
+            public void apply(String msg, Thread thread) {
+                Assert.assertTrue(!Thread.currentThread().equals(thread));
+                Assert.assertEquals(msgStr, msg);
+            }
+        });
+    }
+
+    /**
+     * @param igniteMsg Ignite message.
+     * @param msgStr    Message string.
+     * @param cls       callback for compare result.
+     */
+    private void send(
+            IgniteMessaging igniteMsg,
+            String msgStr,
+            ProcedureApply cls
+    ) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        final AtomicReference<Thread> thread = new AtomicReference<>();
+        final AtomicReference<String> val = new AtomicReference<>();
+
+        igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+            @Override
+            public boolean apply(UUID uuid, String msgStr) {
+                thread.set(Thread.currentThread());
+                val.set(msgStr);
+                latch.countDown();
+                return false;
+            }
+        });
+
+        igniteMsg.send(TOPIC, msgStr);
+
+        latch.await();
+
+        cls.apply(val.get(), thread.get());
+    }
+
+    /**
+     * only for this test procedure
+     */
+    private interface ProcedureApply {
+
+        /**
+         * @param val    Value.
+         * @param thread Thread.
+         */
+        void apply(String val, Thread thread);
+    }
+
+}