You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dp...@apache.org on 2018/05/24 12:52:04 UTC
[41/50] [abbrv] ignite git commit: IGNITE-8525: Support for IgniteZeroMqStreamer non-multi-part pub-sub. - Fixes #4020.
IGNITE-8525: Support for IgniteZeroMqStreamer non-multi-part pub-sub. - Fixes #4020.
Signed-off-by: shroman <rs...@yahoo.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2cad0ab3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2cad0ab3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2cad0ab3
Branch: refs/heads/ignite-5789-1
Commit: 2cad0ab39665b1d2cc1d4d3edf45e74699070411
Parents: cee9171
Author: shroman <rs...@yahoo.com>
Authored: Wed May 23 13:50:00 2018 +0900
Committer: shroman <rs...@yahoo.com>
Committed: Wed May 23 13:50:00 2018 +0900
----------------------------------------------------------------------
.../stream/zeromq/IgniteZeroMqStreamer.java | 43 +++++++++++---------
.../stream/zeromq/IgniteZeroMqStreamerTest.java | 24 ++++++++++-
2 files changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2cad0ab3/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java
----------------------------------------------------------------------
diff --git a/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java b/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java
index 18cb387..1a8e15b 100644
--- a/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java
+++ b/modules/zeromq/src/main/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamer.java
@@ -95,29 +95,34 @@ public class IgniteZeroMqStreamer<K, V> extends StreamAdapter<byte[], K, V> impl
ctx = ZMQ.context(ioThreads);
- zeroMqExSrv.execute(new Runnable() {
- @Override public void run() {
- ZMQ.Socket socket = ctx.socket(socketType);
- socket.connect(addr);
-
- if (ZeroMqTypeSocket.SUB.getType() == socketType)
- socket.subscribe(topic);
-
- while (isStarted) {
- try {
- if (ZeroMqTypeSocket.SUB.getType() == socketType)
- socket.recv(0);
- addMessage(socket.recv(0));
- }
- catch (ZMQException e) {
- if (e.getErrorCode() == ZMQ.Error.ETERM.getCode()) {
- break;
+ zeroMqExSrv.execute(() -> {
+ ZMQ.Socket socket = ctx.socket(socketType);
+ socket.connect(addr);
+
+ if (ZeroMqTypeSocket.SUB.getType() == socketType)
+ socket.subscribe(topic);
+
+ while (isStarted) {
+ try {
+ byte[] msg = socket.recv(0);
+
+ if (ZeroMqTypeSocket.SUB.getType() == socketType) {
+ if (socket.hasReceiveMore()) {
+ addMessage(socket.recv(0));
+ continue;
}
}
- }
- socket.close();
+ addMessage(msg);
+ }
+ catch (ZMQException e) {
+ if (e.getErrorCode() == ZMQ.Error.ETERM.getCode()) {
+ break;
+ }
+ }
}
+
+ socket.close();
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2cad0ab3/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java b/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java
index 5d2d0c2..0992126 100644
--- a/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java
+++ b/modules/zeromq/src/test/java/org/apache/ignite/stream/zeromq/IgniteZeroMqStreamerTest.java
@@ -44,6 +44,9 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest {
/** Topic name for PUB-SUB. */
private final byte[] TOPIC = "0mq".getBytes();
+ /** If pub-sub envelopes are used. */
+ private static boolean multipart_pubsub;
+
/** Constructor. */
public IgniteZeroMqStreamerTest() {
super(true);
@@ -74,6 +77,19 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest {
/**
* @throws Exception Test exception.
*/
+ public void testZeroMqSubSocketMultipart() throws Exception {
+ try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
+ try (IgniteZeroMqStreamer streamer = newStreamerInstance(
+ dataStreamer, 3, ZeroMqTypeSocket.SUB, ADDR, TOPIC);) {
+ multipart_pubsub = true;
+ executeStreamer(streamer, ZMQ.PUB, TOPIC);
+ }
+ }
+ }
+
+ /**
+ * @throws Exception Test exception.
+ */
public void testZeroMqSubSocket() throws Exception {
try (IgniteDataStreamer<Integer, String> dataStreamer = grid().dataStreamer(DEFAULT_CACHE_NAME)) {
try (IgniteZeroMqStreamer streamer = newStreamerInstance(
@@ -130,7 +146,7 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest {
String cachedValue = cache.get(testId);
// ZeroMQ message successfully put to cache.
- assertTrue(cachedValue != null && cachedValue.equals(String.valueOf(testId)));
+ assertTrue(cachedValue != null && cachedValue.endsWith(String.valueOf(testId)));
assertTrue(cache.size() == CACHE_ENTRY_COUNT);
@@ -173,7 +189,11 @@ public class IgniteZeroMqStreamerTest extends GridCommonAbstractTest {
for (int i = 0; i < CACHE_ENTRY_COUNT; i++) {
if (ZMQ.PUB == clientSocket)
socket.sendMore(topic);
- socket.send(String.valueOf(i).getBytes("UTF-8"));
+
+ if (ZMQ.PUB == clientSocket && multipart_pubsub)
+ socket.send((topic + " " + String.valueOf(i)).getBytes("UTF-8"));
+ else
+ socket.send(String.valueOf(i).getBytes("UTF-8"));
}
}
}