You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/06/07 19:42:20 UTC
[2/4] samza git commit: SAMZA-1221,
SAMZA-1101: Internal cleanup for High-Level API implementation.
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index b2a5e2a..61224f2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -19,302 +19,343 @@
package org.apache.samza.operators;
import com.google.common.collect.ImmutableList;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestExtOutputMessageEnvelope;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
import org.apache.samza.operators.data.TestMessageEnvelope;
import org.apache.samza.operators.data.TestOutputMessageEnvelope;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
+import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-
+@SuppressWarnings("unchecked")
public class TestMessageStreamImpl {
- private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-
@Test
public void testMap() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m) ->
- new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
- MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
- assertTrue(mapOp instanceof StreamOperatorSpec);
- assertEquals(mapOp.getNextStream(), outputStream);
- // assert that the transformation function is what we defined above
- TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
- MessageType mockInnerTestMessage = mock(MessageType.class);
- when(xTestMsg.getKey()).thenReturn("test-msg-key");
- when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
- when(mockInnerTestMessage.getValue()).thenReturn("123456789");
-
- Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
- assertEquals(cOutputMsg.size(), 1);
- TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
- assertEquals(outputMessage.getKey(), xTestMsg.getKey());
- assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1));
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockMapFn = mock(MapFunction.class);
+ inputStream.map(mockMapFn);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+ assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+
+ FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec).getTransformFn();
+ assertNotNull(transformFn);
+ assertEquals(OpCode.MAP, registeredOpSpec.getOpCode());
+
+ TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
+ when(mockMapFn.apply(anyObject())).thenReturn(mockOutput);
+ assertTrue(transformFn.apply(new Object()).contains(mockOutput));
+ when(mockMapFn.apply(anyObject())).thenReturn(null);
+ assertTrue(transformFn.apply(null).isEmpty());
}
@Test
public void testFlatMap() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- List<TestOutputMessageEnvelope> flatOuts = new ArrayList<TestOutputMessageEnvelope>() { {
- this.add(mock(TestOutputMessageEnvelope.class));
- this.add(mock(TestOutputMessageEnvelope.class));
- this.add(mock(TestOutputMessageEnvelope.class));
- } };
- final List<TestMessageEnvelope> inputMsgs = new ArrayList<>();
- FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> {
- inputMsgs.add(message);
- return flatOuts;
- };
- MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
- assertTrue(flatMapOp instanceof StreamOperatorSpec);
- assertEquals(flatMapOp.getNextStream(), outputStream);
- assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap);
-
- TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
- // assert that the transformation function is what we defined above
- List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>)
- ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput);
- assertEquals(flatOuts, result);
- assertEquals(inputMsgs.size(), 1);
- assertEquals(inputMsgs.get(0), mockInput);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ inputStream.flatMap(mock(FlatMapFunction.class));
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+ assertNotNull(((StreamOperatorSpec) registeredOpSpec).getTransformFn());
+ assertEquals(OpCode.FLAT_MAP, registeredOpSpec.getOpCode());
}
@Test
public void testFlatMapWithRelaxedTypes() {
- MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- List<TestExtOutputMessageEnvelope> flatOuts = new ArrayList<TestExtOutputMessageEnvelope>() { {
- this.add(new TestExtOutputMessageEnvelope("output-key-1", 1, "output-id-001"));
- this.add(new TestExtOutputMessageEnvelope("output-key-2", 2, "output-id-002"));
- this.add(new TestExtOutputMessageEnvelope("output-key-3", 3, "output-id-003"));
- } };
-
- class MyFlatMapFunction implements FlatMapFunction<TestMessageEnvelope, TestExtOutputMessageEnvelope> {
- public final List<TestMessageEnvelope> inputMsgs = new ArrayList<>();
-
- @Override
- public Collection<TestExtOutputMessageEnvelope> apply(TestMessageEnvelope message) {
- inputMsgs.add(message);
- return flatOuts;
- }
-
- @Override
- public void init(Config config, TaskContext context) {
- inputMsgs.clear();
- }
- }
-
- MyFlatMapFunction xFlatMap = new MyFlatMapFunction();
-
- MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
- assertTrue(flatMapOp instanceof StreamOperatorSpec);
- assertEquals(flatMapOp.getNextStream(), outputStream);
- assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap);
-
- TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
- // assert that the transformation function is what we defined above
- List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>)
- ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput);
- assertEquals(flatOuts, result);
- assertEquals(xFlatMap.inputMsgs.size(), 1);
- assertEquals(xFlatMap.inputMsgs.get(0), mockInput);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ FlatMapFunction flatMapFunction =
+ (FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>) message -> Collections.emptyList();
+ // should compile since TestInputMessageEnvelope extends TestMessageEnvelope
+ inputStream.flatMap(flatMapFunction);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+ assertNotNull(((StreamOperatorSpec) registeredOpSpec).getTransformFn());
+ assertEquals(OpCode.FLAT_MAP, registeredOpSpec.getOpCode());
}
@Test
public void testFilter() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
- MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
- assertTrue(filterOp instanceof StreamOperatorSpec);
- assertEquals(filterOp.getNextStream(), outputStream);
- // assert that the transformation function is what we defined above
- FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
- TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
- MessageType mockInnerTestMessage = mock(MessageType.class);
- when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
- when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
- Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
- assertTrue(output.isEmpty());
- when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
- when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
- output = txfmFn.apply(mockMsg);
- assertEquals(output.size(), 1);
- assertEquals(output.iterator().next(), mockMsg);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ FilterFunction<Object> mockFilterFn = mock(FilterFunction.class);
+ inputStream.filter(mockFilterFn);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+ assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+
+ FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec).getTransformFn();
+ assertNotNull(transformFn);
+ assertEquals(OpCode.FILTER, registeredOpSpec.getOpCode());
+
+ Object mockInput = new Object();
+ when(mockFilterFn.apply(anyObject())).thenReturn(true);
+ assertTrue(transformFn.apply(mockInput).contains(mockInput));
+ when(mockFilterFn.apply(anyObject())).thenReturn(false);
+ assertTrue(transformFn.apply(mockInput).isEmpty());
}
@Test
public void testSink() {
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
- SystemStream testStream = new SystemStream("test-sys", "test-stream");
- SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
- mc.send(new OutgoingMessageEnvelope(testStream, m.getMessage()));
- tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
- };
- inputStream.sink(xSink);
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
- assertTrue(sinkOp instanceof SinkOperatorSpec);
- assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
-
- TestMessageEnvelope mockTest1 = mock(TestMessageEnvelope.class);
- MessageType mockMsgBody = mock(MessageType.class);
- when(mockTest1.getMessage()).thenReturn(mockMsgBody);
- final List<OutgoingMessageEnvelope> outMsgs = new ArrayList<>();
- MessageCollector mockCollector = mock(MessageCollector.class);
- doAnswer(invocation -> {
- outMsgs.add((OutgoingMessageEnvelope) invocation.getArguments()[0]);
- return null;
- }).when(mockCollector).send(any());
- TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- ((SinkOperatorSpec) sinkOp).getSinkFn().apply(mockTest1, mockCollector, mockCoordinator);
- assertEquals(1, outMsgs.size());
- assertEquals(testStream, outMsgs.get(0).getSystemStream());
- assertEquals(mockMsgBody, outMsgs.get(0).getMessage());
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ inputStream.sink(mock(SinkFunction.class));
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof SinkOperatorSpec);
+ assertNotNull(((SinkOperatorSpec) registeredOpSpec).getSinkFn());
+ assertEquals(OpCode.SINK, registeredOpSpec.getOpCode());
}
@Test
- public void testJoin() {
- MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
- MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
- JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
- new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
- @Override
- public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
- return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
- }
-
- @Override
- public String getFirstKey(TestMessageEnvelope message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(TestMessageEnvelope message) {
- return message.getKey();
- }
- };
-
- MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner, Duration.ofMinutes(1));
- Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
- assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
- assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput);
- subs = source2.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
- assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
- assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput);
- TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
- TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
- TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getThisPartialJoinFn().apply(joinMsg1, joinMsg2);
- assertEquals(xOut.getKey(), "test-join-1");
- assertEquals(xOut.getMessage(), Integer.valueOf(24));
- xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getThisPartialJoinFn().apply(joinMsg2, joinMsg1);
- assertEquals(xOut.getKey(), "test-join-1");
- assertEquals(xOut.getMessage(), Integer.valueOf(24));
+ public void testSendTo() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+ inputStream.sendTo(mockOutputOpSpec);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+ assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode());
+ assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
}
@Test
- public void testMerge() {
- MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
- Collection<MessageStream<TestMessageEnvelope>> others = ImmutableList.of(
- new MessageStreamImpl<>(mockGraph), new MessageStreamImpl<>(mockGraph));
- MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
- validateMergeOperator(merge1, mergeOutput);
+ public void testPartitionBy() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+
+ String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+ Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class);
+ OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+ IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
+ when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class)))
+ .thenReturn(mockIntermediateStream);
+ when(mockIntermediateStream.getOutputStream())
+ .thenReturn(mockOutputOpSpec);
+
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+ inputStream.partitionBy(mockKeyFn);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+ assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
+ assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+ }
- others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+ @Test
+ public void testWindowWithRelaxedTypes() throws Exception {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+ MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+ Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
+ FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
+ Supplier<Integer> initialValue = () -> 0;
+
+ // should compile since TestMessageEnvelope (input for functions) is a base class of TestInputMessageEnvelope (M)
+ Window<TestInputMessageEnvelope, String, Integer> window = Windows
+ .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator);
+ MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof WindowOperatorSpec);
+ assertEquals(OpCode.WINDOW, registeredOpSpec.getOpCode());
+ assertEquals(window, ((WindowOperatorSpec) registeredOpSpec).getWindow());
}
@Test
- public void testMergeWithRelaxedTypes() {
- MessageStream<TestMessageEnvelope> input1 = new MessageStreamImpl<>(mockGraph);
- Collection<MessageStream<? extends TestMessageEnvelope>> others = ImmutableList.of(
- new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph),
- new MessageStreamImpl<TestMessageEnvelope>(mockGraph));
+ public void testJoin() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
+ OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
+ MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph, rightInputOpSpec);
+
+ JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> mockJoinFn =
+ mock(JoinFunction.class);
+
+ Duration joinTtl = Duration.ofMinutes(1);
+ source1.join(source2, mockJoinFn, joinTtl);
+
+ ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> leftRegisteredOpSpec = leftRegisteredOpCaptor.getValue();
+
+ ArgumentCaptor<OperatorSpec> rightRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(rightInputOpSpec).registerNextOperatorSpec(rightRegisteredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> rightRegisteredOpSpec = rightRegisteredOpCaptor.getValue();
+
+ assertEquals(leftRegisteredOpSpec, rightRegisteredOpSpec);
+ assertEquals(OpCode.JOIN, leftRegisteredOpSpec.getOpCode());
+ assertTrue(leftRegisteredOpSpec instanceof JoinOperatorSpec);
+ assertEquals(mockJoinFn, ((JoinOperatorSpec) leftRegisteredOpSpec).getJoinFn());
+ assertEquals(joinTtl.toMillis(), ((JoinOperatorSpec) leftRegisteredOpSpec).getTtlMs());
+ assertEquals(leftInputOpSpec, ((JoinOperatorSpec) leftRegisteredOpSpec).getLeftInputOpSpec());
+ assertEquals(rightInputOpSpec, ((JoinOperatorSpec) leftRegisteredOpSpec).getRightInputOpSpec());
+ }
- // should compile
- MessageStream<TestMessageEnvelope> mergeOutput = input1.merge(others);
- validateMergeOperator(input1, mergeOutput);
+ @Test
+ public void testMerge() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
+ MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec1);
+
+ // other streams have the same message type T as input stream message type M
+ OperatorSpec mockOpSpec2 = mock(OperatorSpec.class);
+ OperatorSpec mockOpSpec3 = mock(OperatorSpec.class);
+ Collection<MessageStream<TestMessageEnvelope>> otherStreams1 = ImmutableList.of(
+ new MessageStreamImpl<>(mockGraph, mockOpSpec2),
+ new MessageStreamImpl<>(mockGraph, mockOpSpec3)
+ );
+
+ inputStream.merge(otherStreams1);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor1 = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec1).registerNextOperatorSpec(registeredOpCaptor1.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec1 = registeredOpCaptor1.getValue();
+ assertTrue(registeredOpSpec1 instanceof StreamOperatorSpec);
+ FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec1).getTransformFn();
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor2 = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec2).registerNextOperatorSpec(registeredOpCaptor2.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec2 = registeredOpCaptor2.getValue();
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor3 = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec3).registerNextOperatorSpec(registeredOpCaptor3.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec3 = registeredOpCaptor3.getValue();
+
+ assertEquals(registeredOpSpec1, registeredOpSpec2);
+ assertEquals(registeredOpSpec2, registeredOpSpec3);
+ assertEquals(OpCode.MERGE, registeredOpSpec1.getOpCode());
+
+ assertNotNull(transformFn);
+ TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
+ assertTrue(transformFn.apply(mockInput).contains(mockInput));
+ assertEquals(1, transformFn.apply(mockInput).size());
+ }
- others.forEach(merge -> validateMergeOperator((MessageStream<TestMessageEnvelope>) merge, mergeOutput));
+ @Test
+ public void testMergeWithRelaxedTypes() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class));
+
+ // other streams have the same message type T as input stream message type M
+ Collection<MessageStream<TestMessageEnvelope>> otherStreams1 = ImmutableList.of(
+ new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class)),
+ new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class))
+ );
+
+ // other streams have the same message type T that extends input stream message type M
+ Collection<MessageStream<TestInputMessageEnvelope>> otherStreams2 = ImmutableList.of(
+ new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+ new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+ );
+
+ // other streams have a mix of message types such that T extends input stream message type M
+ Collection<MessageStream<TestMessageEnvelope>> otherStreams3 = ImmutableList.of(
+ new MessageStreamImpl<TestMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+ // unchecked cast required for the next stream
+ (MessageStream) new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+ );
+
+ // not supported:
+ // other streams have a mix of message types such that T extends input stream message type M
+ Collection<MessageStream<? extends TestMessageEnvelope>> otherStreams4 = ImmutableList.of(
+ new MessageStreamImpl<TestMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+ new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+ );
+
+ // check if all type combinations compile
+ inputStream.merge(otherStreams1);
+ inputStream.merge(otherStreams2);
+ inputStream.merge(otherStreams3);
+ inputStream.merge(otherStreams4);
}
@Test
public <T> void testMergeWithNestedTypes() {
class MessageEnvelope<TM> { }
- MessageStream<MessageEnvelope<T>> ms1 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
- MessageStream<MessageEnvelope<T>> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
- MessageStream<MessageEnvelope<T>> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
+ MessageStream<MessageEnvelope<T>> ms1 = mock(MessageStreamImpl.class);
+ MessageStream<MessageEnvelope<T>> ms2 = mock(MessageStreamImpl.class);
+ MessageStream<MessageEnvelope<T>> ms3 = mock(MessageStreamImpl.class);
Collection<MessageStream<MessageEnvelope<T>>> otherStreams = ImmutableList.of(ms2, ms3);
// should compile
ms1.merge(otherStreams);
}
- private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
- Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
- assertTrue(mergeOp instanceof StreamOperatorSpec);
- assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput);
- TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
- Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(
- mockMsg);
- assertEquals(outputs.size(), 1);
- assertEquals(outputs.iterator().next(), mockMsg);
- }
-
@Test
public void testMergeAll() {
MessageStream<TestMessageEnvelope> input1 = mock(MessageStreamImpl.class);
@@ -361,26 +402,9 @@ public class TestMessageStreamImpl {
assertTrue(otherStreamsCaptor.getValue().contains(input3));
}
- @Test
- public void testPartitionBy() {
- Map<String, String> map = new HashMap<>();
- map.put(JobConfig.JOB_DEFAULT_SYSTEM(), "testsystem");
- Config config = new MapConfig(map);
- ApplicationRunner runner = ApplicationRunner.fromConfig(config);
- StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(streamGraph);
- Function<TestMessageEnvelope, String> keyExtractorFunc = m -> "222";
- inputStream.partitionBy(keyExtractorFunc);
- assertTrue(streamGraph.getInputStreams().size() == 1);
- assertTrue(streamGraph.getOutputStreams().size() == 1);
-
- Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
- assertEquals(subs.size(), 1);
- OperatorSpec<TestMessageEnvelope> partitionByOp = subs.iterator().next();
- assertTrue(partitionByOp instanceof SinkOperatorSpec);
- assertNull(partitionByOp.getNextStream());
-
- ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000),
- envelope -> assertTrue(envelope.getPartitionKey().equals("222")), null);
+ class TestInputMessageEnvelope extends TestMessageEnvelope {
+ public TestInputMessageEnvelope(String key, Object value) {
+ super(key, value);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
deleted file mode 100644
index c4e9f51..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-
-public class TestMessageStreamImplUtil {
- public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
- return new MessageStreamImpl<M>(graph);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 9d95217..1fc60bd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -21,23 +21,21 @@ package org.apache.samza.operators;
import junit.framework.Assert;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.stream.InputStreamInternal;
-import org.apache.samza.operators.stream.InputStreamInternalImpl;
-import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
-import org.apache.samza.operators.stream.OutputStreamInternalImpl;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -46,152 +44,141 @@ public class TestStreamGraphImpl {
@Test
public void testGetInputStream() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+
+ InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ }
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
- (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
- MessageStream<TestMessageEnvelope> mInputStream = graph.getInputStream("test-stream-1", xMsgBuilder);
- assertEquals(graph.getInputStreams().get(testStreamSpec), mInputStream);
- assertTrue(mInputStream instanceof InputStreamInternalImpl);
- assertEquals(((InputStreamInternalImpl) mInputStream).getMsgBuilder(), xMsgBuilder);
-
- String key = "test-input-key";
- MessageType msgBody = new MessageType("test-msg-value", 333333L);
- TestMessageEnvelope xInputMsg = ((InputStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mInputStream).
- getMsgBuilder().apply(key, msgBody);
- assertEquals(xInputMsg.getKey(), key);
- assertEquals(xInputMsg.getMessage().getValue(), msgBody.getValue());
- assertEquals(xInputMsg.getMessage().getEventTime(), msgBody.getEventTime());
- assertEquals(((TestInputMessageEnvelope) xInputMsg).getInputId(), "input-id-1");
+ @Test
+ public void testGetInputStreamWithRelaxedTypes() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+
+ InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ }
+
+ @Test
+ public void testMultipleGetInputStreams() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
+ StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
+ when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class));
+ MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class));
+
+ InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 =
+ (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
+ InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 =
+ (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
+
+ assertEquals(graph.getInputOperators().size(), 2);
+ assertEquals(graph.getInputOperators().get(mockStreamSpec1), inputOpSpec1);
+ assertEquals(graph.getInputOperators().get(mockStreamSpec2), inputOpSpec2);
}
@Test(expected = IllegalStateException.class)
- public void testMultipleGetInputStream() {
+ public void testGetSameInputStreamTwice() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
- StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
- StreamSpec nonExistentStreamSpec = new StreamSpec("non-existent-stream", "physical-stream-1", "test-system");
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getInputStream("test-stream-1", mock(BiFunction.class));
+ graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception
+ }
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
- when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+ @Test
+ public void testGetOutputStream() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
- (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
- //create 2 streams for the corresponding streamIds
- MessageStream<TestInputMessageEnvelope> inputStream1 = graph.getInputStream("test-stream-1", xMsgBuilder);
- MessageStream<TestInputMessageEnvelope> inputStream2 = graph.getInputStream("test-stream-2", xMsgBuilder);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
+ Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
- //assert that the streamGraph contains only the above 2 streams
- assertEquals(graph.getInputStreams().get(testStreamSpec1), inputStream1);
- assertEquals(graph.getInputStreams().get(testStreamSpec2), inputStream2);
- assertEquals(graph.getInputStreams().get(nonExistentStreamSpec), null);
- assertEquals(graph.getInputStreams().size(), 2);
+ OutputStream<String, String, TestMessageEnvelope> outputStream =
+ graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor);
- //should throw IllegalStateException
- graph.getInputStream("test-stream-1", xMsgBuilder);
+ OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec);
+ assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor());
+ assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor());
+ assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec());
}
-
- @Test
- public void testGetOutputStream() {
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameOutputStreamTwice() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
- StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
-
- class MyMessageType extends MessageType {
- public final String outputId;
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
- public MyMessageType(String value, long eventTime, String outputId) {
- super(value, eventTime);
- this.outputId = outputId;
- }
- }
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
- Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
- x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
-
- OutputStream<String, MyMessageType, TestInputMessageEnvelope> mOutputStream =
- graph.getOutputStream("test-stream-1", xKeyExtractor, xMsgExtractor);
- assertEquals(graph.getOutputStreams().get(testStreamSpec), mOutputStream);
- assertTrue(mOutputStream instanceof OutputStreamInternalImpl);
- assertEquals(((OutputStreamInternalImpl) mOutputStream).getKeyExtractor(), xKeyExtractor);
- assertEquals(((OutputStreamInternalImpl) mOutputStream).getMsgExtractor(), xMsgExtractor);
-
- TestInputMessageEnvelope xInputMsg = new TestInputMessageEnvelope("test-key-1", "test-msg-1", 33333L, "input-id-1");
- assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
- getKeyExtractor().apply(xInputMsg), "test-key-1");
- assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
- getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
- assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
- getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
- assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
- getMsgExtractor().apply(xInputMsg).outputId, "test-output-id-1");
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class));
+ graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception
}
@Test
public void testGetIntermediateStream() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);
- StreamSpec testStreamSpec = new StreamSpec("myJob-i001-test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(testStreamSpec);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
- class MyMessageType extends MessageType {
- public final String outputId;
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+ Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
+ Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
+ BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+ IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor());
+ assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor());
+ assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder());
+ }
- public MyMessageType(String value, long eventTime, String outputId) {
- super(value, eventTime);
- this.outputId = outputId;
- }
- }
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameIntermediateStreamTwice() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
- Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
- x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
- BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
- (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
-
- MessageStream<TestMessageEnvelope> mIntermediateStream =
- graph.getIntermediateStream("test-stream-1", xKeyExtractor, xMsgExtractor, xMsgBuilder);
- assertEquals(graph.getOutputStreams().get(testStreamSpec), mIntermediateStream);
- assertTrue(mIntermediateStream instanceof IntermediateStreamInternalImpl);
- assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getKeyExtractor(), xKeyExtractor);
- assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgExtractor(), xMsgExtractor);
- assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgBuilder(), xMsgBuilder);
-
- TestMessageEnvelope xInputMsg = new TestMessageEnvelope("test-key-1", "test-msg-1", 33333L);
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getKeyExtractor().apply(xInputMsg), "test-key-1");
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getKey(), "test-key-1");
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getValue(), "test-msg-1");
- assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
- getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getEventTime(), 33333L);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
+ graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
}
@Test
- public void testGetNextOpId() {
+ public void testGetNextOpIdIncrementsId() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- Config mockConfig = mock(Config.class);
-
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
assertEquals(graph.getNextOpId(), 0);
assertEquals(graph.getNextOpId(), 1);
}
@@ -216,10 +203,10 @@ public class TestStreamGraphImpl {
graph.getInputStream("test-stream-2", (k, v) -> v);
graph.getInputStream("test-stream-3", (k, v) -> v);
- ArrayList<InputStreamInternal> inputMessageStreams = new ArrayList<>(graph.getInputStreams().values());
- Assert.assertEquals(inputMessageStreams.size(), 3);
- Assert.assertEquals(inputMessageStreams.get(0).getStreamSpec(), testStreamSpec1);
- Assert.assertEquals(inputMessageStreams.get(1).getStreamSpec(), testStreamSpec2);
- Assert.assertEquals(inputMessageStreams.get(2).getStreamSpec(), testStreamSpec3);
+ List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
+ Assert.assertEquals(inputSpecs.size(), 3);
+ Assert.assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
+ Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
+ Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java b/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
deleted file mode 100644
index 3fd015b..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class MessageType {
- private final String value;
- private final long eventTime;
-
- public MessageType(String value, long eventTime) {
- this.value = value;
- this.eventTime = eventTime;
- }
-
- public long getEventTime() {
- return eventTime;
- }
-
- public String getValue() {
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
deleted file mode 100644
index 22222ed..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class TestExtOutputMessageEnvelope extends TestOutputMessageEnvelope {
- private final String outputId;
-
- public TestExtOutputMessageEnvelope(String key, Integer value, String outputId) {
- super(key, value);
- this.outputId = outputId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
deleted file mode 100644
index 089f534..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class TestInputMessageEnvelope extends TestMessageEnvelope {
- private final String inputId;
-
- public TestInputMessageEnvelope(String key, String value, long eventTime, String inputId) {
- super(key, value, eventTime);
- this.inputId = inputId;
- }
-
- public String getInputId() {
- return this.inputId;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
index 05a63cd..68305cc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
@@ -20,21 +20,19 @@ package org.apache.samza.operators.data;
public class TestMessageEnvelope {
-
private final String key;
- private final MessageType value;
+ private final Object value;
- public TestMessageEnvelope(String key, String value, long eventTime) {
+ public TestMessageEnvelope(String key, Object value) {
this.key = key;
- this.value = new MessageType(value, eventTime);
+ this.value = value;
}
- public MessageType getMessage() {
+ public Object getMessage() {
return this.value;
}
public String getKey() {
return this.key;
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 73d851b..d50d271 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -23,7 +23,6 @@ import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
@@ -177,9 +176,11 @@ public class TestOperatorImpl {
private static class TestOpImpl extends OperatorImpl<Object, Object> {
private final Object mockOutput;
+ private final TestOpSpec testOpSpec;
TestOpImpl(Object mockOutput) {
this.mockOutput = mockOutput;
+ this.testOpSpec = new TestOpSpec();
}
@Override
@@ -199,31 +200,14 @@ public class TestOperatorImpl {
@Override
protected void handleClose() {}
- @Override
- protected OperatorSpec<Object> getOperatorSpec() {
- return new TestOpSpec();
+ protected OperatorSpec<Object, Object> getOperatorSpec() {
+ return testOpSpec;
}
}
- private static class TestOpSpec implements OperatorSpec<Object> {
- @Override
- public MessageStreamImpl<Object> getNextStream() {
- return null;
- }
-
- @Override
- public OpCode getOpCode() {
- return OpCode.INPUT;
- }
-
- @Override
- public int getOpId() {
- return -1;
- }
-
- @Override
- public String getSourceLocation() {
- return "";
+ private static class TestOpSpec extends OperatorSpec<Object, Object> {
+ TestOpSpec() {
+ super(OpCode.INPUT, 1);
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 67e5b46..b2c7722 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,86 +19,211 @@
package org.apache.samza.operators.impl;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
import org.junit.Test;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-
-import static junit.framework.Assert.assertEquals;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestOperatorImplGraph {
+ public void testEmptyChain() {
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class));
+ OperatorImplGraph opGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContext.class), mock(Clock.class));
+ assertEquals(0, opGraph.getAllInputOperators().size());
+ }
+
@Test
- public void testOperatorGraphInitAndClose() {
+ public void testLinearChain() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+ when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+ OutputStream<Object, Object, Object> outputStream =
+ streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class));
+
+ inputStream
+ .filter(mock(FilterFunction.class))
+ .map(mock(MapFunction.class))
+ .sendTo(outputStream);
+
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ OperatorImplGraph opImplGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+ InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+ assertEquals(1, inputOpImpl.registeredOperators.size());
+
+ OperatorImpl filterOpImpl = (StreamOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+ assertEquals(1, filterOpImpl.registeredOperators.size());
+ assertEquals(OpCode.FILTER, filterOpImpl.getOperatorSpec().getOpCode());
+
+ OperatorImpl mapOpImpl = (StreamOperatorImpl) filterOpImpl.registeredOperators.iterator().next();
+ assertEquals(1, mapOpImpl.registeredOperators.size());
+ assertEquals(OpCode.MAP, mapOpImpl.getOperatorSpec().getOpCode());
+
+ OperatorImpl sendToOpImpl = (OutputOperatorImpl) mapOpImpl.registeredOperators.iterator().next();
+ assertEquals(0, sendToOpImpl.registeredOperators.size());
+ assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+ }
+
+
+ @Test
+ public void testBroadcastChain() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+ inputStream.filter(mock(FilterFunction.class));
+ inputStream.map(mock(MapFunction.class));
+
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ OperatorImplGraph opImplGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+ InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+ assertEquals(2, inputOpImpl.registeredOperators.size());
+ assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
+ ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER));
+ assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
+ ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP));
+ }
+
+ @Test
+ public void testJoinChain() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
- when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
- StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
- when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+ when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
+ when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ JoinFunction mockJoinFunction = mock(JoinFunction.class);
+ MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
+ MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+ inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
+
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ OperatorImplGraph opImplGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+ // verify that join function is initialized once.
+ verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContext.class));
+
+ InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1"));
+ InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2"));
+ PartialJoinOperatorImpl leftPartialJoinOpImpl =
+ (PartialJoinOperatorImpl) inputOpImpl1.registeredOperators.iterator().next();
+ PartialJoinOperatorImpl rightPartialJoinOpImpl =
+ (PartialJoinOperatorImpl) inputOpImpl2.registeredOperators.iterator().next();
+
+ assertEquals(leftPartialJoinOpImpl.getOperatorSpec(), rightPartialJoinOpImpl.getOperatorSpec());
+ assertNotSame(leftPartialJoinOpImpl, rightPartialJoinOpImpl);
+
+ Object joinKey = new Object();
+ // verify that left partial join operator calls getFirstKey
+ Object mockLeftMessage = mock(Object.class);
+ when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
+ inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+ verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
+
+ // verify that right partial join operator calls getSecondKey
+ Object mockRightMessage = mock(Object.class);
+ when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
+ inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+ verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
+
+ // verify that the join function apply is called with the correct messages on match
+ verify(mockJoinFunction, times(1)).apply(mockLeftMessage, mockRightMessage);
+ }
+ @Test
+ public void testOperatorGraphInitAndClose() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
+ when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
Config mockConfig = mock(Config.class);
- TaskContext mockContext = createMockContext();
- StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+ TaskContext mockContext = mock(TaskContext.class);
+ when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
- List<String> initializationOrder = new ArrayList<>();
- List<String> finalizationOrder = new ArrayList<>();
+ MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
+ MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
- MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", (k, v) -> v);
- MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", (k, v) -> v);
+ List<String> initializedOperators = new ArrayList<>();
+ List<String> closedOperators = new ArrayList<>();
- inputStream1.map(createMapFunction("1", initializationOrder, finalizationOrder))
- .map(createMapFunction("2", initializationOrder, finalizationOrder));
+ inputStream1.map(createMapFunction("1", initializedOperators, closedOperators))
+ .map(createMapFunction("2", initializedOperators, closedOperators));
- inputStream2.map(createMapFunction("3", initializationOrder, finalizationOrder))
- .map(createMapFunction("4", initializationOrder, finalizationOrder));
+ inputStream2.map(createMapFunction("3", initializedOperators, closedOperators))
+ .map(createMapFunction("4", initializedOperators, closedOperators));
- OperatorImplGraph implGraph = new OperatorImplGraph(SystemClock.instance());
+ OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mockConfig, mockContext, SystemClock.instance());
// Assert that initialization occurs in topological order.
- implGraph.init(graph, mockConfig, mockContext);
- assertEquals(initializationOrder.get(0), "1");
- assertEquals(initializationOrder.get(1), "2");
- assertEquals(initializationOrder.get(2), "3");
- assertEquals(initializationOrder.get(3), "4");
+ assertEquals(initializedOperators.get(0), "1");
+ assertEquals(initializedOperators.get(1), "2");
+ assertEquals(initializedOperators.get(2), "3");
+ assertEquals(initializedOperators.get(3), "4");
// Assert that finalization occurs in reverse topological order.
- implGraph.close();
- assertEquals(finalizationOrder.get(0), "4");
- assertEquals(finalizationOrder.get(1), "3");
- assertEquals(finalizationOrder.get(2), "2");
- assertEquals(finalizationOrder.get(3), "1");
- }
-
- private TaskContext createMockContext() {
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- return mockContext;
+ opImplGraph.close();
+ assertEquals(closedOperators.get(0), "4");
+ assertEquals(closedOperators.get(1), "3");
+ assertEquals(closedOperators.get(2), "2");
+ assertEquals(closedOperators.get(3), "1");
}
/**
* Creates an identity map function that appends to the provided lists when init/close is invoked.
*/
- private MapFunction<Object, Object> createMapFunction(String id, List<String> initializationOrder, List<String> finalizationOrder) {
+ private MapFunction<Object, Object> createMapFunction(String id,
+ List<String> initializedOperators, List<String> finalizedOperators) {
return new MapFunction<Object, Object>() {
@Override
public void init(Config config, TaskContext context) {
- initializationOrder.add(id);
+ initializedOperators.add(id);
}
@Override
public void close() {
- finalizationOrder.add(id);
+ finalizedOperators.add(id);
}
@Override
@@ -108,4 +233,3 @@ public class TestOperatorImplGraph {
};
}
}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
deleted file mode 100644
index a75fadb..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.data.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.internal.WindowType;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorImpls {
- Field nextOperatorsField = null;
- Method createOpMethod = null;
- Method createOpsMethod = null;
-
- @Before
- public void prep() throws NoSuchFieldException, NoSuchMethodException {
- nextOperatorsField = OperatorImpl.class.getDeclaredField("registeredOperators");
- nextOperatorsField.setAccessible(true);
-
- createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl",
- OperatorSpec.class, Config.class, TaskContext.class);
- createOpMethod.setAccessible(true);
-
- createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class,
- Config.class, TaskContext.class);
- createOpsMethod.setAccessible(true);
- }
-
- @Test
- public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
- // get window operator
- WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
- WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
- when(mockWnd.getWindow()).thenReturn(windowInternal);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
-
- OperatorImplGraph opGraph = new OperatorImplGraph();
- OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
- createOpMethod.invoke(opGraph, mockWnd, mockConfig, mockContext);
- assertTrue(opImpl instanceof WindowOperatorImpl);
- Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
- wndInternalField.setAccessible(true);
- WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
- assertEquals(wndInternal, windowInternal);
-
- // get simple operator
- StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
- FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
- when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockSimpleOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof StreamOperatorImpl);
- Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
- txfmFnField.setAccessible(true);
- assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
-
- // get sink operator
- SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
- when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, sinkOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof SinkOperatorImpl);
- Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
- sinkFnField.setAccessible(true);
- assertEquals(sinkFn, sinkFnField.get(opImpl));
-
- // get join operator
- PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
- opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, joinOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof PartialJoinOperatorImpl);
- }
-
- @Test
- public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
- // test creation of empty chain
- MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- Config mockConfig = mock(Config.class);
- OperatorImplGraph opGraph = new OperatorImplGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
- assertTrue(operatorChain != null);
- }
-
- @Test
- public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of linear chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- Config mockConfig = mock(Config.class);
- testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
- OperatorImplGraph opGraph = new OperatorImplGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl wndOpImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
- assertEquals(subsOps.size(), 0);
- }
-
- @Test
- public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of broadcast chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- Config mockConfig = mock(Config.class);
- testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
- testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
- OperatorImplGraph opGraph = new OperatorImplGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
- assertEquals(subsSet.size(), 2);
- Iterator<OperatorImpl> iter = subsSet.iterator();
- // check the first branch w/ flatMap
- OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl flatMapImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl);
- assertEquals(subsOps.size(), 0);
- // check the second branch w/ map
- opImpl = iter.next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl mapImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl);
- assertEquals(subsOps.size(), 0);
- }
-
- @Test
- public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of join chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- Config mockConfig = mock(Config.class);
- input1
- .join(input2,
- new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
- @Override
- public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
- return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
- }
-
- @Override
- public String getFirstKey(TestMessageEnvelope message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(TestMessageEnvelope message) {
- return message.getKey();
- }
- }, Duration.ofMinutes(1))
- .map(m -> m);
- OperatorImplGraph opGraph = new OperatorImplGraph();
- // now, we create chained operators from each input sources
- RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
- RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
- // check that those two chains will merge at map operator
- // first branch of the join
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
- assertEquals(subsOps.size(), 1);
- // the map operator consumes the common join output, where two branches merge
- OperatorImpl mapImpl = subsOps.iterator().next();
- // second branch of the join
- subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
- assertNotSame(joinOp1, joinOp2);
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
- assertEquals(subsOps.size(), 1);
- // make sure that the map operator is the same
- assertEquals(mapImpl, subsOps.iterator().next());
- }
-}