You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/03/31 21:00:36 UTC
samza git commit: SAMZA-1176;
Make TestJoinOperator unit tests safe for concurrent execution
Repository: samza
Updated Branches:
refs/heads/master 944c70878 -> 71004e1eb
SAMZA-1176; Make TestJoinOperator unit tests safe for concurrent execution
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #105 from prateekm/join-tests
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/71004e1e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/71004e1e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/71004e1e
Branch: refs/heads/master
Commit: 71004e1eb07bd57dfd5f5e72148476cbf646f3c3
Parents: 944c708
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Fri Mar 31 14:00:30 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Fri Mar 31 14:00:30 2017 -0700
----------------------------------------------------------------------
.../samza/operators/TestJoinOperator.java | 117 ++++++++++++-------
1 file changed, 76 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/71004e1e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 07c6a97..4e6c750 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -26,13 +26,14 @@ import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamOperatorTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
-import org.junit.Before;
import org.junit.Test;
import java.time.Duration;
@@ -46,54 +47,47 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestJoinOperator {
- private final MessageCollector messageCollector = mock(MessageCollector.class);
private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
- private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
-
- private StreamOperatorTask sot;
- private List<Integer> output = new ArrayList<>();
private final ApplicationRunner runner = mock(ApplicationRunner.class);
-
-
- @Before
- public void setup() throws Exception {
- output.clear();
-
- TaskContext taskContext = mock(TaskContext.class);
- when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
- .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
- new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
- Config config = mock(Config.class);
-
- StreamApplication sgb = new TestStreamApplication();
- sot = new StreamOperatorTask(sgb, runner);
- sot.init(config, taskContext);
- }
+ private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
@Test
- public void join() {
+ public void join() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to second stream with same keys
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
+
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
- assertEquals(outputSum, 110);
+ assertEquals(110, outputSum);
}
@Test
- public void joinReverse() {
+ public void joinReverse() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to second stream
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to first stream with same keys
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
- assertEquals(outputSum, 110);
+ assertEquals(110, outputSum);
}
@Test
- public void joinNoMatch() {
+ public void joinNoMatch() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to second stream with different keys
@@ -103,7 +97,11 @@ public class TestJoinOperator {
}
@Test
- public void joinNoMatchReverse() {
+ public void joinNoMatchReverse() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to second stream
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to first stream with different keys
@@ -113,7 +111,11 @@ public class TestJoinOperator {
}
@Test
- public void joinRetainsLatestMessageForKey() {
+ public void joinRetainsLatestMessageForKey() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to first stream again with same keys but different values
@@ -122,11 +124,15 @@ public class TestJoinOperator {
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
- assertEquals(outputSum, 165); // should use latest messages in the first stream
+ assertEquals(165, outputSum); // should use latest messages in the first stream
}
@Test
- public void joinRetainsLatestMessageForKeyReverse() {
+ public void joinRetainsLatestMessageForKeyReverse() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to second stream
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to second stream again with same keys but different values
@@ -135,47 +141,59 @@ public class TestJoinOperator {
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
- assertEquals(outputSum, 165); // should use latest messages in the second stream
+ assertEquals(165, outputSum); // should use latest messages in the second stream
}
@Test
- public void joinRetainsMatchedMessages() {
+ public void joinRetainsMatchedMessages() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to second stream with same key
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
- assertEquals(outputSum, 110);
+ assertEquals(110, outputSum);
output.clear();
// push messages to first stream with same keys once again.
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
int newOutputSum = output.stream().reduce(0, (s, m) -> s + m);
- assertEquals(newOutputSum, 110); // should produce the same output as before
+ assertEquals(110, newOutputSum); // should produce the same output as before
}
@Test
- public void joinRetainsMatchedMessagesReverse() {
+ public void joinRetainsMatchedMessagesReverse() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
// push messages to second stream with same key
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
int outputSum = output.stream().reduce(0, (s, m) -> s + m);
- assertEquals(outputSum, 110);
+ assertEquals(110, outputSum);
output.clear();
// push messages to second stream with same keys once again.
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
int newOutputSum = output.stream().reduce(0, (s, m) -> s + m);
- assertEquals(newOutputSum, 110); // should produce the same output as before
+ assertEquals(110, newOutputSum); // should produce the same output as before
}
@Test
public void joinRemovesExpiredMessages() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to first stream
numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
@@ -191,6 +209,10 @@ public class TestJoinOperator {
@Test
public void joinRemovesExpiredMessagesReverse() throws Exception {
+ StreamOperatorTask sot = createStreamOperatorTask();
+ List<Integer> output = new ArrayList<>();
+ MessageCollector messageCollector = envelope -> output.add((Integer) envelope.getMessage());
+
// push messages to second stream
numbers.forEach(n -> sot.process(new SecondStreamIME(n, n), messageCollector, taskCoordinator));
@@ -203,6 +225,19 @@ public class TestJoinOperator {
assertTrue(output.isEmpty());
}
+ private StreamOperatorTask createStreamOperatorTask() throws Exception {
+ TaskContext taskContext = mock(TaskContext.class);
+ when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+ .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
+ new SystemStreamPartition("insystem2", "instream2", new Partition(0))));
+ Config config = mock(Config.class);
+
+ StreamApplication sgb = new TestStreamApplication();
+ StreamOperatorTask sot = new StreamOperatorTask(sgb, runner);
+ sot.init(config, taskContext);
+ return sot;
+ }
+
private class TestStreamApplication implements StreamApplication {
StreamSpec inStreamSpec = new StreamSpec("instream", "instream", "insystem");
StreamSpec inStreamSpec2 = new StreamSpec("instream2", "instream2", "insystem2");
@@ -212,11 +247,11 @@ public class TestJoinOperator {
MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(inStreamSpec, null, null);
MessageStream<MessageEnvelope<Integer, Integer>> inStream2 = graph.createInStream(inStreamSpec2, null, null);
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
inStream
.join(inStream2, new TestJoinFunction(), Duration.ofMillis(10))
- .map(m -> {
- output.add(m);
- return m;
+ .sink((message, messageCollector, taskCoordinator) -> {
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
});
}
}