You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/03/31 21:00:36 UTC

samza git commit: SAMZA-1176; Make TestJoinOperator unit tests safe for concurrent execution

Repository: samza
Updated Branches:
  refs/heads/master 944c70878 -> 71004e1eb


SAMZA-1176; Make TestJoinOperator unit tests safe for concurrent execution

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #105 from prateekm/join-tests


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/71004e1e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/71004e1e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/71004e1e

Branch: refs/heads/master
Commit: 71004e1eb07bd57dfd5f5e72148476cbf646f3c3
Parents: 944c708
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Fri Mar 31 14:00:30 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Fri Mar 31 14:00:30 2017 -0700

----------------------------------------------------------------------
 .../samza/operators/TestJoinOperator.java       | 117 ++++++++++++-------
 1 file changed, 76 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/71004e1e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 07c6a97..4e6c750 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -26,13 +26,14 @@ import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -46,54 +47,47 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestJoinOperator {
-  private final MessageCollector messageCollector = mock(MessageCollector.class);
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
-  private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
-  private StreamOperatorTask sot;
-  private List<Integer> output = new ArrayList<>();
   private final ApplicationRunner runner = mock(ApplicationRunner.class);
-
-
-  @Before
-  public void setup() throws Exception {
-    output.clear();
-
-    TaskContext taskContext = mock(TaskContext.class);
-    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
-        .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
-            new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
-    Config config = mock(Config.class);
-
-    StreamApplication sgb = new TestStreamApplication();
-    sot = new StreamOperatorTask(sgb, runner);
-    sot.init(config, taskContext);
-  }
+  private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 
   @Test
-  public void join() {
+  public void join() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
     // push messages to second stream with same keys
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
 
+
     int outputSum = output.stream().reduce(0, (s, m) -> s + m);
-    assertEquals(outputSum, 110);
+    assertEquals(110, outputSum);
   }
 
   @Test
-  public void joinReverse() {
+  public void joinReverse() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to second stream
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
     // push messages to first stream with same keys
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
 
     int outputSum = output.stream().reduce(0, (s, m) -> s + m);
-    assertEquals(outputSum, 110);
+    assertEquals(110, outputSum);
   }
 
   @Test
-  public void joinNoMatch() {
+  public void joinNoMatch() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
     // push messages to second stream with different keys
@@ -103,7 +97,11 @@ public class TestJoinOperator {
   }
 
   @Test
-  public void joinNoMatchReverse() {
+  public void joinNoMatchReverse() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to second stream
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
     // push messages to first stream with different keys
@@ -113,7 +111,11 @@ public class TestJoinOperator {
   }
 
   @Test
-  public void joinRetainsLatestMessageForKey() {
+  public void joinRetainsLatestMessageForKey() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
     // push messages to first stream again with same keys but different values
@@ -122,11 +124,15 @@ public class TestJoinOperator {
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
 
     int outputSum = output.stream().reduce(0, (s, m) -> s + m);
-    assertEquals(outputSum, 165); // should use latest messages in the first stream
+    assertEquals(165, outputSum); // should use latest messages in the first stream
   }
 
   @Test
-  public void joinRetainsLatestMessageForKeyReverse() {
+  public void joinRetainsLatestMessageForKeyReverse() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to second stream
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
     // push messages to second stream again with same keys but different values
@@ -135,47 +141,59 @@ public class TestJoinOperator {
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
 
     int outputSum = output.stream().reduce(0, (s, m) -> s + m);
-    assertEquals(outputSum, 165); // should use latest messages in the second stream
+    assertEquals(165, outputSum); // should use latest messages in the second stream
   }
 
   @Test
-  public void joinRetainsMatchedMessages() {
+  public void joinRetainsMatchedMessages() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
     // push messages to second stream with same key
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
 
     int outputSum = output.stream().reduce(0, (s, m) -> s + m);
-    assertEquals(outputSum, 110);
+    assertEquals(110, outputSum);
 
     output.clear();
 
     // push messages to first stream with same keys once again.
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
     int newOutputSum = output.stream().reduce(0, (s, m) -> s + m);
-    assertEquals(newOutputSum, 110); // should produce the same output as before
+    assertEquals(110, newOutputSum); // should produce the same output as before
   }
 
   @Test
-  public void joinRetainsMatchedMessagesReverse() {
+  public void joinRetainsMatchedMessagesReverse() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
     // push messages to second stream with same key
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
 
     int outputSum = output.stream().reduce(0, (s, m) -> s + m);
-    assertEquals(outputSum, 110);
+    assertEquals(110, outputSum);
 
     output.clear();
 
     // push messages to second stream with same keys once again.
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
     int newOutputSum = output.stream().reduce(0, (s, m) -> s + m);
-    assertEquals(newOutputSum, 110); // should produce the same output as before
+    assertEquals(110, newOutputSum); // should produce the same output as before
   }
 
   @Test
   public void joinRemovesExpiredMessages() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
 
@@ -191,6 +209,10 @@ public class TestJoinOperator {
 
   @Test
   public void joinRemovesExpiredMessagesReverse() throws Exception {
+    StreamOperatorTask sot = createStreamOperatorTask();
+    List<Integer> output = new ArrayList<>();
+    MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
     // push messages to second stream
     numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
 
@@ -203,6 +225,19 @@ public class TestJoinOperator {
     assertTrue(output.isEmpty());
   }
 
+  private StreamOperatorTask createStreamOperatorTask() throws Exception {
+    TaskContext taskContext = mock(TaskContext.class);
+    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+        .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
+            new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
+    Config config = mock(Config.class);
+
+    StreamApplication sgb = new TestStreamApplication();
+    StreamOperatorTask sot = new StreamOperatorTask(sgb, runner);
+    sot.init(config, taskContext);
+    return sot;
+  }
+
   private class TestStreamApplication implements StreamApplication {
     StreamSpec inStreamSpec = new StreamSpec("instream", "instream", "insystem");
     StreamSpec inStreamSpec2 = new StreamSpec("instream2", "instream2", "insystem2");
@@ -212,11 +247,11 @@ public class TestJoinOperator {
       MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(inStreamSpec, null, null);
       MessageStream<MessageEnvelope<Integer, Integer>> inStream2 = graph.createInStream(inStreamSpec2, null, null);
 
+      SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
       inStream
           .join(inStream2, new TestJoinFunction(), Duration.ofMillis(10))
-          .map(m -> {
-              output.add(m);
-              return m;
+          .sink((message, messageCollector, taskCoordinator) -> {
+              messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
             });
     }
   }