You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/06/07 19:42:20 UTC

[2/4] samza git commit: SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index b2a5e2a..61224f2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -19,302 +19,343 @@
 package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestExtOutputMessageEnvelope;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
+import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-
+@SuppressWarnings("unchecked")
 public class TestMessageStreamImpl {
 
-  private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-
   @Test
   public void testMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m)  ->
-        new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
-    assertTrue(mapOp instanceof StreamOperatorSpec);
-    assertEquals(mapOp.getNextStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
-    MessageType mockInnerTestMessage = mock(MessageType.class);
-    when(xTestMsg.getKey()).thenReturn("test-msg-key");
-    when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getValue()).thenReturn("123456789");
-
-    Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
-    assertEquals(cOutputMsg.size(), 1);
-    TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
-    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
-    assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1));
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockMapFn = mock(MapFunction.class);
+    inputStream.map(mockMapFn);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+    assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+
+    FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec).getTransformFn();
+    assertNotNull(transformFn);
+    assertEquals(OpCode.MAP, registeredOpSpec.getOpCode());
+
+    TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
+    when(mockMapFn.apply(anyObject())).thenReturn(mockOutput);
+    assertTrue(transformFn.apply(new Object()).contains(mockOutput));
+    when(mockMapFn.apply(anyObject())).thenReturn(null);
+    assertTrue(transformFn.apply(null).isEmpty());
   }
 
   @Test
   public void testFlatMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    List<TestOutputMessageEnvelope> flatOuts = new ArrayList<TestOutputMessageEnvelope>() { {
-        this.add(mock(TestOutputMessageEnvelope.class));
-        this.add(mock(TestOutputMessageEnvelope.class));
-        this.add(mock(TestOutputMessageEnvelope.class));
-      } };
-    final List<TestMessageEnvelope> inputMsgs = new ArrayList<>();
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> {
-      inputMsgs.add(message);
-      return flatOuts;
-    };
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperatorSpec);
-    assertEquals(flatMapOp.getNextStream(), outputStream);
-    assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap);
-
-    TestMessageEnvelope mockInput  = mock(TestMessageEnvelope.class);
-    // assert that the transformation function is what we defined above
-    List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>)
-        ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput);
-    assertEquals(flatOuts, result);
-    assertEquals(inputMsgs.size(), 1);
-    assertEquals(inputMsgs.get(0), mockInput);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    inputStream.flatMap(mock(FlatMapFunction.class));
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+    assertNotNull(((StreamOperatorSpec) registeredOpSpec).getTransformFn());
+    assertEquals(OpCode.FLAT_MAP, registeredOpSpec.getOpCode());
   }
 
   @Test
   public void testFlatMapWithRelaxedTypes() {
-    MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    List<TestExtOutputMessageEnvelope> flatOuts = new ArrayList<TestExtOutputMessageEnvelope>() { {
-        this.add(new TestExtOutputMessageEnvelope("output-key-1", 1, "output-id-001"));
-        this.add(new TestExtOutputMessageEnvelope("output-key-2", 2, "output-id-002"));
-        this.add(new TestExtOutputMessageEnvelope("output-key-3", 3, "output-id-003"));
-      } };
-
-    class MyFlatMapFunction implements FlatMapFunction<TestMessageEnvelope, TestExtOutputMessageEnvelope> {
-      public final List<TestMessageEnvelope> inputMsgs = new ArrayList<>();
-
-      @Override
-      public Collection<TestExtOutputMessageEnvelope> apply(TestMessageEnvelope message) {
-        inputMsgs.add(message);
-        return flatOuts;
-      }
-
-      @Override
-      public void init(Config config, TaskContext context) {
-        inputMsgs.clear();
-      }
-    }
-
-    MyFlatMapFunction xFlatMap = new MyFlatMapFunction();
-
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperatorSpec);
-    assertEquals(flatMapOp.getNextStream(), outputStream);
-    assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap);
-
-    TestMessageEnvelope mockInput  = mock(TestMessageEnvelope.class);
-    // assert that the transformation function is what we defined above
-    List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>)
-        ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput);
-    assertEquals(flatOuts, result);
-    assertEquals(xFlatMap.inputMsgs.size(), 1);
-    assertEquals(xFlatMap.inputMsgs.get(0), mockInput);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    FlatMapFunction flatMapFunction =
+        (FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>) message -> Collections.emptyList();
+    // should compile since TestInputMessageEnvelope extends TestMessageEnvelope
+    inputStream.flatMap(flatMapFunction);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+    assertNotNull(((StreamOperatorSpec) registeredOpSpec).getTransformFn());
+    assertEquals(OpCode.FLAT_MAP, registeredOpSpec.getOpCode());
   }
 
   @Test
   public void testFilter() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
-    MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
-    assertTrue(filterOp instanceof StreamOperatorSpec);
-    assertEquals(filterOp.getNextStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    MessageType mockInnerTestMessage = mock(MessageType.class);
-    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
-    Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
-    assertTrue(output.isEmpty());
-    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
-    output = txfmFn.apply(mockMsg);
-    assertEquals(output.size(), 1);
-    assertEquals(output.iterator().next(), mockMsg);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    FilterFunction<Object> mockFilterFn = mock(FilterFunction.class);
+    inputStream.filter(mockFilterFn);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+    assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+
+    FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec).getTransformFn();
+    assertNotNull(transformFn);
+    assertEquals(OpCode.FILTER, registeredOpSpec.getOpCode());
+
+    Object mockInput = new Object();
+    when(mockFilterFn.apply(anyObject())).thenReturn(true);
+    assertTrue(transformFn.apply(mockInput).contains(mockInput));
+    when(mockFilterFn.apply(anyObject())).thenReturn(false);
+    assertTrue(transformFn.apply(mockInput).isEmpty());
   }
 
   @Test
   public void testSink() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    SystemStream testStream = new SystemStream("test-sys", "test-stream");
-    SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
-      mc.send(new OutgoingMessageEnvelope(testStream, m.getMessage()));
-      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
-    };
-    inputStream.sink(xSink);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
-    assertTrue(sinkOp instanceof SinkOperatorSpec);
-    assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
-
-    TestMessageEnvelope mockTest1 = mock(TestMessageEnvelope.class);
-    MessageType mockMsgBody = mock(MessageType.class);
-    when(mockTest1.getMessage()).thenReturn(mockMsgBody);
-    final List<OutgoingMessageEnvelope> outMsgs = new ArrayList<>();
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    doAnswer(invocation -> {
-        outMsgs.add((OutgoingMessageEnvelope) invocation.getArguments()[0]);
-        return null;
-      }).when(mockCollector).send(any());
-    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    ((SinkOperatorSpec) sinkOp).getSinkFn().apply(mockTest1, mockCollector, mockCoordinator);
-    assertEquals(1, outMsgs.size());
-    assertEquals(testStream, outMsgs.get(0).getSystemStream());
-    assertEquals(mockMsgBody, outMsgs.get(0).getMessage());
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    inputStream.sink(mock(SinkFunction.class));
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof SinkOperatorSpec);
+    assertNotNull(((SinkOperatorSpec) registeredOpSpec).getSinkFn());
+    assertEquals(OpCode.SINK, registeredOpSpec.getOpCode());
   }
 
   @Test
-  public void testJoin() {
-    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
-    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
-    JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
-      new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
-        @Override
-        public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
-          return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
-        }
-
-        @Override
-        public String getFirstKey(TestMessageEnvelope message) {
-          return message.getKey();
-        }
-
-        @Override
-        public String getSecondKey(TestMessageEnvelope message) {
-          return message.getKey();
-        }
-      };
-
-    MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner, Duration.ofMinutes(1));
-    Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
-    assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
-    assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput);
-    subs = source2.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
-    assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
-    assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput);
-    TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
-    TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
-    TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getThisPartialJoinFn().apply(joinMsg1, joinMsg2);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
-    xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getThisPartialJoinFn().apply(joinMsg2, joinMsg1);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+  public void testSendTo() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+    inputStream.sendTo(mockOutputOpSpec);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+    assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode());
+    assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
   }
 
   @Test
-  public void testMerge() {
-    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
-    Collection<MessageStream<TestMessageEnvelope>> others = ImmutableList.of(
-        new MessageStreamImpl<>(mockGraph), new MessageStreamImpl<>(mockGraph));
-    MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
-    validateMergeOperator(merge1, mergeOutput);
+  public void testPartitionBy() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+
+    String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+    Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class);
+    OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+    IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
+    when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class)))
+        .thenReturn(mockIntermediateStream);
+    when(mockIntermediateStream.getOutputStream())
+        .thenReturn(mockOutputOpSpec);
+
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+    inputStream.partitionBy(mockKeyFn);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+    assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
+    assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+  }
 
-    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  @Test
+  public void testWindowWithRelaxedTypes() throws Exception {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
+    FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
+    Supplier<Integer> initialValue = () -> 0;
+
+    // should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M)
+    Window<TestInputMessageEnvelope, String, Integer> window = Windows
+        .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator);
+    MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof WindowOperatorSpec);
+    assertEquals(OpCode.WINDOW, registeredOpSpec.getOpCode());
+    assertEquals(window, ((WindowOperatorSpec) registeredOpSpec).getWindow());
   }
 
   @Test
-  public void testMergeWithRelaxedTypes() {
-    MessageStream<TestMessageEnvelope> input1 = new MessageStreamImpl<>(mockGraph);
-    Collection<MessageStream<? extends TestMessageEnvelope>> others = ImmutableList.of(
-        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph),
-        new MessageStreamImpl<TestMessageEnvelope>(mockGraph));
+  public void testJoin() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
+    OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph, rightInputOpSpec);
+
+    JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> mockJoinFn =
+        mock(JoinFunction.class);
+
+    Duration joinTtl = Duration.ofMinutes(1);
+    source1.join(source2, mockJoinFn, joinTtl);
+
+    ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> leftRegisteredOpSpec = leftRegisteredOpCaptor.getValue();
+
+    ArgumentCaptor<OperatorSpec> rightRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(rightInputOpSpec).registerNextOperatorSpec(rightRegisteredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> rightRegisteredOpSpec = rightRegisteredOpCaptor.getValue();
+
+    assertEquals(leftRegisteredOpSpec, rightRegisteredOpSpec);
+    assertEquals(OpCode.JOIN, leftRegisteredOpSpec.getOpCode());
+    assertTrue(leftRegisteredOpSpec instanceof JoinOperatorSpec);
+    assertEquals(mockJoinFn, ((JoinOperatorSpec) leftRegisteredOpSpec).getJoinFn());
+    assertEquals(joinTtl.toMillis(), ((JoinOperatorSpec) leftRegisteredOpSpec).getTtlMs());
+    assertEquals(leftInputOpSpec, ((JoinOperatorSpec) leftRegisteredOpSpec).getLeftInputOpSpec());
+    assertEquals(rightInputOpSpec, ((JoinOperatorSpec) leftRegisteredOpSpec).getRightInputOpSpec());
+  }
 
-    // should compile
-    MessageStream<TestMessageEnvelope> mergeOutput = input1.merge(others);
-    validateMergeOperator(input1, mergeOutput);
+  @Test
+  public void testMerge() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
+    MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec1);
+
+    // other streams have the same message type T as input stream message type M
+    OperatorSpec mockOpSpec2 = mock(OperatorSpec.class);
+    OperatorSpec mockOpSpec3 = mock(OperatorSpec.class);
+    Collection<MessageStream<TestMessageEnvelope>> otherStreams1 = ImmutableList.of(
+        new MessageStreamImpl<>(mockGraph, mockOpSpec2),
+        new MessageStreamImpl<>(mockGraph, mockOpSpec3)
+    );
+
+    inputStream.merge(otherStreams1);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor1 = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec1).registerNextOperatorSpec(registeredOpCaptor1.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec1 = registeredOpCaptor1.getValue();
+    assertTrue(registeredOpSpec1 instanceof StreamOperatorSpec);
+    FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec1).getTransformFn();
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor2 = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec2).registerNextOperatorSpec(registeredOpCaptor2.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec2 = registeredOpCaptor2.getValue();
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor3 = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec3).registerNextOperatorSpec(registeredOpCaptor3.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec3 = registeredOpCaptor3.getValue();
+
+    assertEquals(registeredOpSpec1, registeredOpSpec2);
+    assertEquals(registeredOpSpec2, registeredOpSpec3);
+    assertEquals(OpCode.MERGE, registeredOpSpec1.getOpCode());
+
+    assertNotNull(transformFn);
+    TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
+    assertTrue(transformFn.apply(mockInput).contains(mockInput));
+    assertEquals(1, transformFn.apply(mockInput).size());
+  }
 
-    others.forEach(merge -> validateMergeOperator((MessageStream<TestMessageEnvelope>) merge, mergeOutput));
+  @Test
+  public void testMergeWithRelaxedTypes() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class));
+
+    // other streams have the same message type T as input stream message type M
+    Collection<MessageStream<TestMessageEnvelope>> otherStreams1 = ImmutableList.of(
+        new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class)),
+        new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class))
+    );
+
+    // other streams have the same message type T that extends as input stream message type M
+    Collection<MessageStream<TestInputMessageEnvelope>> otherStreams2 = ImmutableList.of(
+        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+    );
+
+    // other streams have a mix of message types such that T extends input stream message type M
+    Collection<MessageStream<TestMessageEnvelope>> otherStreams3 = ImmutableList.of(
+        new MessageStreamImpl<TestMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+        // unchecked cast required for the next stream
+        (MessageStream) new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+    );
+
+    // not supported:
+    // other streams have a mix of message types such that T extends input stream message type M
+    Collection<MessageStream<? extends TestMessageEnvelope>> otherStreams4 = ImmutableList.of(
+        new MessageStreamImpl<TestMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+    );
+
+    // check if all type combinations compile
+    inputStream.merge(otherStreams1);
+    inputStream.merge(otherStreams2);
+    inputStream.merge(otherStreams3);
+    inputStream.merge(otherStreams4);
   }
 
   @Test
   public <T> void testMergeWithNestedTypes() {
     class MessageEnvelope<TM> { }
-    MessageStream<MessageEnvelope<T>> ms1 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
-    MessageStream<MessageEnvelope<T>> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
-    MessageStream<MessageEnvelope<T>> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
+    MessageStream<MessageEnvelope<T>> ms1 = mock(MessageStreamImpl.class);
+    MessageStream<MessageEnvelope<T>> ms2 = mock(MessageStreamImpl.class);
+    MessageStream<MessageEnvelope<T>> ms3 = mock(MessageStreamImpl.class);
     Collection<MessageStream<MessageEnvelope<T>>> otherStreams = ImmutableList.of(ms2, ms3);
 
     // should compile
     ms1.merge(otherStreams);
   }
 
-  private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
-    Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
-    assertTrue(mergeOp instanceof StreamOperatorSpec);
-    assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput);
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(
-        mockMsg);
-    assertEquals(outputs.size(), 1);
-    assertEquals(outputs.iterator().next(), mockMsg);
-  }
-
   @Test
   public void testMergeAll() {
     MessageStream<TestMessageEnvelope> input1 = mock(MessageStreamImpl.class);
@@ -361,26 +402,9 @@ public class TestMessageStreamImpl {
     assertTrue(otherStreamsCaptor.getValue().contains(input3));
   }
 
-  @Test
-  public void testPartitionBy() {
-    Map<String, String> map = new HashMap<>();
-    map.put(JobConfig.JOB_DEFAULT_SYSTEM(), "testsystem");
-    Config config = new MapConfig(map);
-    ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(streamGraph);
-    Function<TestMessageEnvelope, String> keyExtractorFunc = m -> "222";
-    inputStream.partitionBy(keyExtractorFunc);
-    assertTrue(streamGraph.getInputStreams().size() == 1);
-    assertTrue(streamGraph.getOutputStreams().size() == 1);
-
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> partitionByOp = subs.iterator().next();
-    assertTrue(partitionByOp instanceof SinkOperatorSpec);
-    assertNull(partitionByOp.getNextStream());
-
-    ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000),
-        envelope -> assertTrue(envelope.getPartitionKey().equals("222")), null);
+  class TestInputMessageEnvelope extends TestMessageEnvelope {
+    public TestInputMessageEnvelope(String key, Object value) {
+      super(key, value);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
deleted file mode 100644
index c4e9f51..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-
-public class TestMessageStreamImplUtil {
-  public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
-    return new MessageStreamImpl<M>(graph);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 9d95217..1fc60bd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -21,23 +21,21 @@ package org.apache.samza.operators;
 import junit.framework.Assert;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
 import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.stream.InputStreamInternal;
-import org.apache.samza.operators.stream.InputStreamInternalImpl;
-import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
-import org.apache.samza.operators.stream.OutputStreamInternalImpl;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,152 +44,141 @@ public class TestStreamGraphImpl {
   @Test
   public void testGetInputStream() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+
+    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+  }
 
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
-        (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
-    MessageStream<TestMessageEnvelope> mInputStream = graph.getInputStream("test-stream-1", xMsgBuilder);
-    assertEquals(graph.getInputStreams().get(testStreamSpec), mInputStream);
-    assertTrue(mInputStream instanceof InputStreamInternalImpl);
-    assertEquals(((InputStreamInternalImpl) mInputStream).getMsgBuilder(), xMsgBuilder);
-
-    String key = "test-input-key";
-    MessageType msgBody = new MessageType("test-msg-value", 333333L);
-    TestMessageEnvelope xInputMsg = ((InputStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mInputStream).
-        getMsgBuilder().apply(key, msgBody);
-    assertEquals(xInputMsg.getKey(), key);
-    assertEquals(xInputMsg.getMessage().getValue(), msgBody.getValue());
-    assertEquals(xInputMsg.getMessage().getEventTime(), msgBody.getEventTime());
-    assertEquals(((TestInputMessageEnvelope) xInputMsg).getInputId(), "input-id-1");
+  @Test
+  public void testGetInputStreamWithRelaxedTypes() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+
+    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+  }
+
+  @Test
+  public void testMultipleGetInputStreams() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
+    StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
+    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class));
+    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class));
+
+    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 =
+        (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
+    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 =
+        (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
+
+    assertEquals(graph.getInputOperators().size(), 2);
+    assertEquals(graph.getInputOperators().get(mockStreamSpec1), inputOpSpec1);
+    assertEquals(graph.getInputOperators().get(mockStreamSpec2), inputOpSpec2);
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testMultipleGetInputStream() {
+  public void testGetSameInputStreamTwice() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
-    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
-    StreamSpec nonExistentStreamSpec = new StreamSpec("non-existent-stream", "physical-stream-1", "test-system");
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getInputStream("test-stream-1", mock(BiFunction.class));
+    graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception
+  }
 
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
-    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+  @Test
+  public void testGetOutputStream() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
 
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
-        (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
 
-    //create 2 streams for the corresponding streamIds
-    MessageStream<TestInputMessageEnvelope> inputStream1 = graph.getInputStream("test-stream-1", xMsgBuilder);
-    MessageStream<TestInputMessageEnvelope> inputStream2 = graph.getInputStream("test-stream-2", xMsgBuilder);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
+    Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
 
-    //assert that the streamGraph contains only the above 2 streams
-    assertEquals(graph.getInputStreams().get(testStreamSpec1), inputStream1);
-    assertEquals(graph.getInputStreams().get(testStreamSpec2), inputStream2);
-    assertEquals(graph.getInputStreams().get(nonExistentStreamSpec), null);
-    assertEquals(graph.getInputStreams().size(), 2);
+    OutputStream<String, String, TestMessageEnvelope> outputStream =
+        graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor);
 
-    //should throw IllegalStateException
-    graph.getInputStream("test-stream-1", xMsgBuilder);
+    OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec);
+    assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor());
+    assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor());
+    assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec());
   }
 
-
-  @Test
-  public void testGetOutputStream() {
+  @Test(expected = IllegalStateException.class)
+  public void testGetSameOutputStreamTwice() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
-
-    class MyMessageType extends MessageType {
-      public final String outputId;
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
-      public MyMessageType(String value, long eventTime, String outputId) {
-        super(value, eventTime);
-        this.outputId = outputId;
-      }
-    }
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
-    Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
-        x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
-
-    OutputStream<String, MyMessageType, TestInputMessageEnvelope> mOutputStream =
-        graph.getOutputStream("test-stream-1", xKeyExtractor, xMsgExtractor);
-    assertEquals(graph.getOutputStreams().get(testStreamSpec), mOutputStream);
-    assertTrue(mOutputStream instanceof OutputStreamInternalImpl);
-    assertEquals(((OutputStreamInternalImpl) mOutputStream).getKeyExtractor(), xKeyExtractor);
-    assertEquals(((OutputStreamInternalImpl) mOutputStream).getMsgExtractor(), xMsgExtractor);
-
-    TestInputMessageEnvelope xInputMsg = new TestInputMessageEnvelope("test-key-1", "test-msg-1", 33333L, "input-id-1");
-    assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
-        getKeyExtractor().apply(xInputMsg), "test-key-1");
-    assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
-        getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
-    assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
-        getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
-    assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
-        getMsgExtractor().apply(xInputMsg).outputId, "test-output-id-1");
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class));
+    graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception
   }
 
   @Test
   public void testGetIntermediateStream() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
-    StreamSpec testStreamSpec = new StreamSpec("myJob-i001-test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(testStreamSpec);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
     when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
 
-    class MyMessageType extends MessageType {
-      public final String outputId;
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
+    Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
+    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+    IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor());
+    assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor());
+    assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder());
+  }
 
-      public MyMessageType(String value, long eventTime, String outputId) {
-        super(value, eventTime);
-        this.outputId = outputId;
-      }
-    }
+  @Test(expected = IllegalStateException.class)
+  public void testGetSameIntermediateStreamTwice() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
-    Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
-        x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
-    BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
-        (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
-
-    MessageStream<TestMessageEnvelope> mIntermediateStream =
-        graph.getIntermediateStream("test-stream-1", xKeyExtractor, xMsgExtractor, xMsgBuilder);
-    assertEquals(graph.getOutputStreams().get(testStreamSpec), mIntermediateStream);
-    assertTrue(mIntermediateStream instanceof IntermediateStreamInternalImpl);
-    assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getKeyExtractor(), xKeyExtractor);
-    assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgExtractor(), xMsgExtractor);
-    assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgBuilder(), xMsgBuilder);
-
-    TestMessageEnvelope xInputMsg = new TestMessageEnvelope("test-key-1", "test-msg-1", 33333L);
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getKeyExtractor().apply(xInputMsg), "test-key-1");
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getKey(), "test-key-1");
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getValue(), "test-msg-1");
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getEventTime(), 33333L);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
+    graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
   }
 
   @Test
-  public void testGetNextOpId() {
+  public void testGetNextOpIdIncrementsId() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
     assertEquals(graph.getNextOpId(), 0);
     assertEquals(graph.getNextOpId(), 1);
   }
@@ -216,10 +203,10 @@ public class TestStreamGraphImpl {
     graph.getInputStream("test-stream-2", (k, v) -> v);
     graph.getInputStream("test-stream-3", (k, v) -> v);
 
-    ArrayList<InputStreamInternal> inputMessageStreams = new ArrayList<>(graph.getInputStreams().values());
-    Assert.assertEquals(inputMessageStreams.size(), 3);
-    Assert.assertEquals(inputMessageStreams.get(0).getStreamSpec(), testStreamSpec1);
-    Assert.assertEquals(inputMessageStreams.get(1).getStreamSpec(), testStreamSpec2);
-    Assert.assertEquals(inputMessageStreams.get(2).getStreamSpec(), testStreamSpec3);
+    List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
+    Assert.assertEquals(inputSpecs.size(), 3);
+    Assert.assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
+    Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
+    Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java b/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
deleted file mode 100644
index 3fd015b..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class MessageType {
-  private final String value;
-  private final long eventTime;
-
-  public MessageType(String value, long eventTime) {
-    this.value = value;
-    this.eventTime = eventTime;
-  }
-
-  public long getEventTime() {
-    return eventTime;
-  }
-
-  public String getValue() {
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
deleted file mode 100644
index 22222ed..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class TestExtOutputMessageEnvelope extends TestOutputMessageEnvelope {
-  private final String outputId;
-
-  public TestExtOutputMessageEnvelope(String key, Integer value, String outputId) {
-    super(key, value);
-    this.outputId = outputId;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
deleted file mode 100644
index 089f534..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class TestInputMessageEnvelope extends TestMessageEnvelope {
-  private final String inputId;
-
-  public TestInputMessageEnvelope(String key, String value, long eventTime, String inputId) {
-    super(key, value, eventTime);
-    this.inputId = inputId;
-  }
-
-  public String getInputId() {
-    return this.inputId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
index 05a63cd..68305cc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
@@ -20,21 +20,19 @@ package org.apache.samza.operators.data;
 
 
 public class TestMessageEnvelope {
-
   private final String key;
-  private final MessageType value;
+  private final Object value;
 
-  public TestMessageEnvelope(String key, String value, long eventTime) {
+  public TestMessageEnvelope(String key, Object value) {
     this.key = key;
-    this.value = new MessageType(value, eventTime);
+    this.value = value;
   }
 
-  public MessageType getMessage() {
+  public Object getMessage() {
     return this.value;
   }
 
   public String getKey() {
     return this.key;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 73d851b..d50d271 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -23,7 +23,6 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
@@ -177,9 +176,11 @@ public class TestOperatorImpl {
 
   private static class TestOpImpl extends OperatorImpl<Object, Object> {
     private final Object mockOutput;
+    private final TestOpSpec testOpSpec;
 
     TestOpImpl(Object mockOutput) {
       this.mockOutput = mockOutput;
+      this.testOpSpec = new TestOpSpec();
     }
 
     @Override
@@ -199,31 +200,14 @@ public class TestOperatorImpl {
     @Override
     protected void handleClose() {}
 
-    @Override
-    protected OperatorSpec<Object> getOperatorSpec() {
-      return new TestOpSpec();
+    protected OperatorSpec<Object, Object> getOperatorSpec() {
+      return testOpSpec;
     }
   }
 
-  private static class TestOpSpec implements OperatorSpec<Object> {
-    @Override
-    public MessageStreamImpl<Object> getNextStream() {
-      return null;
-    }
-
-    @Override
-    public OpCode getOpCode() {
-      return OpCode.INPUT;
-    }
-
-    @Override
-    public int getOpId() {
-      return -1;
-    }
-
-    @Override
-    public String getSourceLocation() {
-      return "";
+  private static class TestOpSpec extends OperatorSpec<Object, Object> {
+    TestOpSpec() {
+     super(OpCode.INPUT, 1);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 67e5b46..b2c7722 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,86 +19,211 @@
 
 package org.apache.samza.operators.impl;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-
-import static junit.framework.Assert.assertEquals;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestOperatorImplGraph {
 
+  public void testEmptyChain() {
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class));
+    OperatorImplGraph opGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContext.class), mock(Clock.class));
+    assertEquals(0, opGraph.getAllInputOperators().size());
+  }
+
   @Test
-  public void testOperatorGraphInitAndClose() {
+  public void testLinearChain() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+    when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+    OutputStream<Object, Object, Object> outputStream =
+        streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class));
+
+    inputStream
+        .filter(mock(FilterFunction.class))
+        .map(mock(MapFunction.class))
+        .sendTo(outputStream);
+
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    OperatorImplGraph opImplGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+    assertEquals(1, inputOpImpl.registeredOperators.size());
+
+    OperatorImpl filterOpImpl = (StreamOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+    assertEquals(1, filterOpImpl.registeredOperators.size());
+    assertEquals(OpCode.FILTER, filterOpImpl.getOperatorSpec().getOpCode());
+
+    OperatorImpl mapOpImpl = (StreamOperatorImpl) filterOpImpl.registeredOperators.iterator().next();
+    assertEquals(1, mapOpImpl.registeredOperators.size());
+    assertEquals(OpCode.MAP, mapOpImpl.getOperatorSpec().getOpCode());
+
+    OperatorImpl sendToOpImpl = (OutputOperatorImpl) mapOpImpl.registeredOperators.iterator().next();
+    assertEquals(0, sendToOpImpl.registeredOperators.size());
+    assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+  }
+
+
+  @Test
+  public void testBroadcastChain() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+    inputStream.filter(mock(FilterFunction.class));
+    inputStream.map(mock(MapFunction.class));
+
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    OperatorImplGraph opImplGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+    assertEquals(2, inputOpImpl.registeredOperators.size());
+    assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
+        ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER));
+    assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
+        ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP));
+  }
+
+  @Test
+  public void testJoinChain() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
-    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+    when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
+    when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    JoinFunction mockJoinFunction = mock(JoinFunction.class);
+    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
+    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+    inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
+
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    OperatorImplGraph opImplGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+    // verify that join function is initialized once.
+    verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContext.class));
+
+    InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1"));
+    InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2"));
+    PartialJoinOperatorImpl leftPartialJoinOpImpl =
+        (PartialJoinOperatorImpl) inputOpImpl1.registeredOperators.iterator().next();
+    PartialJoinOperatorImpl rightPartialJoinOpImpl =
+        (PartialJoinOperatorImpl) inputOpImpl2.registeredOperators.iterator().next();
+
+    assertEquals(leftPartialJoinOpImpl.getOperatorSpec(), rightPartialJoinOpImpl.getOperatorSpec());
+    assertNotSame(leftPartialJoinOpImpl, rightPartialJoinOpImpl);
+
+    Object joinKey = new Object();
+    // verify that left partial join operator calls getFirstKey
+    Object mockLeftMessage = mock(Object.class);
+    when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
+    inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+    verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
+
+    // verify that right partial join operator calls getSecondKey
+    Object mockRightMessage = mock(Object.class);
+    when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
+    inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+    verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
+
+    // verify that the join function apply is called with the correct messages on match
+    verify(mockJoinFunction, times(1)).apply(mockLeftMessage, mockRightMessage);
+  }
 
+  @Test
+  public void testOperatorGraphInitAndClose() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
+    when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
     Config mockConfig = mock(Config.class);
-    TaskContext mockContext = createMockContext();
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
 
-    List<String> initializationOrder = new ArrayList<>();
-    List<String> finalizationOrder = new ArrayList<>();
+    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
+    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
 
-    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", (k, v) -> v);
-    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", (k, v) -> v);
+    List<String> initializedOperators = new ArrayList<>();
+    List<String> closedOperators = new ArrayList<>();
 
-    inputStream1.map(createMapFunction("1", initializationOrder, finalizationOrder))
-               .map(createMapFunction("2", initializationOrder, finalizationOrder));
+    inputStream1.map(createMapFunction("1", initializedOperators, closedOperators))
+        .map(createMapFunction("2", initializedOperators, closedOperators));
 
-    inputStream2.map(createMapFunction("3", initializationOrder, finalizationOrder))
-        .map(createMapFunction("4", initializationOrder, finalizationOrder));
+    inputStream2.map(createMapFunction("3", initializedOperators, closedOperators))
+        .map(createMapFunction("4", initializedOperators, closedOperators));
 
-    OperatorImplGraph implGraph = new OperatorImplGraph(SystemClock.instance());
+    OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mockConfig, mockContext, SystemClock.instance());
 
     // Assert that initialization occurs in topological order.
-    implGraph.init(graph, mockConfig, mockContext);
-    assertEquals(initializationOrder.get(0), "1");
-    assertEquals(initializationOrder.get(1), "2");
-    assertEquals(initializationOrder.get(2), "3");
-    assertEquals(initializationOrder.get(3), "4");
+    assertEquals(initializedOperators.get(0), "1");
+    assertEquals(initializedOperators.get(1), "2");
+    assertEquals(initializedOperators.get(2), "3");
+    assertEquals(initializedOperators.get(3), "4");
 
     // Assert that finalization occurs in reverse topological order.
-    implGraph.close();
-    assertEquals(finalizationOrder.get(0), "4");
-    assertEquals(finalizationOrder.get(1), "3");
-    assertEquals(finalizationOrder.get(2), "2");
-    assertEquals(finalizationOrder.get(3), "1");
-  }
-
-  private TaskContext createMockContext() {
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    return mockContext;
+    opImplGraph.close();
+    assertEquals(closedOperators.get(0), "4");
+    assertEquals(closedOperators.get(1), "3");
+    assertEquals(closedOperators.get(2), "2");
+    assertEquals(closedOperators.get(3), "1");
   }
 
   /**
    * Creates an identity map function that appends to the provided lists when init/close is invoked.
    */
-  private MapFunction<Object, Object> createMapFunction(String id, List<String> initializationOrder, List<String> finalizationOrder) {
+  private MapFunction<Object, Object> createMapFunction(String id,
+      List<String> initializedOperators, List<String> finalizedOperators) {
     return new MapFunction<Object, Object>() {
       @Override
       public void init(Config config, TaskContext context) {
-        initializationOrder.add(id);
+        initializedOperators.add(id);
       }
 
       @Override
       public void close() {
-        finalizationOrder.add(id);
+        finalizedOperators.add(id);
       }
 
       @Override
@@ -108,4 +233,3 @@ public class TestOperatorImplGraph {
     };
   }
 }
-

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
deleted file mode 100644
index a75fadb..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.data.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.internal.WindowType;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorImpls {
-  Field nextOperatorsField = null;
-  Method createOpMethod = null;
-  Method createOpsMethod = null;
-
-  @Before
-  public void prep() throws NoSuchFieldException, NoSuchMethodException {
-    nextOperatorsField = OperatorImpl.class.getDeclaredField("registeredOperators");
-    nextOperatorsField.setAccessible(true);
-
-    createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl",
-        OperatorSpec.class, Config.class, TaskContext.class);
-    createOpMethod.setAccessible(true);
-
-    createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class,
-        Config.class, TaskContext.class);
-    createOpsMethod.setAccessible(true);
-  }
-
-  @Test
-  public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
-    // get window operator
-    WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
-    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
-    when(mockWnd.getWindow()).thenReturn(windowInternal);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
-        createOpMethod.invoke(opGraph, mockWnd, mockConfig, mockContext);
-    assertTrue(opImpl instanceof WindowOperatorImpl);
-    Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
-    wndInternalField.setAccessible(true);
-    WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
-    assertEquals(wndInternal, windowInternal);
-
-    // get simple operator
-    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
-    when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockSimpleOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof StreamOperatorImpl);
-    Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
-    txfmFnField.setAccessible(true);
-    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
-
-    // get sink operator
-    SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
-    when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, sinkOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof SinkOperatorImpl);
-    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
-    sinkFnField.setAccessible(true);
-    assertEquals(sinkFn, sinkFnField.get(opImpl));
-
-    // get join operator
-    PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, joinOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof PartialJoinOperatorImpl);
-  }
-
-  @Test
-  public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
-    // test creation of empty chain
-    MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    Config mockConfig = mock(Config.class);
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
-    assertTrue(operatorChain != null);
-  }
-
-  @Test
-  public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of linear chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    Config mockConfig = mock(Config.class);
-    testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl wndOpImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
-    assertEquals(subsOps.size(), 0);
-  }
-
-  @Test
-  public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of broadcast chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    Config mockConfig = mock(Config.class);
-    testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
-    testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
-    assertEquals(subsSet.size(), 2);
-    Iterator<OperatorImpl> iter = subsSet.iterator();
-    // check the first branch w/ flatMap
-    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl flatMapImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl);
-    assertEquals(subsOps.size(), 0);
-    // check the second branch w/ map
-    opImpl = iter.next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl mapImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl);
-    assertEquals(subsOps.size(), 0);
-  }
-
-  @Test
-  public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of join chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    Config mockConfig = mock(Config.class);
-    input1
-        .join(input2,
-            new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
-              @Override
-              public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
-                return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
-              }
-
-              @Override
-              public String getFirstKey(TestMessageEnvelope message) {
-                return message.getKey();
-              }
-
-              @Override
-              public String getSecondKey(TestMessageEnvelope message) {
-                return message.getKey();
-              }
-            }, Duration.ofMinutes(1))
-        .map(m -> m);
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    // now, we create chained operators from each input sources
-    RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
-    RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
-    // check that those two chains will merge at map operator
-    // first branch of the join
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
-    assertEquals(subsOps.size(), 1);
-    // the map operator consumes the common join output, where two branches merge
-    OperatorImpl mapImpl = subsOps.iterator().next();
-    // second branch of the join
-    subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
-    assertNotSame(joinOp1, joinOp2);
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
-    assertEquals(subsOps.size(), 1);
-    // make sure that the map operator is the same
-    assertEquals(mapImpl, subsOps.iterator().next());
-  }
-}