You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/02/14 13:11:35 UTC
[16/34] ignite git commit: IGNITE-3727 test send sync/async and small
fix in code
IGNITE-3727 test send sync/async and small fix in code
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8b416e8b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8b416e8b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8b416e8b
Branch: refs/heads/ignite-3727-2
Commit: 8b416e8b2b3ea8f0a56eb9e41098df0b96a59cc9
Parents: e68a4fa
Author: DmitriyGovorukhin <dg...@gridgain.com>
Authored: Fri Sep 9 12:02:19 2016 +0300
Committer: DmitriyGovorukhin <dg...@gridgain.com>
Committed: Fri Sep 9 12:02:19 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 4 +-
.../messaging/IgniteMessagingSendAsyncTest.java | 140 +++++++++++++++++++
2 files changed, 142 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8b416e8b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 1ba649e..6f5421e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1375,7 +1375,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
*/
public void send(ClusterNode node, GridTopic topic, Message msg, byte plc, boolean async)
throws IgniteCheckedException {
- send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, false);
+ send(node, topic, topic.ordinal(), msg, plc, false, 0, false, null, async);
}
/**
@@ -1614,7 +1614,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (ordered)
sendOrderedMessage(nodes, TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, timeout, true);
else if (loc)
- send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL);
+ send(F.first(nodes), TOPIC_COMM_USER, ioMsg, PUBLIC_POOL, async);
else {
ClusterNode locNode = F.find(nodes, null, F.localNode(locNodeId));
http://git-wip-us.apache.org/repos/asf/ignite/blob/8b416e8b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
new file mode 100644
index 0000000..6f1188e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingSendAsyncTest.java
@@ -0,0 +1,140 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.messaging;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Assert;
+
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Created by dgovorukhin on 09.09.2016.
+ */
+public class IgniteMessagingSendAsyncTest extends GridCommonAbstractTest {
+ /**
+ * Ignite instance for test.
+ */
+ private Ignite ignite;
+
+ /**
+ * Topic name.
+ */
+ private final String TOPIC = "topic";
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ ignite = startGrid(0);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopGrid(0);
+ }
+
+ /**
+ * Test for check, that if use default mode, local listeners execute
+ * in the same thread.
+ */
+ public void testSendDefaultMode() throws InterruptedException {
+ final String msgStr = "message";
+
+ send(ignite.message(), msgStr, new ProcedureApply() {
+ @Override
+ public void apply(String msg, Thread thread) {
+ Assert.assertEquals(Thread.currentThread(), thread);
+ Assert.assertEquals(msgStr, msg);
+ }
+ });
+
+ }
+
+ /**
+ * Test for check, that if use async mode, local listeners execute
+ * in another thread(through pool).
+ */
+ public void testSendAsyncMode() throws InterruptedException {
+ final String msgStr = "message";
+
+ send(ignite.message().withAsync(), msgStr, new ProcedureApply() {
+ @Override
+ public void apply(String msg, Thread thread) {
+ Assert.assertTrue(!Thread.currentThread().equals(thread));
+ Assert.assertEquals(msgStr, msg);
+ }
+ });
+ }
+
+ /**
+ * @param igniteMsg Ignite message.
+ * @param msgStr Message string.
+ * @param cls callback for compare result.
+ */
+ private void send(
+ IgniteMessaging igniteMsg,
+ String msgStr,
+ ProcedureApply cls
+ ) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final AtomicReference<Thread> thread = new AtomicReference<>();
+ final AtomicReference<String> val = new AtomicReference<>();
+
+ igniteMsg.localListen(TOPIC, new IgniteBiPredicate<UUID, String>() {
+ @Override
+ public boolean apply(UUID uuid, String msgStr) {
+ thread.set(Thread.currentThread());
+ val.set(msgStr);
+ latch.countDown();
+ return false;
+ }
+ });
+
+ igniteMsg.send(TOPIC, msgStr);
+
+ latch.await();
+
+ cls.apply(val.get(), thread.get());
+ }
+
+ /**
+ * only for this test procedure
+ */
+ private interface ProcedureApply {
+
+ /**
+ * @param val Value.
+ * @param thread Thread.
+ */
+ void apply(String val, Thread thread);
+ }
+
+}