You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/07 19:26:40 UTC

[30/50] incubator-quarks git commit: Add connectors/pubsub to allow publish/subscribe across applications

Add connectors/pubsub to allow publish/subscribe across applications


Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/ca7627b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/ca7627b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/ca7627b0

Branch: refs/heads/master
Commit: ca7627b0e4e4ca85ebfdde829443b9084f688a67
Parents: ec70553
Author: Daniel J. Debrunner <de...@us.ibm.com>
Authored: Fri Mar 4 14:58:48 2016 -0800
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Sun Mar 6 09:39:17 2016 -0800

----------------------------------------------------------------------
 connectors/.classpath                           |   2 +
 connectors/pubsub/build.xml                     |  31 ++++
 .../connectors/pubsub/PublishSubscribe.java     | 114 ++++++++++++
 .../connectors/pubsub/oplets/Publish.java       |  35 ++++
 .../quarks/connectors/pubsub/package-info.java  |   9 +
 .../pubsub/service/ProviderPubSub.java          |  49 +++++
 .../pubsub/service/PublishSubscribeService.java |  52 ++++++
 .../connectors/pubsub/service/TopicHandler.java |  40 +++++
 .../test/connectors/pubsub/PubSubTest.java      | 179 +++++++++++++++++++
 9 files changed, 511 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/.classpath
----------------------------------------------------------------------
diff --git a/connectors/.classpath b/connectors/.classpath
index a993300..722905a 100644
--- a/connectors/.classpath
+++ b/connectors/.classpath
@@ -16,6 +16,8 @@
 	<classpathentry kind="src" path="kafka/src/test/java"/>
 	<classpathentry kind="src" path="mqtt/src/main/java"/>
 	<classpathentry kind="src" path="mqtt/src/test/java"/>
+	<classpathentry kind="src" path="pubsub/src/main/java"/>
+	<classpathentry kind="src" path="pubsub/src/test/java"/>
 	<classpathentry kind="src" path="serial/src/main/java"/>
 	<classpathentry kind="src" path="serial/src/test/java"/>
 	<classpathentry kind="src" path="quarks.javax.websocket/src/main/java"/>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/pubsub/build.xml
----------------------------------------------------------------------
diff --git a/connectors/pubsub/build.xml b/connectors/pubsub/build.xml
new file mode 100644
index 0000000..3fa17a1
--- /dev/null
+++ b/connectors/pubsub/build.xml
@@ -0,0 +1,31 @@
+<project name="quarks.connectors.pubsub" default="all" 
+    xmlns:jacoco="antlib:org.jacoco.ant"
+    >
+    <description>
+        Build Publish Subscribe connector and service.
+    </description>
+
+  <property name="component.path" value="connectors/pubsub"/>
+  <import file="../../common-build.xml"/>
+  <!-- Main sources compile against the topology API jar and the quarks.ext.classpath path. -->
+  <path id="compile.classpath">
+    <pathelement location="${quarks.lib}/quarks.api.topology.jar" />
+    <path refid="quarks.ext.classpath" />
+  </path>
+
+  <!-- Test compile classpath. NOTE(review): the direct provider jar is located via ${lib} while the API jar above uses ${quarks.lib}; confirm against common-build.xml. -->
+  <path id="test.compile.classpath">
+    <pathelement location="${jar}" />
+    <pathelement location="${lib}/quarks.providers.direct.jar"/>
+    <pathelement location="../../api/topology/test.classes"/>
+    <pathelement location="../../providers/direct/test.classes"/>
+    <path refid="compile.classpath"/>
+  </path>
+  <!-- Test runtime classpath: the compiled test classes plus the test compile and common test classpaths. -->
+  <path id="test.classpath">
+    <pathelement location="${test.classes}" />
+    <path refid="test.compile.classpath"/>
+    <path refid="test.common.classpath" />
+  </path>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/pubsub/src/main/java/quarks/connectors/pubsub/PublishSubscribe.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/main/java/quarks/connectors/pubsub/PublishSubscribe.java b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/PublishSubscribe.java
new file mode 100644
index 0000000..5654891
--- /dev/null
+++ b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/PublishSubscribe.java
@@ -0,0 +1,114 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2016 
+*/
+
+package quarks.connectors.pubsub;
+
+import quarks.connectors.pubsub.oplets.Publish;
+import quarks.connectors.pubsub.service.PublishSubscribeService;
+import quarks.execution.services.RuntimeServices;
+import quarks.function.Consumer;
+import quarks.function.Supplier;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.TopologyElement;
+
+/**
+ * Publish subscribe model.
+ * <BR>
+ * A stream can be {@link #publish(TStream, String, Class) published} to a topic
+ * and then {@link #subscribe(TopologyElement, String, Class) subscribed} to by
+ * other topologies.
+ * 
+ * <P>
+ * A published topic has a type and subscribers must subscribe using the same
+ * topic and type (inheritance matching is not supported).
+ * <BR>
+ * Multiple streams from different topologies can be published to the
+ * same topic (and type) and there can be multiple subscribers
+ * from different topologies.
+ * <BR>
+ * A subscriber can exist before a publisher exists; they are connected
+ * automatically when the job starts.
+ * </P>
+ * <P>
+ * If no {@link PublishSubscribeService} is registered then published
+ * tuples are discarded and subscribers see no tuples.
+ * </P>
+ */
+public class PublishSubscribe {
+
+
+
+    /**
+     * Publish a stream to a topic.
+     * This is a model that allows jobs to subscribe to
+     * streams published by other jobs.
+     *
+     * @param stream Stream to publish.
+     * @param topic Topic to publish to.
+     * @param streamType Type of objects on the stream.
+     * @return sink element representing termination of this stream.
+     * @see #subscribe(TopologyElement, String, Class)
+     */
+    public static <T> TSink<T> publish(TStream<T> stream, String topic, Class<? super T> streamType) {
+        return stream.sink(new Publish<>(topic, streamType));
+    }
+        
+    /**
+     * Subscribe to a published topic.
+     * This is a model that allows jobs to subscribe to
+     * streams published by other jobs.
+     * @param te Topology element whose topology will contain the subscribed stream.
+     * @param topic Topic to subscribe to.
+     * @param streamType Type of the stream.
+     * @return Stream containing published tuples.
+     * @see #publish(TStream, String, Class)
+     */
+    public static <T> TStream<T> subscribe(TopologyElement te, String topic, Class<T> streamType) {
+        
+        Topology topology = te.topology();
+        // Hand over a supplier, not the services: SubscriberSetup looks the service up only when it is invoked.
+        Supplier<RuntimeServices> rts = topology.getRuntimeServiceSupplier();
+        
+        return te.topology().events(new SubscriberSetup<T>(topic, streamType, rts));
+    }
+    
+    /**
+     * Subscriber setup function: {@code accept} registers the stream's submitter
+     * with the {@link PublishSubscribeService} (when one is registered) and
+     * {@code close} removes it from the topic again.
+     * @param <T> Type of the tuples.
+     */
+    private static final class SubscriberSetup<T> implements Consumer<Consumer<T>>, AutoCloseable{
+        private static final long serialVersionUID = 1L;
+        
+        private final Supplier<RuntimeServices> rts;
+        private final String topic;
+        private final Class<T> streamType;
+        private Consumer<T> submitter; // set by accept() only when a service was found; otherwise stays null
+        
+        SubscriberSetup(String topic, Class<T> streamType, Supplier<RuntimeServices> rts) {
+            this.topic = topic;
+            this.streamType = streamType;
+            this.rts = rts;
+        }
+        @Override
+        public void accept(Consumer<T> submitter) {
+            PublishSubscribeService pubSub = rts.get().getService(PublishSubscribeService.class);
+            if (pubSub != null) {
+                this.submitter = submitter;
+                pubSub.addSubscriber(topic, streamType, submitter);
+            }
+        }
+        @Override
+        public void close() throws Exception {
+            PublishSubscribeService pubSub = rts.get().getService(PublishSubscribeService.class);
+            if (pubSub != null) {
+                pubSub.removeSubscriber(topic, submitter);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/pubsub/src/main/java/quarks/connectors/pubsub/oplets/Publish.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/main/java/quarks/connectors/pubsub/oplets/Publish.java b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/oplets/Publish.java
new file mode 100644
index 0000000..c42e398
--- /dev/null
+++ b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/oplets/Publish.java
@@ -0,0 +1,35 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2016 
+*/
+package quarks.connectors.pubsub.oplets;
+
+import quarks.connectors.pubsub.service.PublishSubscribeService;
+import quarks.oplet.OpletContext;
+import quarks.oplet.core.Sink;
+
+/**
+ * Publish a stream to a {@link PublishSubscribeService}.
+ * If no such service exists then no tuples are published.
+ *
+ * @param <T> Type of the tuples.
+ */
+public class Publish<T> extends Sink<T> {
+    
+    private final String topic;
+    private final Class<? super T> streamType;
+    /** Creates a publisher for {@code topic} whose tuples are declared as {@code streamType}. */
+    public Publish(String topic, Class<? super T> streamType) {
+        this.topic = topic;
+        this.streamType = streamType;
+    }
+    /** Looks up the {@link PublishSubscribeService}; when present, its publish destination for the topic becomes the sinker. */
+    @Override
+    public void initialize(OpletContext<T, Void> context) {
+        super.initialize(context);
+        
+        PublishSubscribeService pubSub = context.getService(PublishSubscribeService.class);
+        if (pubSub != null)
+            setSinker(pubSub.getPublishDestination(topic, streamType));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/pubsub/src/main/java/quarks/connectors/pubsub/package-info.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/main/java/quarks/connectors/pubsub/package-info.java b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/package-info.java
new file mode 100644
index 0000000..c4bd874
--- /dev/null
+++ b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/package-info.java
@@ -0,0 +1,9 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2016 
+*/
+/**
+ * Publish/subscribe connector: streams are published to topics and subscribed to across applications.
+ */
+package quarks.connectors.pubsub;
+

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/ProviderPubSub.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/ProviderPubSub.java b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/ProviderPubSub.java
new file mode 100644
index 0000000..754a93e
--- /dev/null
+++ b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/ProviderPubSub.java
@@ -0,0 +1,49 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2016 
+*/
+package quarks.connectors.pubsub.service;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import quarks.function.Consumer;
+/** {@link PublishSubscribeService} implementation that keeps one {@code TopicHandler} per topic name in a map. */
+public class ProviderPubSub implements PublishSubscribeService {
+    
+    private final Map<String,TopicHandler<?>> topicHandlers = new HashMap<>();
+    /** Adds {@code subscriber} to the topic's handler, creating the handler when the topic is new. */
+    @Override
+    public <T> void addSubscriber(String topic, Class<T> streamType, Consumer<T> subscriber) { 
+        getTopicHandler(topic, streamType).addSubscriber(subscriber);
+    }
+    /** Returns the topic's handler itself (it is a {@code Consumer}) as the publish destination. */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> Consumer<T> getPublishDestination(String topic, Class<? super T> streamType) {
+        return (Consumer<T>) getTopicHandler(topic, streamType);      
+    }
+    /** Removes {@code subscriber} from the topic's handler; does nothing when the topic has no handler. */
+    @Override
+    public void removeSubscriber(String topic, Consumer<?> subscriber) {
+        TopicHandler<?> topicHandler;
+        synchronized (this) { // guards only the map lookup; the handler's removeSubscriber is itself synchronized
+            topicHandler = topicHandlers.get(topic);
+        }
+        if (topicHandler != null) {
+            topicHandler.removeSubscriber(subscriber);
+        }
+    }
+    /** Gets or creates the handler for {@code topic}; an existing handler is checked against {@code streamType}. */
+    @SuppressWarnings("unchecked")
+    private synchronized <T> TopicHandler<T> getTopicHandler(String topic, Class<T> streamType) {
+        TopicHandler<T> topicHandler = (TopicHandler<T>) topicHandlers.get(topic);
+
+        if (topicHandler == null) {
+            topicHandlers.put(topic, topicHandler = new TopicHandler<T>(streamType));
+        } else {
+            topicHandler.checkClass(streamType);
+        }
+        return topicHandler;
+    } 
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/PublishSubscribeService.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/PublishSubscribeService.java b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/PublishSubscribeService.java
new file mode 100644
index 0000000..2cb2f3e
--- /dev/null
+++ b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/PublishSubscribeService.java
@@ -0,0 +1,52 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2016 
+*/
+package quarks.connectors.pubsub.service;
+
+import quarks.function.Consumer;
+
+/**
+ * Publish subscribe model for streams.
+ * <BR>
+ * Service that allows jobs to subscribe to 
+ * streams published by other jobs.
+ * <BR>
+ * This is an optional service that allows streams
+ * to be published by topic between jobs.
+ * <P>
+ * When an instance of this service is not available
+ * then {@link quarks.connectors.pubsub.PublishSubscribe#publish(quarks.topology.TStream, String, Class) publish}
+ * is a no-op, a sink that discards all tuples on the stream.
+ * <BR>
+ * A {@link quarks.connectors.pubsub.PublishSubscribe#subscribe(quarks.topology.TopologyElement, String, Class) subscribe} 
+ * will receive no tuples when an instance of this service is not available.
+ * </P>
+ * 
+ * @see quarks.connectors.pubsub.PublishSubscribe#publish(quarks.topology.TStream, String, Class)
+ * @see quarks.connectors.pubsub.PublishSubscribe#subscribe(quarks.topology.TopologyElement, String, Class)
+ */
+public interface PublishSubscribeService {
+    
+    /**
+     * Add a subscriber to a published topic.
+     * 
+     * @param topic Topic to subscribe to.
+     * @param streamType Type of the stream.
+     * @param subscriber How to deliver published tuples to the subscriber.
+     */
+    <T> void addSubscriber(String topic, Class<T> streamType, Consumer<T> subscriber);
+    /** Remove {@code subscriber} from {@code topic}; the counterpart of {@link #addSubscriber(String, Class, Consumer)}. */
+    void removeSubscriber(String topic, Consumer<?> subscriber);
+    
+    /**
+     * Get the destination for a publisher.
+     * A publisher calls {@code destination.accept(tuple)} to publish
+     * {@code tuple} to the topic.
+     * 
+     * @param topic Topic tuples will be published to.
+     * @param streamType Type of the stream
+     * @return Consumer that is used to publish tuples.
+     */
+    <T> Consumer<T> getPublishDestination(String topic, Class<? super T> streamType);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/TopicHandler.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/TopicHandler.java b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/TopicHandler.java
new file mode 100644
index 0000000..e80e3ea
--- /dev/null
+++ b/connectors/pubsub/src/main/java/quarks/connectors/pubsub/service/TopicHandler.java
@@ -0,0 +1,40 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2016 
+*/
+package quarks.connectors.pubsub.service;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import quarks.function.Consumer;
+/** Per-topic fan-out: a {@code Consumer} that forwards every tuple it accepts to each registered subscriber. */
+class TopicHandler<T> implements Consumer<T> {
+    private static final long serialVersionUID = 1L;
+
+    private final Class<T> streamType;
+    private final Set<Consumer<T>> subscribers = new HashSet<>();
+    /** Creates a handler for a topic whose tuples are of {@code streamType}. */
+    TopicHandler(Class<T> streamType) {
+        this.streamType = streamType;
+    }
+    /** Registers {@code subscriber}; adding the same subscriber again has no further effect (set semantics). */
+    synchronized void addSubscriber(Consumer<T> subscriber) {
+        subscribers.add(subscriber);
+    }
+    /** Unregisters {@code subscriber}; no effect when it was not registered. */
+    synchronized void removeSubscriber(Consumer<?> subscriber) {
+        subscribers.remove(subscriber);
+    }
+    /** Delivers {@code tuple} to each current subscriber while holding this handler's lock. */
+    @Override
+    public synchronized void accept(T tuple) {
+        for (Consumer<T> subscriber : subscribers)
+            subscriber.accept(tuple);
+    }
+    /** Checks {@code streamType} is the exact class of this topic (identity, no subclass match), else throws IllegalArgumentException. */
+    void checkClass(Class<T> streamType) {
+        if (this.streamType != streamType)
+            throw new IllegalArgumentException("Topic stream type mismatch: expected " + this.streamType + " but got " + streamType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/ca7627b0/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
----------------------------------------------------------------------
diff --git a/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
new file mode 100644
index 0000000..37ead52
--- /dev/null
+++ b/connectors/pubsub/src/test/java/quarks/test/connectors/pubsub/PubSubTest.java
@@ -0,0 +1,179 @@
+/*
+# Licensed Materials - Property of IBM
+# Copyright IBM Corp. 2016 
+*/
+package quarks.test.connectors.pubsub;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import quarks.connectors.pubsub.PublishSubscribe;
+import quarks.connectors.pubsub.service.ProviderPubSub;
+import quarks.connectors.pubsub.service.PublishSubscribeService;
+import quarks.execution.Job;
+import quarks.execution.Job.Action;
+import quarks.providers.direct.DirectProvider;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.tester.Condition;
+import quarks.topology.tester.Tester;
+/** Tests publish/subscribe between separately submitted topologies that share one {@code DirectProvider}. */
+public class PubSubTest {
+
+    /**
+     * Test without a pub-sub service so no
+     * cross job connections will be made.
+     * @throws Exception
+     */
+    @Test
+    public void testNoService() throws Exception {
+        DirectProvider dp = new DirectProvider();
+
+        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, "A", "B", "C");
+        Tester testPub = publishedStream.topology().getTester();
+        Condition<Long> tcPub = testPub.tupleCount(publishedStream, 3);
+
+        TStream<String> subscribedStream = createSubscriber(dp, "t1", String.class);
+        Tester testSub = subscribedStream.topology().getTester();
+        Condition<Long> tcSub = testSub.tupleCount(subscribedStream, 0); // Expect none
+
+        Job js = dp.submit(subscribedStream.topology()).get();
+        Job jp = dp.submit(publishedStream.topology()).get();
+
+        Thread.sleep(1500);
+
+        assertTrue(tcPub.valid());
+        assertTrue(tcSub.valid());
+
+        js.stateChange(Action.CLOSE);
+        jp.stateChange(Action.CLOSE);
+    }
+    /** Builds a "Pub" topology whose stream of {@code values} is published to {@code topic}; returns that stream. */
+    private <T> TStream<T> createPublisher(DirectProvider dp, String topic, Class<? super T> streamType, @SuppressWarnings("unchecked") T...values) {
+        Topology publisher = dp.newTopology("Pub");
+        TStream<T> stream = publisher.of(values);
+        PublishSubscribe.publish(stream, topic, streamType);
+        return stream;
+    }
+    /** Builds a "Sub" topology and returns its stream subscribed to {@code topic}. */
+    private <T> TStream<T> createSubscriber(DirectProvider dp, String topic, Class<T> streamType) {
+        Topology subscriber = dp.newTopology("Sub");
+        return PublishSubscribe.subscribe(subscriber, topic, streamType);     
+    }
+    /** One publisher, one subscriber on the same topic: the subscriber is expected to see A, B, C. */
+    @Test
+    public void testProviderServiceSingleSubscriber() throws Exception {
+        DirectProvider dp = new DirectProvider();
+
+        dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
+
+        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, "A", "B", "C");
+        Tester testPub = publishedStream.topology().getTester();
+        Condition<List<String>> tcPub = testPub.streamContents(publishedStream, "A", "B", "C");
+
+        TStream<String> subscribedStream = createSubscriber(dp, "t1", String.class);
+        Tester testSub = subscribedStream.topology().getTester();
+        Condition<List<String>> tcSub = testSub.streamContents(subscribedStream, "A", "B", "C"); // Expect all tuples
+
+        Job js = dp.submit(subscribedStream.topology()).get();
+        Job jp = dp.submit(publishedStream.topology()).get();
+        
+        for (int i = 0; i < 30 && !tcSub.valid(); i++)
+            Thread.sleep(50);
+
+        assertTrue(tcPub.valid());
+        assertTrue(tcSub.valid());
+
+        js.stateChange(Action.CLOSE);
+        jp.stateChange(Action.CLOSE);
+    }
+    /** One publisher, three subscribers: every subscriber is expected to see all published tuples. */
+    @Test
+    public void testProviderServiceMultipleSubscriber() throws Exception {
+        DirectProvider dp = new DirectProvider();
+
+        dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
+
+        TStream<String> publishedStream = createPublisher(dp, "t1", String.class, "A", "B", "C");
+        Tester testPub = publishedStream.topology().getTester();
+        Condition<List<String>> tcPub = testPub.streamContents(publishedStream, "A", "B", "C");
+        
+        TStream<String> subscribedStream1 = createSubscriber(dp, "t1", String.class);
+        Tester testSub1 = subscribedStream1.topology().getTester();
+        Condition<List<String>> tcSub1 = testSub1.streamContents(subscribedStream1, "A", "B", "C");
+        
+        TStream<String> subscribedStream2 = createSubscriber(dp, "t1", String.class);
+        Tester testSub2 = subscribedStream2.topology().getTester();
+        Condition<List<String>> tcSub2 = testSub2.streamContents(subscribedStream2, "A", "B", "C");
+        
+        TStream<String> subscribedStream3 = createSubscriber(dp, "t1", String.class);
+        Tester testSub3 = subscribedStream3.topology().getTester();
+        Condition<List<String>> tcSub3 = testSub3.streamContents(subscribedStream3, "A", "B", "C");
+
+        
+        Job js1 = dp.submit(subscribedStream1.topology()).get();
+        Job js2 = dp.submit(subscribedStream2.topology()).get();
+        Job js3 = dp.submit(subscribedStream3.topology()).get();
+        
+        Job jp = dp.submit(publishedStream.topology()).get();
+        // Keep polling until ALL three subscribers are valid (at most 30 x 50ms), not just the first one.
+        for (int i = 0; i < 30 && !(tcSub1.valid() && tcSub2.valid() && tcSub3.valid()); i++)
+            Thread.sleep(50);
+
+        assertTrue(tcPub.valid());
+        assertTrue(tcSub1.valid());
+        assertTrue(tcSub2.valid());
+        assertTrue(tcSub3.valid());
+
+        js1.stateChange(Action.CLOSE);
+        js2.stateChange(Action.CLOSE);
+        js3.stateChange(Action.CLOSE);
+        jp.stateChange(Action.CLOSE);
+    }
+    /** Three publishers, one subscriber: the subscriber is expected to see every tuple, in any order. */
+    @Test
+    public void testProviderServiceMultiplePublisher() throws Exception {
+        DirectProvider dp = new DirectProvider();
+
+        dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
+
+        TStream<Integer> publishedStream1 = createPublisher(dp, "i1", Integer.class, 1,2,3,82);
+        Tester testPub1 = publishedStream1.topology().getTester();
+        Condition<List<Integer>> tcPub1 = testPub1.streamContents(publishedStream1, 1,2,3,82);
+        
+        TStream<Integer> publishedStream2 = createPublisher(dp, "i1", Integer.class, 5,432,34,99);
+        Tester testPub2 = publishedStream2.topology().getTester();
+        Condition<List<Integer>> tcPub2 = testPub2.streamContents(publishedStream2, 5,432,34,99);
+ 
+        TStream<Integer> publishedStream3 = createPublisher(dp, "i1", Integer.class, 35,456,888,263,578);
+        Tester testPub3 = publishedStream3.topology().getTester();
+        Condition<List<Integer>> tcPub3 = testPub3.streamContents(publishedStream3, 35,456,888,263,578);
+ 
+
+        TStream<Integer> subscribedStream = createSubscriber(dp, "i1", Integer.class);
+        Tester testSub = subscribedStream.topology().getTester();
+        Condition<List<Integer>> tcSub = testSub.contentsUnordered(subscribedStream,
+                1,2,3,82,5,432,34,99,35,456,888,263,578); // Expect all tuples
+
+        Job js = dp.submit(subscribedStream.topology()).get();
+        Job jp1 = dp.submit(publishedStream1.topology()).get();
+        Job jp2 = dp.submit(publishedStream2.topology()).get();
+        Job jp3 = dp.submit(publishedStream3.topology()).get();
+        
+        for (int i = 0; i < 30 && !tcSub.valid(); i++)
+            Thread.sleep(50);
+
+        assertTrue(tcPub1.valid());
+        assertTrue(tcPub2.valid());
+        assertTrue(tcPub3.valid());
+        assertTrue(tcSub.valid());
+
+        js.stateChange(Action.CLOSE);
+        jp1.stateChange(Action.CLOSE);
+        jp2.stateChange(Action.CLOSE);
+        jp3.stateChange(Action.CLOSE);
+    }
+}