You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/05/24 12:52:04 UTC

[41/50] [abbrv] ignite git commit: IGNITE-8525: Support for IgniteZeroMqStreamer non-multi-part pub-sub. - Fixes #4020.

IGNITE-8525: Support for IgniteZeroMqStreamer non-multi-part pub-sub. - Fixes #4020.

Signed-off-by: shroman <rs...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2cad0ab3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2cad0ab3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2cad0ab3

Branch: refs/heads/ignite-5789-1
Commit: 2cad0ab39665b1d2cc1d4d3edf45e74699070411
Parents: cee9171
Author: shroman <rs...@yahoo.com>
Authored: Wed May 23 13:50:00 2018 +0900
Committer: shroman <rs...@yahoo.com>
Committed: Wed May 23 13:50:00 2018 +0900

----------------------------------------------------------------------
 .../stream/zeromq/IgniteZeroMqStreamer.java     | 43 +++++++++++---------
 .../stream/zeromq/IgniteZeroMqStreamerTest.java | 24 ++++++++++-
 2 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2cad0ab3/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java
----------------------------------------------------------------------
diff --git a/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java b/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java
index 18cb387..1a8e15b 100644
--- a/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java
+++ b/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java
@@ -95,29 +95,34 @@ public class IgniteZeroMqStreamer<K, V> extends StreamAdapter<byte[], K, V> impl
 
         ctx = ZMQ.context(ioThreads);
 
-        zeroMqExSrv.execute(new Runnable() {
-            @Override public void run() {
-                ZMQ.Socket socket = ctx.socket(socketType);
-                socket.connect(addr);
-
-                if (ZeroMqTypeSocket.SUB.getType() == socketType)
-                    socket.subscribe(topic);
-
-                while (isStarted) {
-                    try {
-                        if (ZeroMqTypeSocket.SUB.getType() == socketType)
-                            socket.recv(0);
-                        addMessage(socket.recv(0));
-                    }
-                    catch (ZMQException e) {
-                        if (e.getErrorCode() == ZMQ.Error.ETERM.getCode()) {
-                            break;
+        zeroMqExSrv.execute(() -> {
+            ZMQ.Socket socket = ctx.socket(socketType);
+            socket.connect(addr);
+
+            if (ZeroMqTypeSocket.SUB.getType() == socketType)
+                socket.subscribe(topic);
+
+            while (isStarted) {
+                try {
+                    byte[] msg = socket.recv(0);
+
+                    if (ZeroMqTypeSocket.SUB.getType() == socketType) {
+                        if (socket.hasReceiveMore()) {
+                            addMessage(socket.recv(0));
+                            continue;
                         }
                     }
-                }
 
-                socket.close();
+                    addMessage(msg);
+                }
+                catch (ZMQException e) {
+                    if (e.getErrorCode() == ZMQ.Error.ETERM.getCode()) {
+                        break;
+                    }
+                }
             }
+
+            socket.close();
         });
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2cad0ab3/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java b/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java
index 5d2d0c2..0992126 100644
--- a/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java
+++ b/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java
@@ -44,6 +44,9 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest {
     /** Topic name for PUB-SUB. */
     private final byte[] TOPIC = "0mq".getBytes();
 
+    /** If pub-sub envelopes are used. */
+    private static boolean multipart_pubsub;
+
     /** Constructor. */
     public IgniteZeroMqStreamerTest() {
         super(true);
@@ -74,6 +77,19 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest {
     /**
      * @throws Exception Test exception.
      */
+    public void testZeroMqSubSocketMultipart() throws Exception {
+        try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
+            try (IgniteZeroMqStreamer streamer = newStreamerInstance(
+                dataStreamer, 3, ZeroMqTypeSocket.SUB, ADDR, TOPIC);) {
+                multipart_pubsub = true;
+                executeStreamer(streamer, ZMQ.PUB, TOPIC);
+            }
+        }
+    }
+
+    /**
+     * @throws Exception Test exception.
+     */
     public void testZeroMqSubSocket() throws Exception {
         try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
             try (IgniteZeroMqStreamer streamer = newStreamerInstance(
@@ -130,7 +146,7 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest {
         String cachedValue = cache.get(testId);
 
         // ZeroMQ message successfully put to cache.
-        assertTrue(cachedValue != null && cachedValue.equals(String.valueOf(testId)));
+        assertTrue(cachedValue != null && cachedValue.endsWith(String.valueOf(testId)));
 
         assertTrue(cache.size() == CACHE_ENTRY_COUNT);
 
@@ -173,7 +189,11 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest {
             for (int i = 0; i < CACHE_ENTRY_COUNT; i++) {
                 if (ZMQ.PUB == clientSocket)
                     socket.sendMore(topic);
-                socket.send(String.valueOf(i).getBytes("UTF-8"));
+
+                if (ZMQ.PUB == clientSocket && multipart_pubsub)
+                    socket.send((topic + " " + String.valueOf(i)).getBytes("UTF-8"));
+                else
+                    socket.send(String.valueOf(i).getBytes("UTF-8"));
             }
         }
     }