You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/06/07 19:42:19 UTC

[1/4] samza git commit: SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.

Repository: samza
Updated Branches:
  refs/heads/master 29cf374c5 -> c1c4289c8


http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index a5d9539..e183d87 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -28,12 +28,10 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Collection;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -50,12 +48,9 @@ public class TestStreamOperatorImpl {
     Config mockConfig = mock(Config.class);
     TaskContext mockContext = mock(TaskContext.class);
     StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
-        spy(new StreamOperatorImpl<>(mockOp, mockConfig, mockContext));
+        new StreamOperatorImpl<>(mockOp, mockConfig, mockContext);
     TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
-    TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
-    Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
-        this.add(outMsg);
-      } };
+    Collection<TestOutputMessageEnvelope> mockOutputs = mock(Collection.class);
     when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
     MessageCollector mockCollector = mock(MessageCollector.class);
     TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
@@ -74,7 +69,7 @@ public class TestStreamOperatorImpl {
     TaskContext mockContext = mock(TaskContext.class);
 
     StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
-        spy(new StreamOperatorImpl<>(mockOp, mockConfig, mockContext));
+        new StreamOperatorImpl<>(mockOp, mockConfig, mockContext);
 
     // ensure that close is not called yet
     verify(txfmFn, times(0)).close();

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
deleted file mode 100644
index cccafaf..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.data.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.stream.OutputStreamInternalImpl;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.internal.WindowType;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.function.Function;
-import java.util.function.Supplier;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorSpecs {
-  @Test
-  public void testCreateStreamOperator() {
-    FlatMapFunction<Object, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
-        this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
-      } };
-    MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
-    StreamOperatorSpec<Object, TestMessageEnvelope> streamOp =
-        OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1);
-    assertEquals(streamOp.getTransformFn(), transformFn);
-
-    Object mockInput = mock(Object.class);
-    when(mockInput.toString()).thenReturn("test-string-1");
-    List<TestMessageEnvelope> outputs = (List<TestMessageEnvelope>) streamOp.getTransformFn().apply(mockInput);
-    assertEquals(outputs.size(), 1);
-    assertEquals(outputs.get(0).getKey(), "test-string-1");
-    assertEquals(outputs.get(0).getMessage().getValue(), "test-string-1");
-    assertEquals(outputs.get(0).getMessage().getEventTime(), 12345L);
-    assertEquals(streamOp.getNextStream(), mockOutput);
-  }
-
-  @Test
-  public void testCreateSinkOperator() {
-    SystemStream testStream = new SystemStream("test-sys", "test-stream");
-    SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
-        TaskCoordinator taskCoordinator) -> {
-      messageCollector.send(new OutgoingMessageEnvelope(testStream, message.getKey(), message.getMessage()));
-    };
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1);
-    assertEquals(sinkOp.getSinkFn(), sinkFn);
-
-    TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
-    when(mockInput.getKey()).thenReturn("my-test-msg-key");
-    MessageType mockMsgBody = mock(MessageType.class);
-    when(mockInput.getMessage()).thenReturn(mockMsgBody);
-    final List<OutgoingMessageEnvelope> outputMsgs = new ArrayList<>();
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    doAnswer(invocation -> {
-        outputMsgs.add((OutgoingMessageEnvelope) invocation.getArguments()[0]);
-        return null;
-      }).when(mockCollector).send(any());
-    sinkOp.getSinkFn().apply(mockInput, mockCollector, null);
-    assertEquals(1, outputMsgs.size());
-    assertEquals(outputMsgs.get(0).getKey(), "my-test-msg-key");
-    assertEquals(outputMsgs.get(0).getMessage(), mockMsgBody);
-    assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SINK);
-    assertEquals(sinkOp.getNextStream(), null);
-  }
-
-  @Test
-  public void testCreateSendToOperator() {
-    OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSendToOperatorSpec(mockOutput, 1);
-    assertNotNull(sinkOp.getSinkFn());
-    assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SEND_TO);
-    assertEquals(sinkOp.getNextStream(), null);
-  }
-
-
-  @Test
-  public void testCreatePartitionByOperator() {
-    OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createPartitionByOperatorSpec(mockOutput, 1);
-    assertNotNull(sinkOp.getSinkFn());
-    assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.PARTITION_BY);
-    assertEquals(sinkOp.getNextStream(), null);
-  }
-
-  @Test
-  public void testCreateWindowOperator() throws Exception {
-    Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
-    FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
-    Supplier<Integer> initialValue = () -> 0;
-    //instantiate a window using reflection
-    WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
-
-    MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
-    WindowOperatorSpec spec =
-        OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockWndOut, 1);
-    assertEquals(spec.getWindow(), window);
-    assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
-    assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
-  }
-
-  @Test
-  public void testCreateWindowOperatorWithRelaxedTypes() throws Exception {
-    Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
-    FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
-    Supplier<Integer> initialValue = () -> 0;
-    //instantiate a window using reflection
-    WindowInternal<TestInputMessageEnvelope, String, Integer> window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
-
-    MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
-    WindowOperatorSpec spec =
-        OperatorSpecs.createWindowOperatorSpec(window, mockWndOut, 1);
-    assertEquals(spec.getWindow(), window);
-    assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
-    assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
-
-    // make sure that the functions with relaxed types work as expected
-    TestInputMessageEnvelope inputMsg = new TestInputMessageEnvelope("test-input-key1", "test-value-1", 23456L, "input-id-1");
-    assertEquals("test-input-key1", spec.getWindow().getKeyExtractor().apply(inputMsg));
-    assertEquals(1, spec.getWindow().getFoldLeftFunction().apply(inputMsg, 0));
-  }
-
-  @Test
-  public void testCreatePartialJoinOperator() {
-    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn
-        = mock(PartialJoinFunction.class);
-    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn
-        = mock(PartialJoinFunction.class);
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestOutputMessageEnvelope> joinOutput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-
-    PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec
-        = OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, joinOutput, 1);
-
-    assertEquals(partialJoinSpec.getNextStream(), joinOutput);
-    assertEquals(partialJoinSpec.getThisPartialJoinFn(), thisPartialJoinFn);
-    assertEquals(partialJoinSpec.getOtherPartialJoinFn(), otherPartialJoinFn);
-  }
-
-  @Test
-  public void testCreateMergeOperator() {
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
-    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp =
-        OperatorSpecs.createMergeOperatorSpec(output, 1);
-    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn =
-        t -> new ArrayList<TestMessageEnvelope>() { {
-            this.add(t);
-          } };
-    TestMessageEnvelope t = mock(TestMessageEnvelope.class);
-    assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
-    assertEquals(mergeOp.getNextStream(), output);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
index affe37f..12a32b1 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.internal.WindowInternal;
@@ -38,14 +37,15 @@ public class TestWindowOperatorSpec {
         Triggers.any(Triggers.count(23),
             Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
             Triggers.any(Triggers.any(Triggers.count(6),
-                Triggers.timeSinceFirstMessage(Duration.ofMillis(15)), Triggers.timeSinceFirstMessage(Duration.ofMillis(25)),
+                Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
+                Triggers.timeSinceFirstMessage(Duration.ofMillis(25)),
                 Triggers.timeSinceLastMessage(Duration.ofMillis(15))))));
 
     WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION);
     window.setEarlyTrigger(earlyTrigger);
     window.setLateTrigger(lateTrigger);
 
-    WindowOperatorSpec spec = new WindowOperatorSpec(window, new MessageStreamImpl(null), 0);
+    WindowOperatorSpec spec = new WindowOperatorSpec(window, 0);
     Assert.assertEquals(spec.getDefaultTriggerMs(), 5);
   }
 
@@ -57,7 +57,7 @@ public class TestWindowOperatorSpec {
     WindowInternal window = new WindowInternal(defaultTrigger, null, null, null, null, WindowType.SESSION);
     window.setEarlyTrigger(earlyTrigger);
 
-    WindowOperatorSpec spec = new WindowOperatorSpec(window, new MessageStreamImpl(null), 0);
+    WindowOperatorSpec spec = new WindowOperatorSpec(window, 0);
     Assert.assertEquals(spec.getDefaultTriggerMs(), 150);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-shell/src/main/visualizer/js/planToDagre.js
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/visualizer/js/planToDagre.js b/samza-shell/src/main/visualizer/js/planToDagre.js
index 8ba198e..0421c33 100644
--- a/samza-shell/src/main/visualizer/js/planToDagre.js
+++ b/samza-shell/src/main/visualizer/js/planToDagre.js
@@ -42,15 +42,11 @@ function planToDagre(data) {
 
   var jobs = data.jobs;
   for (var i = 0; i < jobs.length; i++) {
-    var canonicalOpIds = jobs[i].operatorGraph.canonicalOpIds;
     var operators = jobs[i].operatorGraph.operators;
     for (var opId in operators) {
       var operator = operators[opId];
       var labelVal = "<div><h3 class=\"topbar\">" + operator.opCode + "</h3><ul class=\"detailBox\">";
       var opId = operator.opId;
-      if (!(opId in canonicalOpIds)) {
-        canonicalOpIds[opId] = opId.toString();
-      }
       labelVal +=  "<li>ID: " + opId + "</li>";
       labelVal +=  "<li>@" + operator.sourceLocation + "</li>";
 
@@ -62,7 +58,7 @@ function planToDagre(data) {
       }
 
       labelVal += "</ul></div>";
-      g.setNode(canonicalOpIds[opId],  { label: labelVal, labelType: "html", rx: 5, ry: 5 });
+      g.setNode(opId,  { label: labelVal, labelType: "html", rx: 5, ry: 5 });
     }
   }
 
@@ -71,7 +67,7 @@ function planToDagre(data) {
     for (var k = 0; k < inputs.length; k++) {
       var input = inputs[k];
       for (var m = 0; m < input.nextOperatorIds.length; m++) {
-        g.setEdge(input.streamId, canonicalOpIds[input.nextOperatorIds[m]]);
+        g.setEdge(input.streamId, input.nextOperatorIds[m]);
       }
     }
 
@@ -79,10 +75,10 @@ function planToDagre(data) {
     for (var opId in operators) {
       var operator = operators[opId];
       for (var j = 0; j < operator.nextOperatorIds.length; j++) {
-        g.setEdge(canonicalOpIds[opId], canonicalOpIds[operator.nextOperatorIds[j]]);
+        g.setEdge(opId, operator.nextOperatorIds[j]);
       }
       if (typeof(operator.outputStreamId) !== 'undefined') {
-        g.setEdge(canonicalOpIds[opId], operator.outputStreamId);
+        g.setEdge(opId, operator.outputStreamId);
       }
     }
   }


[2/4] samza git commit: SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.

Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index b2a5e2a..61224f2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -19,302 +19,343 @@
 package org.apache.samza.operators;
 
 import com.google.common.collect.ImmutableList;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestExtOutputMessageEnvelope;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.Collections;
+import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-
+@SuppressWarnings("unchecked")
 public class TestMessageStreamImpl {
 
-  private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-
   @Test
   public void testMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m)  ->
-        new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
-    assertTrue(mapOp instanceof StreamOperatorSpec);
-    assertEquals(mapOp.getNextStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
-    MessageType mockInnerTestMessage = mock(MessageType.class);
-    when(xTestMsg.getKey()).thenReturn("test-msg-key");
-    when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getValue()).thenReturn("123456789");
-
-    Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
-    assertEquals(cOutputMsg.size(), 1);
-    TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
-    assertEquals(outputMessage.getKey(), xTestMsg.getKey());
-    assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1));
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockMapFn = mock(MapFunction.class);
+    inputStream.map(mockMapFn);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+    assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+
+    FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec).getTransformFn();
+    assertNotNull(transformFn);
+    assertEquals(OpCode.MAP, registeredOpSpec.getOpCode());
+
+    TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
+    when(mockMapFn.apply(anyObject())).thenReturn(mockOutput);
+    assertTrue(transformFn.apply(new Object()).contains(mockOutput));
+    when(mockMapFn.apply(anyObject())).thenReturn(null);
+    assertTrue(transformFn.apply(null).isEmpty());
   }
 
   @Test
   public void testFlatMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    List<TestOutputMessageEnvelope> flatOuts = new ArrayList<TestOutputMessageEnvelope>() { {
-        this.add(mock(TestOutputMessageEnvelope.class));
-        this.add(mock(TestOutputMessageEnvelope.class));
-        this.add(mock(TestOutputMessageEnvelope.class));
-      } };
-    final List<TestMessageEnvelope> inputMsgs = new ArrayList<>();
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> {
-      inputMsgs.add(message);
-      return flatOuts;
-    };
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperatorSpec);
-    assertEquals(flatMapOp.getNextStream(), outputStream);
-    assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap);
-
-    TestMessageEnvelope mockInput  = mock(TestMessageEnvelope.class);
-    // assert that the transformation function is what we defined above
-    List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>)
-        ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput);
-    assertEquals(flatOuts, result);
-    assertEquals(inputMsgs.size(), 1);
-    assertEquals(inputMsgs.get(0), mockInput);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    inputStream.flatMap(mock(FlatMapFunction.class));
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+    assertNotNull(((StreamOperatorSpec) registeredOpSpec).getTransformFn());
+    assertEquals(OpCode.FLAT_MAP, registeredOpSpec.getOpCode());
   }
 
   @Test
   public void testFlatMapWithRelaxedTypes() {
-    MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    List<TestExtOutputMessageEnvelope> flatOuts = new ArrayList<TestExtOutputMessageEnvelope>() { {
-        this.add(new TestExtOutputMessageEnvelope("output-key-1", 1, "output-id-001"));
-        this.add(new TestExtOutputMessageEnvelope("output-key-2", 2, "output-id-002"));
-        this.add(new TestExtOutputMessageEnvelope("output-key-3", 3, "output-id-003"));
-      } };
-
-    class MyFlatMapFunction implements FlatMapFunction<TestMessageEnvelope, TestExtOutputMessageEnvelope> {
-      public final List<TestMessageEnvelope> inputMsgs = new ArrayList<>();
-
-      @Override
-      public Collection<TestExtOutputMessageEnvelope> apply(TestMessageEnvelope message) {
-        inputMsgs.add(message);
-        return flatOuts;
-      }
-
-      @Override
-      public void init(Config config, TaskContext context) {
-        inputMsgs.clear();
-      }
-    }
-
-    MyFlatMapFunction xFlatMap = new MyFlatMapFunction();
-
-    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
-    assertTrue(flatMapOp instanceof StreamOperatorSpec);
-    assertEquals(flatMapOp.getNextStream(), outputStream);
-    assertEquals(((StreamOperatorSpec) flatMapOp).getTransformFn(), xFlatMap);
-
-    TestMessageEnvelope mockInput  = mock(TestMessageEnvelope.class);
-    // assert that the transformation function is what we defined above
-    List<TestOutputMessageEnvelope> result = (List<TestOutputMessageEnvelope>)
-        ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn().apply(mockInput);
-    assertEquals(flatOuts, result);
-    assertEquals(xFlatMap.inputMsgs.size(), 1);
-    assertEquals(xFlatMap.inputMsgs.get(0), mockInput);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    FlatMapFunction flatMapFunction =
+        (FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>) message -> Collections.emptyList();
+    // should compile since TestInputMessageEnvelope extends TestMessageEnvelope
+    inputStream.flatMap(flatMapFunction);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+    assertNotNull(((StreamOperatorSpec) registeredOpSpec).getTransformFn());
+    assertEquals(OpCode.FLAT_MAP, registeredOpSpec.getOpCode());
   }
 
   @Test
   public void testFilter() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
-    MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
-    assertTrue(filterOp instanceof StreamOperatorSpec);
-    assertEquals(filterOp.getNextStream(), outputStream);
-    // assert that the transformation function is what we defined above
-    FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    MessageType mockInnerTestMessage = mock(MessageType.class);
-    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
-    Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
-    assertTrue(output.isEmpty());
-    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
-    when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
-    output = txfmFn.apply(mockMsg);
-    assertEquals(output.size(), 1);
-    assertEquals(output.iterator().next(), mockMsg);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    FilterFunction<Object> mockFilterFn = mock(FilterFunction.class);
+    inputStream.filter(mockFilterFn);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+    assertTrue(registeredOpSpec instanceof StreamOperatorSpec);
+
+    FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec).getTransformFn();
+    assertNotNull(transformFn);
+    assertEquals(OpCode.FILTER, registeredOpSpec.getOpCode());
+
+    Object mockInput = new Object();
+    when(mockFilterFn.apply(anyObject())).thenReturn(true);
+    assertTrue(transformFn.apply(mockInput).contains(mockInput));
+    when(mockFilterFn.apply(anyObject())).thenReturn(false);
+    assertTrue(transformFn.apply(mockInput).isEmpty());
   }
 
   @Test
   public void testSink() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
-    SystemStream testStream = new SystemStream("test-sys", "test-stream");
-    SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
-      mc.send(new OutgoingMessageEnvelope(testStream, m.getMessage()));
-      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
-    };
-    inputStream.sink(xSink);
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
-    assertTrue(sinkOp instanceof SinkOperatorSpec);
-    assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
-
-    TestMessageEnvelope mockTest1 = mock(TestMessageEnvelope.class);
-    MessageType mockMsgBody = mock(MessageType.class);
-    when(mockTest1.getMessage()).thenReturn(mockMsgBody);
-    final List<OutgoingMessageEnvelope> outMsgs = new ArrayList<>();
-    MessageCollector mockCollector = mock(MessageCollector.class);
-    doAnswer(invocation -> {
-        outMsgs.add((OutgoingMessageEnvelope) invocation.getArguments()[0]);
-        return null;
-      }).when(mockCollector).send(any());
-    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-    ((SinkOperatorSpec) sinkOp).getSinkFn().apply(mockTest1, mockCollector, mockCoordinator);
-    assertEquals(1, outMsgs.size());
-    assertEquals(testStream, outMsgs.get(0).getSystemStream());
-    assertEquals(mockMsgBody, outMsgs.get(0).getMessage());
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    inputStream.sink(mock(SinkFunction.class));
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof SinkOperatorSpec);
+    assertNotNull(((SinkOperatorSpec) registeredOpSpec).getSinkFn());
+    assertEquals(OpCode.SINK, registeredOpSpec.getOpCode());
   }
 
   @Test
-  public void testJoin() {
-    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
-    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
-    JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
-      new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
-        @Override
-        public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
-          return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
-        }
-
-        @Override
-        public String getFirstKey(TestMessageEnvelope message) {
-          return message.getKey();
-        }
-
-        @Override
-        public String getSecondKey(TestMessageEnvelope message) {
-          return message.getKey();
-        }
-      };
-
-    MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner, Duration.ofMinutes(1));
-    Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
-    assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
-    assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput);
-    subs = source2.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
-    assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
-    assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput);
-    TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
-    TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
-    TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getThisPartialJoinFn().apply(joinMsg1, joinMsg2);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
-    xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getThisPartialJoinFn().apply(joinMsg2, joinMsg1);
-    assertEquals(xOut.getKey(), "test-join-1");
-    assertEquals(xOut.getMessage(), Integer.valueOf(24));
+  public void testSendTo() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+    inputStream.sendTo(mockOutputOpSpec);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+    assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode());
+    assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
   }
 
   @Test
-  public void testMerge() {
-    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
-    Collection<MessageStream<TestMessageEnvelope>> others = ImmutableList.of(
-        new MessageStreamImpl<>(mockGraph), new MessageStreamImpl<>(mockGraph));
-    MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
-    validateMergeOperator(merge1, mergeOutput);
+  public void testPartitionBy() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+
+    String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+    Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class);
+    OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+    IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
+    when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class)))
+        .thenReturn(mockIntermediateStream);
+    when(mockIntermediateStream.getOutputStream())
+        .thenReturn(mockOutputOpSpec);
+
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+    inputStream.partitionBy(mockKeyFn);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+    assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
+    assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+  }
 
-    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  @Test
+  public void testWindowWithRelaxedTypes() throws Exception {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+    MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+
+    Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
+    FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
+    Supplier<Integer> initialValue = () -> 0;
+
+    // should compile since TestMessageEnvelope (input for functions) is the base class of TestInputMessageEnvelope (M)
+    Window<TestInputMessageEnvelope, String, Integer> window = Windows
+        .keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator);
+    MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof WindowOperatorSpec);
+    assertEquals(OpCode.WINDOW, registeredOpSpec.getOpCode());
+    assertEquals(window, ((WindowOperatorSpec) registeredOpSpec).getWindow());
   }
 
   @Test
-  public void testMergeWithRelaxedTypes() {
-    MessageStream<TestMessageEnvelope> input1 = new MessageStreamImpl<>(mockGraph);
-    Collection<MessageStream<? extends TestMessageEnvelope>> others = ImmutableList.of(
-        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph),
-        new MessageStreamImpl<TestMessageEnvelope>(mockGraph));
+  public void testJoin() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
+    OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph, rightInputOpSpec);
+
+    JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> mockJoinFn =
+        mock(JoinFunction.class);
+
+    Duration joinTtl = Duration.ofMinutes(1);
+    source1.join(source2, mockJoinFn, joinTtl);
+
+    ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> leftRegisteredOpSpec = leftRegisteredOpCaptor.getValue();
+
+    ArgumentCaptor<OperatorSpec> rightRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(rightInputOpSpec).registerNextOperatorSpec(rightRegisteredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> rightRegisteredOpSpec = rightRegisteredOpCaptor.getValue();
+
+    assertEquals(leftRegisteredOpSpec, rightRegisteredOpSpec);
+    assertEquals(OpCode.JOIN, leftRegisteredOpSpec.getOpCode());
+    assertTrue(leftRegisteredOpSpec instanceof JoinOperatorSpec);
+    assertEquals(mockJoinFn, ((JoinOperatorSpec) leftRegisteredOpSpec).getJoinFn());
+    assertEquals(joinTtl.toMillis(), ((JoinOperatorSpec) leftRegisteredOpSpec).getTtlMs());
+    assertEquals(leftInputOpSpec, ((JoinOperatorSpec) leftRegisteredOpSpec).getLeftInputOpSpec());
+    assertEquals(rightInputOpSpec, ((JoinOperatorSpec) leftRegisteredOpSpec).getRightInputOpSpec());
+  }
 
-    // should compile
-    MessageStream<TestMessageEnvelope> mergeOutput = input1.merge(others);
-    validateMergeOperator(input1, mergeOutput);
+  @Test
+  public void testMerge() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
+    MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec1);
+
+    // other streams have the same message type T as input stream message type M
+    OperatorSpec mockOpSpec2 = mock(OperatorSpec.class);
+    OperatorSpec mockOpSpec3 = mock(OperatorSpec.class);
+    Collection<MessageStream<TestMessageEnvelope>> otherStreams1 = ImmutableList.of(
+        new MessageStreamImpl<>(mockGraph, mockOpSpec2),
+        new MessageStreamImpl<>(mockGraph, mockOpSpec3)
+    );
+
+    inputStream.merge(otherStreams1);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor1 = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec1).registerNextOperatorSpec(registeredOpCaptor1.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec1 = registeredOpCaptor1.getValue();
+    assertTrue(registeredOpSpec1 instanceof StreamOperatorSpec);
+    FlatMapFunction transformFn = ((StreamOperatorSpec) registeredOpSpec1).getTransformFn();
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor2 = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec2).registerNextOperatorSpec(registeredOpCaptor2.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec2 = registeredOpCaptor2.getValue();
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor3 = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(mockOpSpec3).registerNextOperatorSpec(registeredOpCaptor3.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec3 = registeredOpCaptor3.getValue();
+
+    assertEquals(registeredOpSpec1, registeredOpSpec2);
+    assertEquals(registeredOpSpec2, registeredOpSpec3);
+    assertEquals(OpCode.MERGE, registeredOpSpec1.getOpCode());
+
+    assertNotNull(transformFn);
+    TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
+    assertTrue(transformFn.apply(mockInput).contains(mockInput));
+    assertEquals(1, transformFn.apply(mockInput).size());
+  }
 
-    others.forEach(merge -> validateMergeOperator((MessageStream<TestMessageEnvelope>) merge, mergeOutput));
+  @Test
+  public void testMergeWithRelaxedTypes() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    MessageStream<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class));
+
+    // other streams have the same message type T as input stream message type M
+    Collection<MessageStream<TestMessageEnvelope>> otherStreams1 = ImmutableList.of(
+        new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class)),
+        new MessageStreamImpl<>(mockGraph, mock(OperatorSpec.class))
+    );
+
+    // other streams have the same message type T that extends input stream message type M
+    Collection<MessageStream<TestInputMessageEnvelope>> otherStreams2 = ImmutableList.of(
+        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+    );
+
+    // other streams have a mix of message types such that T extends input stream message type M
+    Collection<MessageStream<TestMessageEnvelope>> otherStreams3 = ImmutableList.of(
+        new MessageStreamImpl<TestMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+        // unchecked cast required for the next stream
+        (MessageStream) new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+    );
+
+    // not supported:
+    // other streams have a mix of message types such that T extends input stream message type M
+    Collection<MessageStream<? extends TestMessageEnvelope>> otherStreams4 = ImmutableList.of(
+        new MessageStreamImpl<TestMessageEnvelope>(mockGraph, mock(OperatorSpec.class)),
+        new MessageStreamImpl<TestInputMessageEnvelope>(mockGraph, mock(OperatorSpec.class))
+    );
+
+    // check if all type combinations compile
+    inputStream.merge(otherStreams1);
+    inputStream.merge(otherStreams2);
+    inputStream.merge(otherStreams3);
+    inputStream.merge(otherStreams4);
   }
 
   @Test
   public <T> void testMergeWithNestedTypes() {
     class MessageEnvelope<TM> { }
-    MessageStream<MessageEnvelope<T>> ms1 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
-    MessageStream<MessageEnvelope<T>> ms2 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
-    MessageStream<MessageEnvelope<T>> ms3 = new MessageStreamImpl<>(mock(StreamGraphImpl.class));
+    MessageStream<MessageEnvelope<T>> ms1 = mock(MessageStreamImpl.class);
+    MessageStream<MessageEnvelope<T>> ms2 = mock(MessageStreamImpl.class);
+    MessageStream<MessageEnvelope<T>> ms3 = mock(MessageStreamImpl.class);
     Collection<MessageStream<MessageEnvelope<T>>> otherStreams = ImmutableList.of(ms2, ms3);
 
     // should compile
     ms1.merge(otherStreams);
   }
 
-  private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
-    Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
-    assertTrue(mergeOp instanceof StreamOperatorSpec);
-    assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput);
-    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
-    Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(
-        mockMsg);
-    assertEquals(outputs.size(), 1);
-    assertEquals(outputs.iterator().next(), mockMsg);
-  }
-
   @Test
   public void testMergeAll() {
     MessageStream<TestMessageEnvelope> input1 = mock(MessageStreamImpl.class);
@@ -361,26 +402,9 @@ public class TestMessageStreamImpl {
     assertTrue(otherStreamsCaptor.getValue().contains(input3));
   }
 
-  @Test
-  public void testPartitionBy() {
-    Map<String, String> map = new HashMap<>();
-    map.put(JobConfig.JOB_DEFAULT_SYSTEM(), "testsystem");
-    Config config = new MapConfig(map);
-    ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(streamGraph);
-    Function<TestMessageEnvelope, String> keyExtractorFunc = m -> "222";
-    inputStream.partitionBy(keyExtractorFunc);
-    assertTrue(streamGraph.getInputStreams().size() == 1);
-    assertTrue(streamGraph.getOutputStreams().size() == 1);
-
-    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
-    assertEquals(subs.size(), 1);
-    OperatorSpec<TestMessageEnvelope> partitionByOp = subs.iterator().next();
-    assertTrue(partitionByOp instanceof SinkOperatorSpec);
-    assertNull(partitionByOp.getNextStream());
-
-    ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000),
-        envelope -> assertTrue(envelope.getPartitionKey().equals("222")), null);
+  class TestInputMessageEnvelope extends TestMessageEnvelope {
+    public TestInputMessageEnvelope(String key, Object value) {
+      super(key, value);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
deleted file mode 100644
index c4e9f51..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-
-public class TestMessageStreamImplUtil {
-  public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
-    return new MessageStreamImpl<M>(graph);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 9d95217..1fc60bd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -21,23 +21,21 @@ package org.apache.samza.operators;
 import junit.framework.Assert;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.MessageType;
-import org.apache.samza.operators.data.TestInputMessageEnvelope;
 import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.stream.InputStreamInternal;
-import org.apache.samza.operators.stream.InputStreamInternalImpl;
-import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
-import org.apache.samza.operators.stream.OutputStreamInternalImpl;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -46,152 +44,141 @@ public class TestStreamGraphImpl {
   @Test
   public void testGetInputStream() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+
+    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+  }
 
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
-        (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
-    MessageStream<TestMessageEnvelope> mInputStream = graph.getInputStream("test-stream-1", xMsgBuilder);
-    assertEquals(graph.getInputStreams().get(testStreamSpec), mInputStream);
-    assertTrue(mInputStream instanceof InputStreamInternalImpl);
-    assertEquals(((InputStreamInternalImpl) mInputStream).getMsgBuilder(), xMsgBuilder);
-
-    String key = "test-input-key";
-    MessageType msgBody = new MessageType("test-msg-value", 333333L);
-    TestMessageEnvelope xInputMsg = ((InputStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mInputStream).
-        getMsgBuilder().apply(key, msgBody);
-    assertEquals(xInputMsg.getKey(), key);
-    assertEquals(xInputMsg.getMessage().getValue(), msgBody.getValue());
-    assertEquals(xInputMsg.getMessage().getEventTime(), msgBody.getEventTime());
-    assertEquals(((TestInputMessageEnvelope) xInputMsg).getInputId(), "input-id-1");
+  @Test
+  public void testGetInputStreamWithRelaxedTypes() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+    MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+
+    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+        (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+    assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+    assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
+    assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+  }
+
+  @Test
+  public void testMultipleGetInputStreams() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
+    StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
+    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class));
+    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class));
+
+    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 =
+        (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
+    InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 =
+        (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
+
+    assertEquals(graph.getInputOperators().size(), 2);
+    assertEquals(graph.getInputOperators().get(mockStreamSpec1), inputOpSpec1);
+    assertEquals(graph.getInputOperators().get(mockStreamSpec2), inputOpSpec2);
   }
 
   @Test(expected = IllegalStateException.class)
-  public void testMultipleGetInputStream() {
+  public void testGetSameInputStreamTwice() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
-    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
-    StreamSpec nonExistentStreamSpec = new StreamSpec("non-existent-stream", "physical-stream-1", "test-system");
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getInputStream("test-stream-1", mock(BiFunction.class));
+    graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception
+  }
 
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
-    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+  @Test
+  public void testGetOutputStream() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
 
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
-        (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
 
-    //create 2 streams for the corresponding streamIds
-    MessageStream<TestInputMessageEnvelope> inputStream1 = graph.getInputStream("test-stream-1", xMsgBuilder);
-    MessageStream<TestInputMessageEnvelope> inputStream2 = graph.getInputStream("test-stream-2", xMsgBuilder);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
+    Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
 
-    //assert that the streamGraph contains only the above 2 streams
-    assertEquals(graph.getInputStreams().get(testStreamSpec1), inputStream1);
-    assertEquals(graph.getInputStreams().get(testStreamSpec2), inputStream2);
-    assertEquals(graph.getInputStreams().get(nonExistentStreamSpec), null);
-    assertEquals(graph.getInputStreams().size(), 2);
+    OutputStream<String, String, TestMessageEnvelope> outputStream =
+        graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor);
 
-    //should throw IllegalStateException
-    graph.getInputStream("test-stream-1", xMsgBuilder);
+    OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream;
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec);
+    assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor());
+    assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor());
+    assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec());
   }
 
-
-  @Test
-  public void testGetOutputStream() {
+  @Test(expected = IllegalStateException.class)
+  public void testGetSameOutputStreamTwice() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-    StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
-
-    class MyMessageType extends MessageType {
-      public final String outputId;
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
-      public MyMessageType(String value, long eventTime, String outputId) {
-        super(value, eventTime);
-        this.outputId = outputId;
-      }
-    }
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
-    Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
-        x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
-
-    OutputStream<String, MyMessageType, TestInputMessageEnvelope> mOutputStream =
-        graph.getOutputStream("test-stream-1", xKeyExtractor, xMsgExtractor);
-    assertEquals(graph.getOutputStreams().get(testStreamSpec), mOutputStream);
-    assertTrue(mOutputStream instanceof OutputStreamInternalImpl);
-    assertEquals(((OutputStreamInternalImpl) mOutputStream).getKeyExtractor(), xKeyExtractor);
-    assertEquals(((OutputStreamInternalImpl) mOutputStream).getMsgExtractor(), xMsgExtractor);
-
-    TestInputMessageEnvelope xInputMsg = new TestInputMessageEnvelope("test-key-1", "test-msg-1", 33333L, "input-id-1");
-    assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
-        getKeyExtractor().apply(xInputMsg), "test-key-1");
-    assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
-        getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
-    assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
-        getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
-    assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
-        getMsgExtractor().apply(xInputMsg).outputId, "test-output-id-1");
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class));
+    graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception
   }
 
   @Test
   public void testGetIntermediateStream() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
     Config mockConfig = mock(Config.class);
-    StreamSpec testStreamSpec = new StreamSpec("myJob-i001-test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(testStreamSpec);
+    StreamSpec mockStreamSpec = mock(StreamSpec.class);
     when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
     when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+    when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
 
-    class MyMessageType extends MessageType {
-      public final String outputId;
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
+    Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
+    BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
+
+    IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl =
+        graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder);
+
+    assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+    assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+    assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+    assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor());
+    assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor());
+    assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder());
+  }
 
-      public MyMessageType(String value, long eventTime, String outputId) {
-        super(value, eventTime);
-        this.outputId = outputId;
-      }
-    }
+  @Test(expected = IllegalStateException.class)
+  public void testGetSameIntermediateStreamTwice() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
 
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
-    Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
-    Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
-        x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
-    BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
-        (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
-
-    MessageStream<TestMessageEnvelope> mIntermediateStream =
-        graph.getIntermediateStream("test-stream-1", xKeyExtractor, xMsgExtractor, xMsgBuilder);
-    assertEquals(graph.getOutputStreams().get(testStreamSpec), mIntermediateStream);
-    assertTrue(mIntermediateStream instanceof IntermediateStreamInternalImpl);
-    assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getKeyExtractor(), xKeyExtractor);
-    assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgExtractor(), xMsgExtractor);
-    assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgBuilder(), xMsgBuilder);
-
-    TestMessageEnvelope xInputMsg = new TestMessageEnvelope("test-key-1", "test-msg-1", 33333L);
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getKeyExtractor().apply(xInputMsg), "test-key-1");
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getKey(), "test-key-1");
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getValue(), "test-msg-1");
-    assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
-        getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getEventTime(), 33333L);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+    graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
+    graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
   }
 
   @Test
-  public void testGetNextOpId() {
+  public void testGetNextOpIdIncrementsId() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    Config mockConfig = mock(Config.class);
-
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
     assertEquals(graph.getNextOpId(), 0);
     assertEquals(graph.getNextOpId(), 1);
   }
@@ -216,10 +203,10 @@ public class TestStreamGraphImpl {
     graph.getInputStream("test-stream-2", (k, v) -> v);
     graph.getInputStream("test-stream-3", (k, v) -> v);
 
-    ArrayList<InputStreamInternal> inputMessageStreams = new ArrayList<>(graph.getInputStreams().values());
-    Assert.assertEquals(inputMessageStreams.size(), 3);
-    Assert.assertEquals(inputMessageStreams.get(0).getStreamSpec(), testStreamSpec1);
-    Assert.assertEquals(inputMessageStreams.get(1).getStreamSpec(), testStreamSpec2);
-    Assert.assertEquals(inputMessageStreams.get(2).getStreamSpec(), testStreamSpec3);
+    List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
+    Assert.assertEquals(inputSpecs.size(), 3);
+    Assert.assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
+    Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
+    Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java b/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
deleted file mode 100644
index 3fd015b..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/MessageType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class MessageType {
-  private final String value;
-  private final long eventTime;
-
-  public MessageType(String value, long eventTime) {
-    this.value = value;
-    this.eventTime = eventTime;
-  }
-
-  public long getEventTime() {
-    return eventTime;
-  }
-
-  public String getValue() {
-    return value;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
deleted file mode 100644
index 22222ed..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestExtOutputMessageEnvelope.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class TestExtOutputMessageEnvelope extends TestOutputMessageEnvelope {
-  private final String outputId;
-
-  public TestExtOutputMessageEnvelope(String key, Integer value, String outputId) {
-    super(key, value);
-    this.outputId = outputId;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
deleted file mode 100644
index 089f534..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestInputMessageEnvelope.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-public class TestInputMessageEnvelope extends TestMessageEnvelope {
-  private final String inputId;
-
-  public TestInputMessageEnvelope(String key, String value, long eventTime, String inputId) {
-    super(key, value, eventTime);
-    this.inputId = inputId;
-  }
-
-  public String getInputId() {
-    return this.inputId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
index 05a63cd..68305cc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
@@ -20,21 +20,19 @@ package org.apache.samza.operators.data;
 
 
 public class TestMessageEnvelope {
-
   private final String key;
-  private final MessageType value;
+  private final Object value;
 
-  public TestMessageEnvelope(String key, String value, long eventTime) {
+  public TestMessageEnvelope(String key, Object value) {
     this.key = key;
-    this.value = new MessageType(value, eventTime);
+    this.value = value;
   }
 
-  public MessageType getMessage() {
+  public Object getMessage() {
     return this.value;
   }
 
   public String getKey() {
     return this.key;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 73d851b..d50d271 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -23,7 +23,6 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
@@ -177,9 +176,11 @@ public class TestOperatorImpl {
 
   private static class TestOpImpl extends OperatorImpl<Object, Object> {
     private final Object mockOutput;
+    private final TestOpSpec testOpSpec;
 
     TestOpImpl(Object mockOutput) {
       this.mockOutput = mockOutput;
+      this.testOpSpec = new TestOpSpec();
     }
 
     @Override
@@ -199,31 +200,14 @@ public class TestOperatorImpl {
     @Override
     protected void handleClose() {}
 
-    @Override
-    protected OperatorSpec<Object> getOperatorSpec() {
-      return new TestOpSpec();
+    protected OperatorSpec<Object, Object> getOperatorSpec() {
+      return testOpSpec;
     }
   }
 
-  private static class TestOpSpec implements OperatorSpec<Object> {
-    @Override
-    public MessageStreamImpl<Object> getNextStream() {
-      return null;
-    }
-
-    @Override
-    public OpCode getOpCode() {
-      return OpCode.INPUT;
-    }
-
-    @Override
-    public int getOpId() {
-      return -1;
-    }
-
-    @Override
-    public String getSourceLocation() {
-      return "";
+  private static class TestOpSpec extends OperatorSpec<Object, Object> {
+    TestOpSpec() {
+     super(OpCode.INPUT, 1);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 67e5b46..b2c7722 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,86 +19,211 @@
 
 package org.apache.samza.operators.impl;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-
-import static junit.framework.Assert.assertEquals;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class TestOperatorImplGraph {
 
+  public void testEmptyChain() {
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class));
+    OperatorImplGraph opGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContext.class), mock(Clock.class));
+    assertEquals(0, opGraph.getAllInputOperators().size());
+  }
+
   @Test
-  public void testOperatorGraphInitAndClose() {
+  public void testLinearChain() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+    when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+    OutputStream<Object, Object, Object> outputStream =
+        streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class));
+
+    inputStream
+        .filter(mock(FilterFunction.class))
+        .map(mock(MapFunction.class))
+        .sendTo(outputStream);
+
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    OperatorImplGraph opImplGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+    assertEquals(1, inputOpImpl.registeredOperators.size());
+
+    OperatorImpl filterOpImpl = (StreamOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+    assertEquals(1, filterOpImpl.registeredOperators.size());
+    assertEquals(OpCode.FILTER, filterOpImpl.getOperatorSpec().getOpCode());
+
+    OperatorImpl mapOpImpl = (StreamOperatorImpl) filterOpImpl.registeredOperators.iterator().next();
+    assertEquals(1, mapOpImpl.registeredOperators.size());
+    assertEquals(OpCode.MAP, mapOpImpl.getOperatorSpec().getOpCode());
+
+    OperatorImpl sendToOpImpl = (OutputOperatorImpl) mapOpImpl.registeredOperators.iterator().next();
+    assertEquals(0, sendToOpImpl.registeredOperators.size());
+    assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+  }
+
+
+  @Test
+  public void testBroadcastChain() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+    inputStream.filter(mock(FilterFunction.class));
+    inputStream.map(mock(MapFunction.class));
+
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    OperatorImplGraph opImplGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+    InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+    assertEquals(2, inputOpImpl.registeredOperators.size());
+    assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
+        ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER));
+    assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
+        ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP));
+  }
+
+  @Test
+  public void testJoinChain() {
     ApplicationRunner mockRunner = mock(ApplicationRunner.class);
-    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
-    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
-    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+    when(mockRunner.getStreamSpec(eq("input1"))).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
+    when(mockRunner.getStreamSpec(eq("input2"))).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+    JoinFunction mockJoinFunction = mock(JoinFunction.class);
+    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
+    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+    inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
+
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    OperatorImplGraph opImplGraph =
+        new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+
+    // verify that join function is initialized once.
+    verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContext.class));
+
+    InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1"));
+    InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2"));
+    PartialJoinOperatorImpl leftPartialJoinOpImpl =
+        (PartialJoinOperatorImpl) inputOpImpl1.registeredOperators.iterator().next();
+    PartialJoinOperatorImpl rightPartialJoinOpImpl =
+        (PartialJoinOperatorImpl) inputOpImpl2.registeredOperators.iterator().next();
+
+    assertEquals(leftPartialJoinOpImpl.getOperatorSpec(), rightPartialJoinOpImpl.getOperatorSpec());
+    assertNotSame(leftPartialJoinOpImpl, rightPartialJoinOpImpl);
+
+    Object joinKey = new Object();
+    // verify that left partial join operator calls getFirstKey
+    Object mockLeftMessage = mock(Object.class);
+    when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
+    inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+    verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
+
+    // verify that right partial join operator calls getSecondKey
+    Object mockRightMessage = mock(Object.class);
+    when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
+    inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+    verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
+
+    // verify that the join function apply is called with the correct messages on match
+    verify(mockJoinFunction, times(1)).apply(mockLeftMessage, mockRightMessage);
+  }
 
+  @Test
+  public void testOperatorGraphInitAndClose() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
+    when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
     Config mockConfig = mock(Config.class);
-    TaskContext mockContext = createMockContext();
-    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
 
-    List<String> initializationOrder = new ArrayList<>();
-    List<String> finalizationOrder = new ArrayList<>();
+    MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
+    MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
 
-    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", (k, v) -> v);
-    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", (k, v) -> v);
+    List<String> initializedOperators = new ArrayList<>();
+    List<String> closedOperators = new ArrayList<>();
 
-    inputStream1.map(createMapFunction("1", initializationOrder, finalizationOrder))
-               .map(createMapFunction("2", initializationOrder, finalizationOrder));
+    inputStream1.map(createMapFunction("1", initializedOperators, closedOperators))
+        .map(createMapFunction("2", initializedOperators, closedOperators));
 
-    inputStream2.map(createMapFunction("3", initializationOrder, finalizationOrder))
-        .map(createMapFunction("4", initializationOrder, finalizationOrder));
+    inputStream2.map(createMapFunction("3", initializedOperators, closedOperators))
+        .map(createMapFunction("4", initializedOperators, closedOperators));
 
-    OperatorImplGraph implGraph = new OperatorImplGraph(SystemClock.instance());
+    OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mockConfig, mockContext, SystemClock.instance());
 
     // Assert that initialization occurs in topological order.
-    implGraph.init(graph, mockConfig, mockContext);
-    assertEquals(initializationOrder.get(0), "1");
-    assertEquals(initializationOrder.get(1), "2");
-    assertEquals(initializationOrder.get(2), "3");
-    assertEquals(initializationOrder.get(3), "4");
+    assertEquals(initializedOperators.get(0), "1");
+    assertEquals(initializedOperators.get(1), "2");
+    assertEquals(initializedOperators.get(2), "3");
+    assertEquals(initializedOperators.get(3), "4");
 
     // Assert that finalization occurs in reverse topological order.
-    implGraph.close();
-    assertEquals(finalizationOrder.get(0), "4");
-    assertEquals(finalizationOrder.get(1), "3");
-    assertEquals(finalizationOrder.get(2), "2");
-    assertEquals(finalizationOrder.get(3), "1");
-  }
-
-  private TaskContext createMockContext() {
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    return mockContext;
+    opImplGraph.close();
+    assertEquals(closedOperators.get(0), "4");
+    assertEquals(closedOperators.get(1), "3");
+    assertEquals(closedOperators.get(2), "2");
+    assertEquals(closedOperators.get(3), "1");
   }
 
   /**
    * Creates an identity map function that appends to the provided lists when init/close is invoked.
    */
-  private MapFunction<Object, Object> createMapFunction(String id, List<String> initializationOrder, List<String> finalizationOrder) {
+  private MapFunction<Object, Object> createMapFunction(String id,
+      List<String> initializedOperators, List<String> finalizedOperators) {
     return new MapFunction<Object, Object>() {
       @Override
       public void init(Config config, TaskContext context) {
-        initializationOrder.add(id);
+        initializedOperators.add(id);
       }
 
       @Override
       public void close() {
-        finalizationOrder.add(id);
+        finalizedOperators.add(id);
       }
 
       @Override
@@ -108,4 +233,3 @@ public class TestOperatorImplGraph {
     };
   }
 }
-

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
deleted file mode 100644
index a75fadb..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.data.TestMessageEnvelope;
-import org.apache.samza.operators.data.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.internal.WindowType;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorImpls {
-  Field nextOperatorsField = null;
-  Method createOpMethod = null;
-  Method createOpsMethod = null;
-
-  @Before
-  public void prep() throws NoSuchFieldException, NoSuchMethodException {
-    nextOperatorsField = OperatorImpl.class.getDeclaredField("registeredOperators");
-    nextOperatorsField.setAccessible(true);
-
-    createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl",
-        OperatorSpec.class, Config.class, TaskContext.class);
-    createOpMethod.setAccessible(true);
-
-    createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class,
-        Config.class, TaskContext.class);
-    createOpsMethod.setAccessible(true);
-  }
-
-  @Test
-  public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
-    // get window operator
-    WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
-    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
-    when(mockWnd.getWindow()).thenReturn(windowInternal);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
-        createOpMethod.invoke(opGraph, mockWnd, mockConfig, mockContext);
-    assertTrue(opImpl instanceof WindowOperatorImpl);
-    Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
-    wndInternalField.setAccessible(true);
-    WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
-    assertEquals(wndInternal, windowInternal);
-
-    // get simple operator
-    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
-    when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockSimpleOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof StreamOperatorImpl);
-    Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
-    txfmFnField.setAccessible(true);
-    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
-
-    // get sink operator
-    SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
-    when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, sinkOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof SinkOperatorImpl);
-    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
-    sinkFnField.setAccessible(true);
-    assertEquals(sinkFn, sinkFnField.get(opImpl));
-
-    // get join operator
-    PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, joinOp, mockConfig, mockContext);
-    assertTrue(opImpl instanceof PartialJoinOperatorImpl);
-  }
-
-  @Test
-  public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
-    // test creation of empty chain
-    MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    Config mockConfig = mock(Config.class);
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
-    assertTrue(operatorChain != null);
-  }
-
-  @Test
-  public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of linear chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    Config mockConfig = mock(Config.class);
-    testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl wndOpImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
-    assertEquals(subsOps.size(), 0);
-  }
-
-  @Test
-  public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of broadcast chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    Config mockConfig = mock(Config.class);
-    testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
-    testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
-    assertEquals(subsSet.size(), 2);
-    Iterator<OperatorImpl> iter = subsSet.iterator();
-    // check the first branch w/ flatMap
-    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl flatMapImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl);
-    assertEquals(subsOps.size(), 0);
-    // check the second branch w/ map
-    opImpl = iter.next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
-    assertEquals(subsOps.size(), 1);
-    OperatorImpl mapImpl = subsOps.iterator().next();
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl);
-    assertEquals(subsOps.size(), 0);
-  }
-
-  @Test
-  public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
-    // test creation of join chain
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-    Config mockConfig = mock(Config.class);
-    input1
-        .join(input2,
-            new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
-              @Override
-              public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
-                return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
-              }
-
-              @Override
-              public String getFirstKey(TestMessageEnvelope message) {
-                return message.getKey();
-              }
-
-              @Override
-              public String getSecondKey(TestMessageEnvelope message) {
-                return message.getKey();
-              }
-            }, Duration.ofMinutes(1))
-        .map(m -> m);
-    OperatorImplGraph opGraph = new OperatorImplGraph();
-    // now, we create chained operators from each input sources
-    RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
-    RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
-    // check that those two chains will merge at map operator
-    // first branch of the join
-    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
-    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
-    assertEquals(subsOps.size(), 1);
-    // the map operator consumes the common join output, where two branches merge
-    OperatorImpl mapImpl = subsOps.iterator().next();
-    // second branch of the join
-    subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
-    assertEquals(subsSet.size(), 1);
-    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
-    assertNotSame(joinOp1, joinOp2);
-    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
-    assertEquals(subsOps.size(), 1);
-    // make sure that the map operator is the same
-    assertEquals(mapImpl, subsOps.iterator().next());
-  }
-}


[4/4] samza git commit: SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.

Posted by xi...@apache.org.
SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.

SAMZA-1221: Separated the OperatorSpec and MessageStream DAGs so that they're now duals of each other. Users interact with and construct the MessageStream DAG; we create and use the OperatorSpec DAG internally.
Moved common OperatorSpec functionality (getId, getOpCode, getOpName etc.) to the OperatorSpec abstract base class.
Added a new JoinOperatorSpec; the PartialJoinOperatorImpls are now created from the JoinOperatorSpec in OperatorImplGraph.
Added a new InputOperatorSpec and InputOperatorImpl (previously RootOperatorImpl). InputOperatorSpec is created when StreamGraph#getInputStream is called.
SAMZA-1101: Added a new OutputOperatorSpec and OutputOperatorImpl for partitionBy and sendTo. These are separate from the SinkOperatorSpec and SinkOperatorImpl used for sink. We don't need to create a sinkFn for partitionBy and sendTo anymore.
Updated most unit tests to use the new classes and avoid reflection.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish V <vj...@apache.org>

Closes #194 from prateekm/internal-cleanup


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c1c4289c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c1c4289c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c1c4289c

Branch: refs/heads/master
Commit: c1c4289c8586457ae35c01f1ccb28fa59697dcb1
Parents: 29cf374
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Jun 7 12:42:11 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed Jun 7 12:42:11 2017 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../samza/execution/ExecutionPlanner.java       |  67 +--
 .../samza/execution/JobGraphJsonGenerator.java  |  83 +--
 .../org/apache/samza/execution/JobNode.java     |   4 +-
 .../samza/operators/MessageStreamImpl.java      | 190 ++-----
 .../apache/samza/operators/StreamGraphImpl.java |  88 ++--
 .../org/apache/samza/operators/WindowState.java |  49 --
 .../samza/operators/impl/InputOperatorImpl.java |  66 +++
 .../samza/operators/impl/OperatorImpl.java      |  27 +-
 .../samza/operators/impl/OperatorImplGraph.java | 242 +++++----
 .../operators/impl/OutputOperatorImpl.java      |  72 +++
 .../operators/impl/PartialJoinOperatorImpl.java |  50 +-
 .../samza/operators/impl/RootOperatorImpl.java  |  76 ---
 .../samza/operators/impl/SinkOperatorImpl.java  |   9 +-
 .../operators/impl/StreamOperatorImpl.java      |   9 +-
 .../operators/impl/WindowOperatorImpl.java      |   3 +-
 .../samza/operators/impl/WindowState.java       |  49 ++
 .../samza/operators/spec/InputOperatorSpec.java |  52 ++
 .../samza/operators/spec/JoinOperatorSpec.java  |  73 +++
 .../samza/operators/spec/OperatorSpec.java      |  72 ++-
 .../samza/operators/spec/OperatorSpecs.java     |  89 ++--
 .../operators/spec/OutputOperatorSpec.java      |  55 ++
 .../samza/operators/spec/OutputStreamImpl.java  |  50 ++
 .../operators/spec/PartialJoinOperatorSpec.java |  97 ----
 .../samza/operators/spec/SinkOperatorSpec.java  |  85 +--
 .../operators/spec/StreamOperatorSpec.java      |  43 +-
 .../operators/spec/WindowOperatorSpec.java      |  36 +-
 .../operators/stream/InputStreamInternal.java   |  39 --
 .../stream/InputStreamInternalImpl.java         |  45 --
 .../stream/IntermediateMessageStreamImpl.java   |  58 +++
 .../stream/IntermediateStreamInternalImpl.java  |  61 ---
 .../operators/stream/OutputStreamInternal.java  |  43 --
 .../stream/OutputStreamInternalImpl.java        |  52 --
 .../samza/operators/util/OperatorJsonUtils.java |  89 ----
 .../apache/samza/task/StreamOperatorTask.java   |  42 +-
 .../samza/execution/TestExecutionPlanner.java   |  10 +-
 .../execution/TestJobGraphJsonGenerator.java    |  37 +-
 .../samza/operators/TestJoinOperator.java       |   6 +-
 .../samza/operators/TestMessageStreamImpl.java  | 522 ++++++++++---------
 .../operators/TestMessageStreamImplUtil.java    |  26 -
 .../samza/operators/TestStreamGraphImpl.java    | 243 ++++-----
 .../samza/operators/data/MessageType.java       |  37 --
 .../data/TestExtOutputMessageEnvelope.java      |  29 --
 .../data/TestInputMessageEnvelope.java          |  32 --
 .../operators/data/TestMessageEnvelope.java     |  10 +-
 .../samza/operators/impl/TestOperatorImpl.java  |  30 +-
 .../operators/impl/TestOperatorImplGraph.java   | 200 +++++--
 .../samza/operators/impl/TestOperatorImpls.java | 236 ---------
 .../operators/impl/TestStreamOperatorImpl.java  |  11 +-
 .../samza/operators/spec/TestOperatorSpecs.java | 192 -------
 .../operators/spec/TestWindowOperatorSpec.java  |   8 +-
 .../src/main/visualizer/js/planToDagre.js       |  12 +-
 52 files changed, 1576 insertions(+), 2231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index a8de1b5..d4dcaa1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,3 +28,4 @@ docs/learn/documentation/*/rest/javadocs
 out/
 *.patch
 **.pyc
+samza-shell/src/main/visualizer/plan.json

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index d763d84..00f4ad4 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -31,11 +31,9 @@ import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.slf4j.Logger;
@@ -77,7 +75,7 @@ public class ExecutionPlanner {
    */
   /* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
     JobGraph jobGraph = new JobGraph(config);
-    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputStreams().keySet());
+    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
     Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
     intStreams.retainAll(sinkStreams);
@@ -120,7 +118,7 @@ public class ExecutionPlanner {
   /**
    * Fetch the partitions of source/sink streams and update the StreamEdges.
    * @param jobGraph {@link JobGraph}
-   * @param streamManager the {@StreamManager} to interface with the streams.
+   * @param streamManager the {@link StreamManager} to interface with the streams.
    */
   /* package private */ static void updateExistingPartitions(JobGraph jobGraph, StreamManager streamManager) {
     Set<StreamEdge> existingStreams = new HashSet<>();
@@ -157,20 +155,16 @@ public class ExecutionPlanner {
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
     Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs = HashMultimap.create();
-    // Mapping from the output stream to the join spec. Since StreamGraph creates two partial join operators for a join and they
-    // will have the same output stream, this mapping is used to choose one of them as the unique join spec representing this join
-    // (who register first in the map wins).
-    Map<MessageStream, OperatorSpec> outputStreamToJoinSpec = new HashMap<>();
     // A queue of joins with known input partitions
     Queue<OperatorSpec> joinQ = new LinkedList<>();
     // The visited set keeps track of the join specs that have been already inserted in the queue before
     Set<OperatorSpec> visited = new HashSet<>();
 
-    streamGraph.getInputStreams().entrySet().forEach(entry -> {
+    streamGraph.getInputOperators().entrySet().forEach(entry -> {
         StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
         findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
-            outputStreamToJoinSpec, joinQ, visited);
+            joinQ, visited);
       });
 
     // At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
@@ -209,44 +203,33 @@ public class ExecutionPlanner {
   }
 
   /**
-   * This function traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
-   * @param inputMessageStream next input MessageStream to traverse {@link MessageStream}
+   * This function traverses the {@link OperatorSpec} graph to find and update mappings for all Joins reachable
+   * from this input {@link StreamEdge}.
+   * @param operatorSpec the {@link OperatorSpec} to traverse
    * @param sourceStreamEdge source {@link StreamEdge}
    * @param joinSpecToStreamEdges mapping from join spec to its source {@link StreamEdge}s
    * @param streamEdgeToJoinSpecs mapping from source {@link StreamEdge} to the join specs that consumes it
-   * @param outputStreamToJoinSpec mapping from the output stream to the join spec
    * @param joinQ queue that contains joinSpecs where at least one of the input stream edge partitions is known.
    */
-  private static void findReachableJoins(MessageStream inputMessageStream, StreamEdge sourceStreamEdge,
-      Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges, Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
-      Map<MessageStream, OperatorSpec> outputStreamToJoinSpec, Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
-    Collection<OperatorSpec> specs = ((MessageStreamImpl) inputMessageStream).getRegisteredOperatorSpecs();
-    for (OperatorSpec spec : specs) {
-      if (spec instanceof PartialJoinOperatorSpec) {
-        // every join will have two partial join operators
-        // we will choose one of them in order to consolidate the inputs
-        // the first one who registered with the outputStreamToJoinSpec will win
-        MessageStream output = spec.getNextStream();
-        OperatorSpec joinSpec = outputStreamToJoinSpec.get(output);
-        if (joinSpec == null) {
-          joinSpec = spec;
-          outputStreamToJoinSpec.put(output, joinSpec);
-        }
-
-        joinSpecToStreamEdges.put(joinSpec, sourceStreamEdge);
-        streamEdgeToJoinSpecs.put(sourceStreamEdge, joinSpec);
-
-        if (!visited.contains(joinSpec) && sourceStreamEdge.getPartitionCount() > 0) {
-          // put the joins with known input partitions into the queue
-          joinQ.add(joinSpec);
-          visited.add(joinSpec);
-        }
+  private static void findReachableJoins(OperatorSpec operatorSpec, StreamEdge sourceStreamEdge,
+      Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges,
+      Multimap<StreamEdge, OperatorSpec> streamEdgeToJoinSpecs,
+      Queue<OperatorSpec> joinQ, Set<OperatorSpec> visited) {
+    if (operatorSpec instanceof JoinOperatorSpec) {
+      joinSpecToStreamEdges.put(operatorSpec, sourceStreamEdge);
+      streamEdgeToJoinSpecs.put(sourceStreamEdge, operatorSpec);
+
+      if (!visited.contains(operatorSpec) && sourceStreamEdge.getPartitionCount() > 0) {
+        // put the joins with known input partitions into the queue and mark as visited
+        joinQ.add(operatorSpec);
+        visited.add(operatorSpec);
       }
+    }
 
-      if (spec.getNextStream() != null) {
-        findReachableJoins(spec.getNextStream(), sourceStreamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, outputStreamToJoinSpec, joinQ,
-            visited);
-      }
+    Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs();
+    for (OperatorSpec registeredOpSpec : registeredOperatorSpecs) {
+      findReachableJoins(registeredOpSpec, sourceStreamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ,
+          visited);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 96c0538..23c9d89 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -19,13 +19,9 @@
 
 package org.apache.samza.execution;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
 import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,10 +29,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.util.OperatorJsonUtils;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -73,8 +70,6 @@ import org.codehaus.jackson.map.ObjectMapper;
     List<StreamJson> outputStreams;
     @JsonProperty("operators")
     Map<Integer, Map<String, Object>> operators = new HashMap<>();
-    @JsonProperty("canonicalOpIds")
-    Map<Integer, String> canonicalOpIds = new HashMap<>();
   }
 
   static final class StreamJson {
@@ -108,11 +103,6 @@ import org.codehaus.jackson.map.ObjectMapper;
     String applicationId;
   }
 
-  // Mapping from the output stream to the ids.
-  // Logically they belong to the same operator, but in code we generate one operator for each input.
-  // This is to associate the operators that output to the same MessageStream.
-  Multimap<MessageStream, Integer> outputStreamToOpIds = HashMultimap.create();
-
   /**
    * Returns the JSON representation of a {@link JobGraph}
    * @param jobGraph {@link JobGraph}
@@ -157,28 +147,21 @@ import org.codehaus.jackson.map.ObjectMapper;
   }
 
   /**
-   * Traverse the {@StreamGraph} and build the operator graph JSON POJO.
+   * Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
    * @param jobNode job node in the {@link JobGraph}
    * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
    */
   private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
     OperatorGraphJson opGraph = new OperatorGraphJson();
     opGraph.inputStreams = new ArrayList<>();
-    jobNode.getStreamGraph().getInputStreams().forEach((streamSpec, stream) -> {
+    jobNode.getStreamGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
         StreamJson inputJson = new StreamJson();
         opGraph.inputStreams.add(inputJson);
         inputJson.streamId = streamSpec.getId();
-        Collection<OperatorSpec> specs = ((MessageStreamImpl) stream).getRegisteredOperatorSpecs();
+        Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
         inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
 
-        updateOperatorGraphJson((MessageStreamImpl) stream, opGraph);
-
-        for (Map.Entry<MessageStream, Collection<Integer>> entry : outputStreamToOpIds.asMap().entrySet()) {
-          List<Integer> sortedIds = new ArrayList<>(entry.getValue());
-          Collections.sort(sortedIds);
-          String canonicalId = Joiner.on(',').join(sortedIds);
-          sortedIds.stream().forEach(id -> opGraph.canonicalOpIds.put(id, canonicalId));
-        }
+        updateOperatorGraphJson(operatorSpec, opGraph);
       });
 
     opGraph.outputStreams = new ArrayList<>();
@@ -191,23 +174,43 @@ import org.codehaus.jackson.map.ObjectMapper;
   }
 
   /**
-   * Traverse the {@StreamGraph} recursively and update the operator graph JSON POJO.
-   * @param messageStream input
+   * Traverse the {@link OperatorSpec} graph recursively and update the operator graph JSON POJO.
+   * @param operatorSpec input
    * @param opGraph operator graph to build
    */
-  private void updateOperatorGraphJson(MessageStreamImpl messageStream, OperatorGraphJson opGraph) {
-    Collection<OperatorSpec> specs = messageStream.getRegisteredOperatorSpecs();
-    specs.forEach(opSpec -> {
-        opGraph.operators.put(opSpec.getOpId(), OperatorJsonUtils.operatorToMap(opSpec));
-
-        if (opSpec.getOpCode() == OperatorSpec.OpCode.JOIN || opSpec.getOpCode() == OperatorSpec.OpCode.MERGE) {
-          outputStreamToOpIds.put(opSpec.getNextStream(), opSpec.getOpId());
-        }
-
-        if (opSpec.getNextStream() != null) {
-          updateOperatorGraphJson(opSpec.getNextStream(), opGraph);
-        }
-      });
+  private void updateOperatorGraphJson(OperatorSpec operatorSpec, OperatorGraphJson opGraph) {
+    // TODO xiliu: render input operators instead of input streams
+    if (operatorSpec.getOpCode() != OpCode.INPUT) {
+      opGraph.operators.put(operatorSpec.getOpId(), operatorToMap(operatorSpec));
+    }
+    Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
+    specs.forEach(opSpec -> updateOperatorGraphJson(opSpec, opGraph));
+  }
+
+  /**
+   * Format the operator properties into a map
+   * @param spec a {@link OperatorSpec} instance
+   * @return map of the operator properties
+   */
+  private Map<String, Object> operatorToMap(OperatorSpec spec) {
+    Map<String, Object> map = new HashMap<>();
+    map.put("opCode", spec.getOpCode().name());
+    map.put("opId", spec.getOpId());
+    map.put("sourceLocation", spec.getSourceLocation());
+
+    Collection<OperatorSpec> nextOperators = spec.getRegisteredOperatorSpecs();
+    map.put("nextOperatorIds", nextOperators.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()));
+
+    if (spec instanceof OutputOperatorSpec) {
+      OutputStreamImpl outputStream = ((OutputOperatorSpec) spec).getOutputStream();
+      map.put("outputStreamId", outputStream.getStreamSpec().getId());
+    }
+
+    if (spec instanceof JoinOperatorSpec) {
+      map.put("ttlMs", ((JoinOperatorSpec) spec).getTtlMs());
+    }
+
+    return map;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index fbad520..c42e1cc 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -31,8 +31,8 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.util.MathUtils;
 import org.apache.samza.util.Util;
@@ -145,7 +145,7 @@ public class JobNode {
     // Filter out the join operators, and obtain a list of their ttl values
     List<Long> joinTtlIntervals = operatorSpecs.stream()
         .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.JOIN)
-        .map(spec -> ((PartialJoinOperatorSpec) spec).getTtlMs())
+        .map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
         .collect(Collectors.toList());
 
     // Combine both the above lists

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 9912f95..db6fd5a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,37 +19,35 @@
 
 package org.apache.samza.operators;
 
-import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
 
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
 import java.util.function.Function;
 
 
 /**
- * The implementation for input/output {@link MessageStream}s to/from the operators.
- * Users use the {@link MessageStream} API methods to describe and chain the operators specs.
+ * The {@link MessageStream} implementation that lets users describe their logical DAG.
+ * Users can obtain an instance by calling {@link StreamGraph#getInputStream}.
+ * <p>
+ * Each {@link MessageStreamImpl} is associated with a single {@link OperatorSpec} in the DAG and allows
+ * users to chain further operators on its {@link OperatorSpec}. In other words, a {@link MessageStreamImpl}
+ * presents an "edge-centric" (streams) view of the "node-centric" (specs) logical DAG for the users.
  *
  * @param <M>  type of messages in this {@link MessageStream}
  */
@@ -60,171 +58,97 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   private final StreamGraphImpl graph;
 
   /**
-   * The set of operators that consume the messages in this {@link MessageStream}
-   *
-   * Use a LinkedHashSet since we need deterministic ordering in initializing/closing operators.
+   * The {@link OperatorSpec} associated with this {@link MessageStreamImpl}
    */
-  private final Set<OperatorSpec> registeredOperatorSpecs = new LinkedHashSet<>();
+  private final OperatorSpec operatorSpec;
 
-  /**
-   * Default constructor
-   *
-   * @param graph the {@link StreamGraphImpl} object that this stream belongs to
-   */
-  public MessageStreamImpl(StreamGraphImpl graph) {
+  public MessageStreamImpl(StreamGraphImpl graph, OperatorSpec<?, M> operatorSpec) {
     this.graph = graph;
+    this.operatorSpec = operatorSpec;
   }
 
   @Override
   public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFn) {
-    OperatorSpec<TM> op = OperatorSpecs.createMapOperatorSpec(
-        mapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
-    this.registeredOperatorSpecs.add(op);
-    return op.getNextStream();
+    OperatorSpec<M, TM> op = OperatorSpecs.createMapOperatorSpec(mapFn, this.graph.getNextOpId());
+    this.operatorSpec.registerNextOperatorSpec(op);
+    return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
   public MessageStream<M> filter(FilterFunction<? super M> filterFn) {
-    OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, new MessageStreamImpl<>(this.graph),
-        this.graph.getNextOpId());
-    this.registeredOperatorSpecs.add(op);
-    return op.getNextStream();
+    OperatorSpec<M, M> op = OperatorSpecs.createFilterOperatorSpec(filterFn, this.graph.getNextOpId());
+    this.operatorSpec.registerNextOperatorSpec(op);
+    return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
   public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFn) {
-    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, new MessageStreamImpl<>(this.graph),
-        this.graph.getNextOpId());
-    this.registeredOperatorSpecs.add(op);
-    return op.getNextStream();
+    OperatorSpec<M, TM> op = OperatorSpecs.createFlatMapOperatorSpec(flatMapFn, this.graph.getNextOpId());
+    this.operatorSpec.registerNextOperatorSpec(op);
+    return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
   public void sink(SinkFunction<? super M> sinkFn) {
     SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId());
-    this.registeredOperatorSpecs.add(op);
+    this.operatorSpec.registerNextOperatorSpec(op);
   }
 
   @Override
   public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
-    SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec((OutputStreamInternal<K, V, M>) outputStream,
-        this.graph.getNextOpId());
-    this.registeredOperatorSpecs.add(op);
+    OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
+        (OutputStreamImpl<K, V, M>) outputStream, this.graph.getNextOpId());
+    this.operatorSpec.registerNextOperatorSpec(op);
   }
 
   @Override
   public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
-    OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
-        new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
-    this.registeredOperatorSpecs.add(wndOp);
-    return wndOp.getNextStream();
+    OperatorSpec<M, WindowPane<K, WV>> op = OperatorSpecs.createWindowOperatorSpec(
+        (WindowInternal<M, K, WV>) window, this.graph.getNextOpId());
+    this.operatorSpec.registerNextOperatorSpec(op);
+    return new MessageStreamImpl<>(this.graph, op);
   }
 
   @Override
-  public <K, OM, TM> MessageStream<TM> join(
-      MessageStream<OM> otherStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFn, Duration ttl) {
-    MessageStreamImpl<TM> nextStream = new MessageStreamImpl<>(this.graph);
-
-    PartialJoinFunction<K, M, OM, TM> thisPartialJoinFn = new PartialJoinFunction<K, M, OM, TM>() {
-      private KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<M>> thisStreamState;
-
-      @Override
-      public TM apply(M m, OM jm) {
-        return joinFn.apply(m, jm);
-      }
-
-      @Override
-      public K getKey(M message) {
-        return joinFn.getFirstKey(message);
-      }
-
-      @Override
-      public KeyValueStore<K, PartialJoinMessage<M>> getState() {
-        return thisStreamState;
-      }
-
-      @Override
-      public void init(Config config, TaskContext context) {
-        // joinFn#init() must only be called once, so we do it in this partial join function's #init.
-        joinFn.init(config, context);
-
-        thisStreamState = new InternalInMemoryStore<>();
-      }
-
-      @Override
-      public void close() {
-        // joinFn#close() must only be called once, so we do it in this partial join function's #close.
-        joinFn.close();
-      }
-    };
-
-    PartialJoinFunction<K, OM, M, TM> otherPartialJoinFn = new PartialJoinFunction<K, OM, M, TM>() {
-      private KeyValueStore<K, PartialJoinMessage<OM>> otherStreamState;
-
-      @Override
-      public TM apply(OM om, M m) {
-        return joinFn.apply(m, om);
-      }
-
-      @Override
-      public K getKey(OM message) {
-        return joinFn.getSecondKey(message);
-      }
-
-      @Override
-      public KeyValueStore<K, PartialJoinMessage<OM>> getState() {
-        return otherStreamState;
-      }
-
-      @Override
-      public void init(Config config, TaskContext taskContext) {
-        otherStreamState = new InternalInMemoryStore<>();
-      }
-    };
-
-    this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(
-        thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
-
-    ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs
-        .createPartialJoinOperatorSpec(otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream,
-            this.graph.getNextOpId()));
-
-    return nextStream;
+  public <K, JM, TM> MessageStream<TM> join(MessageStream<JM> otherStream,
+      JoinFunction<? extends K, ? super M, ? super JM, ? extends TM> joinFn, Duration ttl) {
+    OperatorSpec<?, JM> otherOpSpec = ((MessageStreamImpl<JM>) otherStream).getOperatorSpec();
+    JoinOperatorSpec<K, M, JM, TM> joinOpSpec =
+        OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec,
+            (JoinFunction<K, M, JM, TM>) joinFn, ttl.toMillis(), this.graph.getNextOpId());
+
+    this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
+    otherOpSpec.registerNextOperatorSpec((OperatorSpec<JM, ?>) joinOpSpec);
+
+    return new MessageStreamImpl<>(this.graph, joinOpSpec);
   }
 
   @Override
   public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
-    MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);
-    List<MessageStream<M>> streamsToMerge = new ArrayList<>((Collection<MessageStream<M>>) otherStreams);
-    streamsToMerge.add(this);
-    
-    streamsToMerge.forEach(stream -> {
-        OperatorSpec mergeOperatorSpec =
-            OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId());
-        ((MessageStreamImpl<M>) stream).registeredOperatorSpecs.add(mergeOperatorSpec);
-      });
-    return nextStream;
+    StreamOperatorSpec<M, M> opSpec = OperatorSpecs.createMergeOperatorSpec(this.graph.getNextOpId());
+    this.operatorSpec.registerNextOperatorSpec(opSpec);
+    otherStreams.forEach(other ->
+        ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(opSpec));
+    return new MessageStreamImpl<>(this.graph, opSpec);
   }
 
   @Override
   public <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor) {
     int opId = this.graph.getNextOpId();
     String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
-    MessageStreamImpl<M> intermediateStream =
-        this.graph.<K, M, M>getIntermediateStream(opName, keyExtractor, m -> m, (k, m) -> m);
-    SinkOperatorSpec<M> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
-        (OutputStreamInternal<K, M, M>) intermediateStream, opId);
-    this.registeredOperatorSpecs.add(partitionByOperatorSpec);
+    IntermediateMessageStreamImpl<K, M, M> intermediateStream =
+        this.graph.getIntermediateStream(opName, keyExtractor, m -> m, (k, m) -> m);
+    OutputOperatorSpec<M> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
+        intermediateStream.getOutputStream(), opId);
+    this.operatorSpec.registerNextOperatorSpec(partitionByOperatorSpec);
     return intermediateStream;
   }
 
   /**
-   * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
-   * should not be exposed to users.
-   *
-   * @return  a collection containing all {@link OperatorSpec}s that are registered with this {@link MessageStream}.
+   * Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
+   * @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
    */
-  public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
-    return Collections.unmodifiableSet(this.registeredOperatorSpecs);
+  protected OperatorSpec<?, M> getOperatorSpec() {
+    return this.operatorSpec;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index fcce5eb..c0da1b2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -20,12 +20,10 @@ package org.apache.samza.operators;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.stream.InputStreamInternal;
-import org.apache.samza.operators.stream.InputStreamInternalImpl;
-import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-import org.apache.samza.operators.stream.OutputStreamInternalImpl;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 
@@ -51,9 +49,9 @@ public class StreamGraphImpl implements StreamGraph {
    */
   private int opId = 0;
 
-  // Using LHM for deterministic order in initializing and closing operators.
-  private final Map<StreamSpec, InputStreamInternal> inStreams = new LinkedHashMap<>();
-  private final Map<StreamSpec, OutputStreamInternal> outStreams = new LinkedHashMap<>();
+  // We use a LinkedHashMap for deterministic order in initializing and closing operators.
+  private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+  private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
   private final ApplicationRunner runner;
   private final Config config;
 
@@ -67,17 +65,21 @@ public class StreamGraphImpl implements StreamGraph {
   }
 
   @Override
-  public <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
+  public <K, V, M> MessageStream<M> getInputStream(String streamId,
+      BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
     if (msgBuilder == null) {
       throw new IllegalArgumentException("msgBuilder can't be null for an input stream");
     }
 
-    if (inStreams.containsKey(runner.getStreamSpec(streamId))) {
-      throw new IllegalStateException("Cannot invoke getInputStream() multiple times with the same streamId: " + streamId);
+    if (inputOperators.containsKey(runner.getStreamSpec(streamId))) {
+      throw new IllegalStateException("getInputStream() invoked multiple times "
+          + "with the same streamId: " + streamId);
     }
 
-    return inStreams.computeIfAbsent(runner.getStreamSpec(streamId),
-        streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, (BiFunction<K, V, M>) msgBuilder));
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    inputOperators.put(streamSpec,
+        new InputOperatorSpec<>(streamSpec, (BiFunction<K, V, M>) msgBuilder, this.getNextOpId()));
+    return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
   }
 
   @Override
@@ -91,12 +93,15 @@ public class StreamGraphImpl implements StreamGraph {
       throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
     }
 
-    if (outStreams.containsKey(runner.getStreamSpec(streamId))) {
-      throw new IllegalStateException("Cannot invoke getOutputStream() multiple times with the same streamId: " + streamId);
+    if (outputStreams.containsKey(runner.getStreamSpec(streamId))) {
+      throw new IllegalStateException("getOutputStream() invoked multiple times "
+          + "with the same streamId: " + streamId);
     }
 
-    return outStreams.computeIfAbsent(runner.getStreamSpec(streamId),
-        streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    outputStreams.put(streamSpec,
+        new OutputStreamImpl<>(streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
+    return outputStreams.get(streamSpec);
   }
 
   @Override
@@ -120,8 +125,9 @@ public class StreamGraphImpl implements StreamGraph {
    * @param <M> the type of messages in the intermediate {@link MessageStream}
    * @return  the intermediate {@link MessageStreamImpl}
    */
-  <K, V, M> MessageStreamImpl<M> getIntermediateStream(String streamName,
-      Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor, BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
+  <K, V, M> IntermediateMessageStreamImpl<K, V, M> getIntermediateStream(String streamName,
+      Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor,
+      BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
     String streamId = String.format("%s-%s-%s",
         config.get(JobConfig.JOB_NAME()),
         config.get(JobConfig.JOB_ID(), "1"),
@@ -129,30 +135,28 @@ public class StreamGraphImpl implements StreamGraph {
     if (msgBuilder == null) {
       throw new IllegalArgumentException("msgBuilder cannot be null for an intermediate stream");
     }
-
     if (keyExtractor == null) {
       throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
     }
     if (msgExtractor == null) {
       throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
     }
-
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    IntermediateStreamInternalImpl<K, V, M> intStream =
-        (IntermediateStreamInternalImpl<K, V, M>) inStreams
-            .computeIfAbsent(streamSpec,
-                k -> new IntermediateStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor,
-                    (Function<M, V>) msgExtractor, (BiFunction<K, V, M>) msgBuilder));
-    outStreams.putIfAbsent(streamSpec, intStream);
-    return intStream;
+    if (inputOperators.containsKey(streamSpec) || outputStreams.containsKey(streamSpec)) {
+      throw new IllegalStateException("getIntermediateStream() invoked multiple times "
+          + "with the same streamId: " + streamId);
+    }
+    inputOperators.put(streamSpec, new InputOperatorSpec(streamSpec, msgBuilder, this.getNextOpId()));
+    outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, keyExtractor, msgExtractor));
+    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
   }
 
-  public Map<StreamSpec, InputStreamInternal> getInputStreams() {
-    return Collections.unmodifiableMap(inStreams);
+  public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+    return Collections.unmodifiableMap(inputOperators);
   }
 
-  public Map<StreamSpec, OutputStreamInternal> getOutputStreams() {
-    return Collections.unmodifiableMap(outStreams);
+  public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+    return Collections.unmodifiableMap(outputStreams);
   }
 
   public ContextManager getContextManager() {
@@ -169,24 +173,20 @@ public class StreamGraphImpl implements StreamGraph {
    * @return  a set of all available {@link OperatorSpec}s
    */
   public Collection<OperatorSpec> getAllOperatorSpecs() {
-    Collection<InputStreamInternal> inputStreams = inStreams.values();
+    Collection<InputOperatorSpec> inputOperatorSpecs = inputOperators.values();
     Set<OperatorSpec> operatorSpecs = new HashSet<>();
 
-    for (InputStreamInternal stream : inputStreams) {
-      doGetOperatorSpecs((MessageStreamImpl) stream, operatorSpecs);
+    for (InputOperatorSpec inputOperatorSpec: inputOperatorSpecs) {
+      doGetOperatorSpecs(inputOperatorSpec, operatorSpecs);
     }
     return operatorSpecs;
   }
 
-  private void doGetOperatorSpecs(MessageStreamImpl stream, Set<OperatorSpec> specs) {
-    Collection<OperatorSpec> registeredOperatorSpecs = stream.getRegisteredOperatorSpecs();
-    for (OperatorSpec spec : registeredOperatorSpecs) {
-      specs.add(spec);
-      MessageStreamImpl nextStream = spec.getNextStream();
-      if (nextStream != null) {
-        //Recursively traverse and obtain all reachable operators
-        doGetOperatorSpecs(nextStream, specs);
-      }
+  private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> specs) {
+    Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs();
+    for (OperatorSpec registeredOperatorSpec: registeredOperatorSpecs) {
+      specs.add(registeredOperatorSpec);
+      doGetOperatorSpecs(registeredOperatorSpec, specs);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
deleted file mode 100644
index 801044b..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-/**
- * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata.
- */
-public class WindowState<WV> {
-
-  final WV wv;
-  /**
-   * Time of the first message in the window
-   */
-  final long earliestRecvTime;
-
-  public WindowState(WV wv, long earliestRecvTime) {
-    this.wv = wv;
-    this.earliestRecvTime = earliestRecvTime;
-  }
-
-  public WV getWindowValue() {
-    return wv;
-  }
-
-  public long getEarliestTimestamp() {
-    return earliestRecvTime;
-  }
-
-  @Override
-  public String toString() {
-    return String.format("WindowState: {time=%d, value=%s}", earliestRecvTime, wv);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
new file mode 100644
index 0000000..0545af1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.Collections;
+
+
+/**
+ * An operator that builds the input message from the incoming message.
+ *
+ * @param <K> the type of key in the incoming message
+ * @param <V> the type of message in the incoming message
+ * @param <M> the type of input message
+ */
+public final class InputOperatorImpl<K, V, M> extends OperatorImpl<Pair<K, V>, M> {
+
+  private final InputOperatorSpec<K, V, M> inputOpSpec;
+
+  InputOperatorImpl(InputOperatorSpec<K, V, M> inputOpSpec) {
+    this.inputOpSpec = inputOpSpec;
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+  }
+
+  @Override
+  public Collection<M> handleMessage(Pair<K, V> pair, MessageCollector collector, TaskCoordinator coordinator) {
+    // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
+    M message = this.inputOpSpec.getMsgBuilder().apply(pair.getKey(), pair.getValue());
+    return Collections.singletonList(message);
+  }
+
+  @Override
+  protected void handleClose() {
+  }
+
+  protected OperatorSpec<Pair<K, V>, M> getOperatorSpec() {
+    return this.inputOpSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 23c31ac..73bb83d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -43,12 +43,13 @@ public abstract class OperatorImpl<M, RM> {
 
   private boolean initialized;
   private boolean closed;
-  private Set<OperatorImpl<RM, ?>> registeredOperators;
   private HighResolutionClock highResClock;
   private Counter numMessage;
   private Timer handleMessageNs;
   private Timer handleTimerNs;
 
+  Set<OperatorImpl<RM, ?>> registeredOperators;
+
   /**
    * Initialize this {@link OperatorImpl} and its user-defined functions.
    *
@@ -56,7 +57,7 @@ public abstract class OperatorImpl<M, RM> {
    * @param context  the {@link TaskContext} for the task
    */
   public final void init(Config config, TaskContext context) {
-    String opName = getOperatorSpec().getOpName();
+    String opName = getOperatorName();
 
     if (initialized) {
       throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName));
@@ -95,7 +96,7 @@ public abstract class OperatorImpl<M, RM> {
     if (!initialized) {
       throw new IllegalStateException(
           String.format("Attempted to register next operator before initializing operator %s.",
-              getOperatorSpec().getOpName()));
+              getOperatorName()));
     }
     this.registeredOperators.add(nextOperator);
   }
@@ -167,10 +168,9 @@ public abstract class OperatorImpl<M, RM> {
   }
 
   public void close() {
-    String opName = getOperatorSpec().getOpName();
-
     if (closed) {
-      throw new IllegalStateException(String.format("Attempted to close Operator %s more than once.", opName));
+      throw new IllegalStateException(
+          String.format("Attempted to close Operator %s more than once.", getOperatorSpec().getOpName()));
     }
     handleClose();
     closed = true;
@@ -183,7 +183,20 @@ public abstract class OperatorImpl<M, RM> {
    *
    * @return the {@link OperatorSpec} for this {@link OperatorImpl}
    */
-  protected abstract OperatorSpec<RM> getOperatorSpec();
+  protected abstract OperatorSpec<M, RM> getOperatorSpec();
+
+  /**
+   * Get the name for this {@link OperatorImpl}.
+   *
+   * Some {@link OperatorImpl}s don't have a 1:1 mapping with their {@link OperatorSpec}. E.g., there are
+   * 2 PartialJoinOperatorImpls for a JoinOperatorSpec. Overriding this method allows them to provide an
+   * implementation-specific name, e.g., for use in metrics.
+   *
+   * @return the operator name
+   */
+  protected String getOperatorName() {
+    return getOperatorSpec().getOpName();
+  }
 
   private HighResolutionClock createHighResClock(Config config) {
     if (new MetricsConfig(config).getMetricsTimerEnabled()) {

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index e99b3ee..e5fce13 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -19,18 +19,23 @@
 package org.apache.samza.operators.impl;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.Clock;
-import org.apache.samza.util.SystemClock;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -42,67 +47,59 @@ import java.util.Map;
 
 
 /**
- * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for the input
- * {@link MessageStreamImpl}s.
+ * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s.
  */
 public class OperatorImplGraph {
 
   /**
-   * A mapping from {@link OperatorSpec}s to their {@link OperatorImpl}s in this graph. Used to avoid creating
-   * multiple {@link OperatorImpl}s for an {@link OperatorSpec}, e.g., when it's reached from different
-   * input {@link MessageStreamImpl}s.
-   *
-   * Using LHM for deterministic ordering in initializing and closing operators.
+   * A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating
+   * multiple {@link OperatorImpl}s for an {@link OperatorSpec} when it's reached from different
+   * {@link OperatorSpec}s during DAG traversals (e.g., for the merge operator).
+   * We use a LHM for deterministic ordering in initializing and closing operators.
    */
-  private final Map<OperatorSpec, OperatorImpl> operatorImpls = new LinkedHashMap<>();
+  private final Map<String, OperatorImpl> operatorImpls = new LinkedHashMap<>();
 
   /**
-   * A mapping from input {@link SystemStream}s to their {@link OperatorImpl} sub-DAG in this graph.
+   * A mapping from input {@link SystemStream}s to their {@link InputOperatorImpl} sub-DAG in this graph.
    */
-  private final Map<SystemStream, RootOperatorImpl> rootOperators = new HashMap<>();
+  private final Map<SystemStream, InputOperatorImpl> inputOperators = new HashMap<>();
 
-  private final Clock clock;
-
-  public OperatorImplGraph(Clock clock) {
-    this.clock = clock;
-  }
+  /**
+   * A mapping from the op IDs of {@link JoinOperatorSpec}s to their two {@link PartialJoinFunction}s. Used to
+   * associate the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} with each other since
+   * they're reached from different {@link OperatorSpec}s during DAG traversals.
+   */
+  private final Map<Integer, Pair<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
 
-  /* package private */ OperatorImplGraph() {
-    this(SystemClock.instance());
-  }
+  private final Clock clock;
 
   /**
-   * Initialize the DAG of {@link OperatorImpl}s for the input {@link MessageStreamImpl} in the provided
-   * {@link StreamGraphImpl}.
+   * Constructs the DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s
+   * in the {@code streamGraph}.
    *
-   * @param streamGraph  the logical {@link StreamGraphImpl}
+   * @param streamGraph  the {@link StreamGraphImpl} containing the logical {@link OperatorSpec} DAG
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
+   * @param clock  the {@link Clock} to get current time
    */
-  public void init(StreamGraphImpl streamGraph, Config config, TaskContext context) {
-    streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> {
+  public OperatorImplGraph(StreamGraphImpl streamGraph, Config config, TaskContext context, Clock clock) {
+    this.clock = clock;
+    streamGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
         SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
-        this.rootOperators.put(systemStream, this.createOperatorImpls((MessageStreamImpl) inputStream, config, context));
+        InputOperatorImpl inputOperatorImpl =
+            (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, config, context);
+        this.inputOperators.put(systemStream, inputOperatorImpl);
       });
   }
 
   /**
-   * Get the {@link RootOperatorImpl} corresponding to the provided input {@code systemStream}.
+   * Get the {@link InputOperatorImpl} corresponding to the provided input {@code systemStream}.
    *
    * @param systemStream  input {@link SystemStream}
-   * @return  the {@link RootOperatorImpl} that starts processing the input message
+   * @return  the {@link InputOperatorImpl} that starts processing the input message
    */
-  public RootOperatorImpl getRootOperator(SystemStream systemStream) {
-    return this.rootOperators.get(systemStream);
-  }
-
-  /**
-   * Get all {@link RootOperatorImpl}s for the graph.
-   *
-   * @return  an unmodifiable view of all {@link RootOperatorImpl}s for the graph
-   */
-  public Collection<RootOperatorImpl> getAllRootOperators() {
-    return Collections.unmodifiableCollection(this.rootOperators.values());
+  public InputOperatorImpl getInputOperator(SystemStream systemStream) {
+    return this.inputOperators.get(systemStream);
   }
 
   public void close() {
@@ -112,65 +109,44 @@ public class OperatorImplGraph {
   }
 
   /**
-   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
-   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
+   * Get all {@link InputOperatorImpl}s for the graph.
    *
-   * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   * @param <M>  the type of messages in the {@code source} {@link MessageStreamImpl}
-   * @return  root node for the {@link OperatorImpl} DAG
+   * @return  an unmodifiable view of all {@link InputOperatorImpl}s for the graph
    */
-  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source,
-      Config config, TaskContext context) {
-    // since the source message stream might have multiple operator specs registered on it,
-    // create a new root node as a single point of entry for the DAG.
-    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
-    rootOperator.init(config, context);
-    // create the pipeline/topology starting from the source
-    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
-        // pass in the context so that operator implementations can initialize their functions
-        OperatorImpl<M, ?> operatorImpl =
-            createAndRegisterOperatorImpl(registeredOperator, config, context);
-        rootOperator.registerNextOperator(operatorImpl);
-      });
-    return rootOperator;
+  public Collection<InputOperatorImpl> getAllInputOperators() {
+    return Collections.unmodifiableCollection(this.inputOperators.values());
   }
 
   /**
-   * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
-   * {@link OperatorImpl}s.
+   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link OperatorSpec},
+   * creates the corresponding DAG of {@link OperatorImpl}s, and returns the root {@link OperatorImpl} node.
    *
+   * @param prevOperatorSpec  the parent of the current {@code operatorSpec} in the traversal
    * @param operatorSpec  the operatorSpec to create the {@link OperatorImpl} for
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
-   * @param <M>  type of input message
    * @return  the operator implementation for the operatorSpec
    */
-  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
+  OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
       Config config, TaskContext context) {
-    if (!operatorImpls.containsKey(operatorSpec)) {
-      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(operatorSpec, config, context);
-      if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) {
-        // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
-        // so traverse and initialize and register the rest of the DAG.
-        // initialize the corresponding operator function
-        operatorImpl.init(config, context);
-        MessageStreamImpl nextStream = operatorSpec.getNextStream();
-        if (nextStream != null) {
-          Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
-          registeredSpecs.forEach(registeredSpec -> {
-              OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, config, context);
-              operatorImpl.registerNextOperator(subImpl);
-            });
-        }
-        return operatorImpl;
-      }
+    if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) {
+      // Either no impl is registered under this operatorSpec's name yet, or this is a join operator spec
+      // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
+      OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
+      operatorImpl.init(config, context);
+      operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl);
+
+      Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
+      registeredSpecs.forEach(registeredSpec -> {
+          OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, config, context);
+          operatorImpl.registerNextOperator(nextImpl);
+        });
+      return operatorImpl;
+    } else {
+      // The impl for this operatorSpec is already registered. The map is keyed by operator name, so look
+      // it up by name (assumes the impl's getOperatorName() equals the spec's getOpName() -- confirm).
+      return operatorImpls.get(operatorSpec.getOpName());
     }
-
-    // the implementation corresponding to operatorSpec has already been instantiated
-    // and registered, so we do not need to traverse the DAG further.
-    return operatorImpls.get(operatorSpec);
   }
 
   /**
@@ -179,20 +155,96 @@ public class OperatorImplGraph {
    * @param operatorSpec  the immutable {@link OperatorSpec} definition.
    * @param config  the {@link Config} required to instantiate operators
    * @param context  the {@link TaskContext} required to instantiate operators
-   * @param <M>  type of input message
    * @return  the {@link OperatorImpl} implementation instance
    */
-  private <M> OperatorImpl<M, ?> createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext context) {
-    if (operatorSpec instanceof StreamOperatorSpec) {
-      return new StreamOperatorImpl<>((StreamOperatorSpec<M, ?>) operatorSpec, config, context);
+  OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
+      Config config, TaskContext context) {
+    if (operatorSpec instanceof InputOperatorSpec) {
+      return new InputOperatorImpl((InputOperatorSpec) operatorSpec);
+    } else if (operatorSpec instanceof StreamOperatorSpec) {
+      return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec, config, context);
     } else if (operatorSpec instanceof SinkOperatorSpec) {
-      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
+      return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context);
+    } else if (operatorSpec instanceof OutputOperatorSpec) {
+      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, config, context);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
-      return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
-    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
-      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, config, context, clock);
+      return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
+    } else if (operatorSpec instanceof JoinOperatorSpec) {
+      return createPartialJoinOperatorImpl(prevOperatorSpec, (JoinOperatorSpec) operatorSpec, config, context, clock);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
   }
+
+  private PartialJoinOperatorImpl createPartialJoinOperatorImpl(OperatorSpec prevOperatorSpec,
+      JoinOperatorSpec joinOpSpec, Config config, TaskContext context, Clock clock) {
+    Pair<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec);
+
+    if (joinOpSpec.getLeftInputOpSpec().equals(prevOperatorSpec)) { // we got here from the left side of the join
+      return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ true,
+          partialJoinFunctions.getLeft(), partialJoinFunctions.getRight(), config, context, clock);
+    } else { // we got here from the right side of the join
+      return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ false,
+          partialJoinFunctions.getRight(), partialJoinFunctions.getLeft(), config, context, clock);
+    }
+  }
+
+  private Pair<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
+    return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(),
+        joinOpId -> Pair.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn())));
+  }
+
+  private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinFunction joinFn) {
+    return new PartialJoinFunction<Object, Object, Object, Object>() {
+      private KeyValueStore<Object, PartialJoinMessage<Object>> leftStreamState = new InternalInMemoryStore<>();
+
+      @Override
+      public Object apply(Object m, Object jm) {
+        return joinFn.apply(m, jm);
+      }
+
+      @Override
+      public Object getKey(Object message) {
+        return joinFn.getFirstKey(message);
+      }
+
+      @Override
+      public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+        return leftStreamState;
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        // user-defined joinFn should only be initialized once, so we do it only in left partial join function.
+        joinFn.init(config, context);
+      }
+
+      @Override
+      public void close() {
+        // joinFn#close() must only be called once, so we do it only in left partial join function.
+        joinFn.close();
+      }
+    };
+  }
+
+  private PartialJoinFunction<Object, Object, Object, Object> createRightJoinFn(JoinFunction joinFn) {
+    return new PartialJoinFunction<Object, Object, Object, Object>() {
+      private KeyValueStore<Object, PartialJoinMessage<Object>> rightStreamState = new InternalInMemoryStore<>();
+
+      @Override
+      public Object apply(Object m, Object jm) {
+        return joinFn.apply(jm, m);
+      }
+
+      @Override
+      public Object getKey(Object message) {
+        return joinFn.getSecondKey(message);
+      }
+
+      @Override
+      public KeyValueStore<Object, PartialJoinMessage<Object>> getState() {
+        return rightStreamState;
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
new file mode 100644
index 0000000..fe59b74
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.Collections;
+
+
+/**
+ * An operator that sends incoming messages to an output {@link SystemStream}.
+ */
+class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
+
+  private final OutputOperatorSpec<M> outputOpSpec;
+  private final OutputStreamImpl<?, ?, M> outputStream;
+
+  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) {
+    this.outputOpSpec = outputOpSpec;
+    this.outputStream = outputOpSpec.getOutputStream();
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+  }
+
+  @Override
+  public Collection<Void> handleMessage(M message, MessageCollector collector,
+      TaskCoordinator coordinator) {
+    // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
+    SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
+        outputStream.getStreamSpec().getPhysicalName());
+    Object key = outputStream.getKeyExtractor().apply(message);
+    Object msg = outputStream.getMsgExtractor().apply(message);
+    collector.send(new OutgoingMessageEnvelope(systemStream, key, msg));
+    return Collections.emptyList();
+  }
+
+  @Override
+  protected void handleClose() {
+  }
+
+  @Override
+  protected OperatorSpec<M, Void> getOperatorSpec() {
+    return outputOpSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index b00a2e9..ad66962 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -22,8 +22,8 @@ import org.apache.samza.config.Config;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction.PartialJoinMessage;
+import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
@@ -31,8 +31,6 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.Clock;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -40,18 +38,18 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Implementation of a {@link PartialJoinOperatorSpec} that joins messages of type {@code M} in this stream
- * with buffered messages of type {@code JM} in the other stream.
+ * Implementation of one side of a {@link JoinOperatorSpec} that buffers and joins its input messages of
+ * type {@code M} with buffered input messages of type {@code JM} in the paired {@link PartialJoinOperatorImpl}.
  *
- * @param <M>  type of messages in the input stream
- * @param <JM>  type of messages in the stream to join with
- * @param <RM>  type of messages in the joined stream
+ * @param <K> the type of join key
+ * @param <M> the type of input messages on this side of the join
+ * @param <JM> the type of input message on the other side of the join
+ * @param <RM> the type of join result
  */
 class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(PartialJoinOperatorImpl.class);
-
-  private final PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec;
+  private final JoinOperatorSpec<K, M, JM, RM> joinOpSpec;
+  private final boolean isLeftSide; // whether this operator impl is for the left side of the join
   private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
   private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
   private final long ttlMs;
@@ -59,19 +57,22 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
 
   private Counter keysRemoved;
 
-  PartialJoinOperatorImpl(PartialJoinOperatorSpec<K, M, JM, RM> partialJoinOpSpec,
+  PartialJoinOperatorImpl(JoinOperatorSpec<K, M, JM, RM> joinOpSpec, boolean isLeftSide,
+      PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
+      PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
       Config config, TaskContext context, Clock clock) {
-    this.partialJoinOpSpec = partialJoinOpSpec;
-    this.thisPartialJoinFn = partialJoinOpSpec.getThisPartialJoinFn();
-    this.otherPartialJoinFn = partialJoinOpSpec.getOtherPartialJoinFn();
-    this.ttlMs = partialJoinOpSpec.getTtlMs();
+    this.joinOpSpec = joinOpSpec;
+    this.isLeftSide = isLeftSide;
+    this.thisPartialJoinFn = thisPartialJoinFn;
+    this.otherPartialJoinFn = otherPartialJoinFn;
+    this.ttlMs = joinOpSpec.getTtlMs();
     this.clock = clock;
   }
 
   @Override
   protected void handleInit(Config config, TaskContext context) {
     keysRemoved = context.getMetricsRegistry()
-        .newCounter(OperatorImpl.class.getName(), this.partialJoinOpSpec.getOpName() + "-keys-removed");
+        .newCounter(OperatorImpl.class.getName(), getOperatorName() + "-keys-removed");
     this.thisPartialJoinFn.init(config, context);
   }
 
@@ -116,8 +117,19 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
     this.thisPartialJoinFn.close();
   }
 
+  protected OperatorSpec<M, RM> getOperatorSpec() {
+    return (OperatorSpec<M, RM>) joinOpSpec;
+  }
+
+  /**
+   * The name for this {@link PartialJoinOperatorImpl} that includes information about which
+   * side of the join it is for.
+   *
+   * @return the {@link PartialJoinOperatorImpl} name.
+   */
   @Override
-  protected OperatorSpec<RM> getOperatorSpec() {
-    return partialJoinOpSpec;
+  protected String getOperatorName() {
+    String side = isLeftSide ? "L" : "R";
+    return this.joinOpSpec.getOpName() + "-" + side;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
deleted file mode 100644
index 45cb941..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.Collection;
-import java.util.Collections;
-
-
-/**
- * A no-op operator implementation that forwards incoming messages to all of its subscribers.
- * @param <M>  type of incoming messages
- */
-public final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
-
-  @Override
-  protected void handleInit(Config config, TaskContext context) {
-  }
-
-  @Override
-  public Collection<M> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
-    return Collections.singletonList(message);
-  }
-
-  @Override
-  protected void handleClose() {
-  }
-
-  // TODO: SAMZA-1221 - Change to InputOperatorSpec that also builds the message
-  @Override
-  protected OperatorSpec<M> getOperatorSpec() {
-    return new OperatorSpec<M>() {
-      @Override
-      public MessageStreamImpl<M> getNextStream() {
-        return null;
-      }
-
-      @Override
-      public OpCode getOpCode() {
-        return OpCode.INPUT;
-      }
-
-      @Override
-      public int getOpId() {
-        return -1;
-      }
-
-      @Override
-      public String getSourceLocation() {
-        return "";
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index 4f698f8..5dbe27f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -31,9 +31,9 @@ import java.util.Collections;
 
 
 /**
- * Implementation for {@link SinkOperatorSpec}
+ * An operator that sends incoming messages to an arbitrary output system using the provided {@link SinkFunction}.
  */
-class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
+class SinkOperatorImpl<M> extends OperatorImpl<M, Void> {
 
   private final SinkOperatorSpec<M> sinkOpSpec;
   private final SinkFunction<M> sinkFn;
@@ -49,7 +49,7 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
   }
 
   @Override
-  public Collection<M> handleMessage(M message, MessageCollector collector,
+  public Collection<Void> handleMessage(M message, MessageCollector collector,
       TaskCoordinator coordinator) {
     this.sinkFn.apply(message, collector, coordinator);
     // there should be no further chained operators since this is a terminal operator.
@@ -61,8 +61,7 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
     this.sinkFn.close();
   }
 
-  @Override
-  protected OperatorSpec<M> getOperatorSpec() {
+  protected OperatorSpec<M, Void> getOperatorSpec() {
     return sinkOpSpec;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index e720803..a51d5e6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -30,10 +30,10 @@ import java.util.Collection;
 
 
 /**
- * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message.
+ * A simple operator that accepts a 1:n transform function and applies it to each incoming message.
  *
- * @param <M>  type of message in the input stream
- * @param <RM>  type of message in the output stream
+ * @param <M>  the type of input message
+ * @param <RM>  the type of result
  */
 class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
 
@@ -62,8 +62,7 @@ class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
     this.transformFn.close();
   }
 
-  @Override
-  protected OperatorSpec<RM> getOperatorSpec() {
+  protected OperatorSpec<M, RM> getOperatorSpec() {
     return streamOpSpec;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index b258042..f9485f7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -21,7 +21,6 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.WindowState;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.triggers.FiringType;
@@ -154,7 +153,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
   }
 
   @Override
-  protected OperatorSpec<WindowPane<WK, WV>> getOperatorSpec() {
+  protected OperatorSpec<M, WindowPane<WK, WV>> getOperatorSpec() {
     return windowOpSpec;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
new file mode 100644
index 0000000..4577a5c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowState.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+/**
+ * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata.
+ */
+class WindowState<WV> {
+
+  private final WV wv;
+  /**
+   * Time of the first message in the window
+   */
+  private final long earliestRecvTime;
+
+  WindowState(WV wv, long earliestRecvTime) {
+    this.wv = wv;
+    this.earliestRecvTime = earliestRecvTime;
+  }
+
+  WV getWindowValue() {
+    return wv;
+  }
+
+  long getEarliestTimestamp() {
+    return earliestRecvTime;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("WindowState: {time=%d, value=%s}", earliestRecvTime, wv);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
new file mode 100644
index 0000000..6fbc3c1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+
+/**
+ * The spec for an operator that receives incoming messages from an input stream
+ * and converts them to the input message.
+ *
+ * @param <K> the type of key in the incoming message
+ * @param <V> the type of message in the incoming message
+ * @param <M> the type of input message
+ */
+public class InputOperatorSpec<K, V, M> extends OperatorSpec<Pair<K, V>, M> {
+
+  private final StreamSpec streamSpec;
+  private final BiFunction<K, V, M> msgBuilder;
+
+  public InputOperatorSpec(StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder, int opId) {
+    super(OpCode.INPUT, opId);
+    this.streamSpec = streamSpec;
+    this.msgBuilder = msgBuilder;
+  }
+
+  public StreamSpec getStreamSpec() {
+    return this.streamSpec;
+  }
+
+  public BiFunction<K, V, M> getMsgBuilder() {
+    return this.msgBuilder;
+  }
+}


[3/4] samza git commit: SAMZA-1221, SAMZA-1101: Internal cleanup for High-Level API implementation.

Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
new file mode 100644
index 0000000..16f59d7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.functions.JoinFunction;
+
+
+/**
+ * The spec for the join operator that buffers messages from one stream and
+ * joins them with buffered messages from another stream.
+ *
+ * @param <K>  the type of join key
+ * @param <M>  the type of message in this stream
+ * @param <JM>  the type of message in the other stream
+ * @param <RM>  the type of join result
+ */
+public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { // Object == M | JM
+
+  private final OperatorSpec<?, M> leftInputOpSpec;
+  private final OperatorSpec<?, JM> rightInputOpSpec;
+  private final JoinFunction<K, M, JM, RM> joinFn;
+  private final long ttlMs;
+
+  /**
+   * Constructs a {@link JoinOperatorSpec}.
+   *
+   * @param leftInputOpSpec  the operator spec for the stream on the left side of the join
+   * @param rightInputOpSpec  the operator spec for the stream on the right side of the join
+   * @param joinFn  the user-defined join function to get join keys and results
+   * @param ttlMs  the ttl in ms for retaining messages in each stream
+   * @param opId  the unique ID for this operator
+   */
+  JoinOperatorSpec(OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
+      JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
+    super(OpCode.JOIN, opId);
+    this.leftInputOpSpec = leftInputOpSpec;
+    this.rightInputOpSpec = rightInputOpSpec;
+    this.joinFn = joinFn;
+    this.ttlMs = ttlMs;
+  }
+
+  public OperatorSpec getLeftInputOpSpec() {
+    return leftInputOpSpec;
+  }
+
+  public OperatorSpec getRightInputOpSpec() {
+    return rightInputOpSpec;
+  }
+
+  public JoinFunction<K, M, JM, RM> getJoinFn() {
+    return this.joinFn;
+  }
+
+  public long getTtlMs() {
+    return ttlMs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 3ea52ca..f64e123 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,19 +19,23 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.MessageStreamImpl;
 
+import java.util.Collection;
+import java.util.LinkedHashSet;
+import java.util.Set;
 
 /**
  * A stream operator specification that holds all the information required to transform 
- * the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
+ * the input {@link org.apache.samza.operators.MessageStreamImpl} and produce the output
+ * {@link org.apache.samza.operators.MessageStreamImpl}.
  *
+ * @param <M>  the type of input message to the operator
  * @param <OM>  the type of output message from the operator
  */
 @InterfaceStability.Unstable
-public interface OperatorSpec<OM> {
+public abstract class OperatorSpec<M, OM> {
 
-  enum OpCode {
+  public enum OpCode {
     INPUT,
     MAP,
     FLAT_MAP,
@@ -41,39 +45,77 @@ public interface OperatorSpec<OM> {
     JOIN,
     WINDOW,
     MERGE,
-    PARTITION_BY
+    PARTITION_BY,
+    OUTPUT
   }
 
+  private final int opId;
+  private final OpCode opCode;
+  private StackTraceElement[] creationStackTrace;
+
   /**
-   * Get the next {@link MessageStreamImpl} that receives the transformed messages produced by this operator.
-   * @return  the next {@link MessageStreamImpl}
+   * The set of operators that consume the messages produced from this operator.
+   * <p>
+   * We use a LinkedHashSet since we need deterministic ordering in initializing/closing operators.
    */
-  MessageStreamImpl<OM> getNextStream();
+  private final Set<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>();
+
+  public OperatorSpec(OpCode opCode, int opId) {
+    this.opCode = opCode;
+    this.opId = opId;
+    this.creationStackTrace = Thread.currentThread().getStackTrace();
+  }
+
+  /**
+   * Register the next operator spec in the chain that this operator should propagate its output to.
+   * @param nextOperatorSpec  the next operator in the chain.
+   */
+  public void registerNextOperatorSpec(OperatorSpec<OM, ?> nextOperatorSpec) {
+    nextOperatorSpecs.add(nextOperatorSpec);
+  }
+
+  public Collection<OperatorSpec<OM, ?>> getRegisteredOperatorSpecs() {
+    return nextOperatorSpecs;
+  }
 
   /**
    * Get the {@link OpCode} for this operator.
    * @return  the {@link OpCode} for this operator
    */
-  OpCode getOpCode();
+  public final OpCode getOpCode() {
+    return this.opCode;
+  }
 
   /**
    * Get the unique ID of this operator in the {@link org.apache.samza.operators.StreamGraph}.
    * @return  the unique operator ID
    */
-  int getOpId();
+  public final int getOpId() {
+    return this.opId;
+  }
 
   /**
-   * Return the user source code location that creates the operator
-   * @return source location
+   * Get the user source code location that created the operator.
+   * @return  source code location for the operator
    */
-  String getSourceLocation();
+  public final String getSourceLocation() {
+    // The stack trace for most operators looks like:
+    // [0] Thread.getStackTrace()
+    // [1] OperatorSpec.<init>()
+    // [2] SomeOperatorSpec.<init>()
+    // [3] OperatorSpecs.createSomeOperatorSpec()
+    // [4] MessageStreamImpl.someOperator()
+    // [5] User code that calls [4]
+    // we are interested in [5] here
+    StackTraceElement element = this.creationStackTrace[5];
+    return String.format("%s:%s", element.getFileName(), element.getLineNumber());
+  }
 
   /**
    * Get the name for this operator based on its opCode and opId.
    * @return  the name for this operator
    */
-  default String getOpName() {
+  public final String getOpName() {
     return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 66e2c58..ed5fc8f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -20,14 +20,11 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.task.TaskContext;
 
@@ -46,14 +43,13 @@ public class OperatorSpecs {
    * Creates a {@link StreamOperatorSpec} for {@link MapFunction}
    *
    * @param mapFn  the map function
-   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
    * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @param <OM>  type of output message
    * @return  the {@link StreamOperatorSpec}
    */
   public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
-      MapFunction<? super M, ? extends OM> mapFn, MessageStreamImpl<OM> nextStream, int opId) {
+      MapFunction<? super M, ? extends OM> mapFn, int opId) {
     return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
       @Override
       public Collection<OM> apply(M message) {
@@ -76,20 +72,19 @@ public class OperatorSpecs {
       public void close() {
         mapFn.close();
       }
-    }, nextStream, OperatorSpec.OpCode.MAP, opId);
+    }, OperatorSpec.OpCode.MAP, opId);
   }
 
   /**
    * Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
    *
    * @param filterFn  the transformation function
-   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
    * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @return  the {@link StreamOperatorSpec}
    */
   public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
-      FilterFunction<? super M> filterFn, MessageStreamImpl<M> nextStream, int opId) {
+      FilterFunction<? super M> filterFn, int opId) {
     return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
       @Override
       public Collection<M> apply(M message) {
@@ -111,23 +106,21 @@ public class OperatorSpecs {
       public void close() {
         filterFn.close();
       }
-
-    }, nextStream, OperatorSpec.OpCode.FILTER, opId);
+    }, OperatorSpec.OpCode.FILTER, opId);
   }
 
   /**
-   * Creates a {@link StreamOperatorSpec}.
+   * Creates a {@link StreamOperatorSpec} for {@link FlatMapFunction}.
    *
-   * @param transformFn  the transformation function
-   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param flatMapFn  the transformation function
    * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @param <OM>  type of output message
    * @return  the {@link StreamOperatorSpec}
    */
-  public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
-      FlatMapFunction<? super M, ? extends OM> transformFn, MessageStreamImpl<OM> nextStream, int opId) {
-    return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) transformFn, nextStream, OperatorSpec.OpCode.FLAT_MAP, opId);
+  public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec(
+      FlatMapFunction<? super M, ? extends OM> flatMapFn, int opId) {
+    return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId);
   }
 
   /**
@@ -139,91 +132,89 @@ public class OperatorSpecs {
    * @return  the {@link SinkOperatorSpec} for the sink operator
    */
   public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<? super M> sinkFn, int opId) {
-    return new SinkOperatorSpec<>((SinkFunction<M>) sinkFn, OperatorSpec.OpCode.SINK, opId);
+    return new SinkOperatorSpec<>((SinkFunction<M>) sinkFn, opId);
   }
 
   /**
-   * Creates a {@link SinkOperatorSpec} for the sendTo operator.
+   * Creates a {@link OutputOperatorSpec} for the sendTo operator.
    *
-   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param outputStream  the {@link OutputStreamImpl} to send messages to
    * @param opId  the unique ID of the operator
    * @param <K> the type of key in the outgoing message
    * @param <V> the type of message in the outgoing message
-   * @param <M> the type of message in the {@link OutputStreamInternal}
-   * @return  the {@link SinkOperatorSpec} for the sendTo operator
+   * @param <M> the type of message in the {@link OutputStreamImpl}
+   * @return  the {@link OutputOperatorSpec} for the sendTo operator
    */
-  public static <K, V, M> SinkOperatorSpec<M> createSendToOperatorSpec(
-      OutputStreamInternal<K, V, M> outputStream, int opId) {
-    return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
+  public static <K, V, M> OutputOperatorSpec<M> createSendToOperatorSpec(
+      OutputStreamImpl<K, V, M> outputStream, int opId) {
+    return new OutputOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
   }
 
   /**
-   * Creates a {@link SinkOperatorSpec} for the partitionBy operator.
+   * Creates a {@link OutputOperatorSpec} for the partitionBy operator.
    *
-   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param outputStream  the {@link OutputStreamImpl} to send messages to
    * @param opId  the unique ID of the operator
    * @param <K> the type of key in the outgoing message
    * @param <V> the type of message in the outgoing message
-   * @param <M> the type of message in the {@link OutputStreamInternal}
-   * @return  the {@link SinkOperatorSpec} for the partitionBy operator
+   * @param <M> the type of message in the {@link OutputStreamImpl}
+   * @return  the {@link OutputOperatorSpec} for the partitionBy operator
    */
-  public static <K, V, M> SinkOperatorSpec<M> createPartitionByOperatorSpec(
-      OutputStreamInternal<K, V, M> outputStream, int opId) {
-    return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
+  public static <K, V, M> OutputOperatorSpec<M> createPartitionByOperatorSpec(
+      OutputStreamImpl<K, V, M> outputStream, int opId) {
+    return new OutputOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
   }
 
   /**
    * Creates a {@link WindowOperatorSpec}.
    *
    * @param window  the description of the window.
-   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
    * @param opId  the unique ID of the operator
    * @param <M>  the type of input message
-   * @param <WK>  the type of key in the {@link WindowPane}
+   * @param <WK>  the type of key in the window output
    * @param <WV>  the type of value in the window
    * @return  the {@link WindowOperatorSpec}
    */
 
   public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
-      WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
-    return new WindowOperatorSpec<>(window, nextStream, opId);
+      WindowInternal<M, WK, WV> window, int opId) {
+    return new WindowOperatorSpec<>(window, opId);
   }
 
   /**
-   * Creates a {@link PartialJoinOperatorSpec}.
+   * Creates a {@link JoinOperatorSpec}.
    *
-   * @param thisPartialJoinFn  the partial join function for this message stream
-   * @param otherPartialJoinFn  the partial join function for the other message stream
+   * @param leftInputOpSpec  the operator spec for the stream on the left side of the join
+   * @param rightInputOpSpec  the operator spec for the stream on the right side of the join
+   * @param joinFn  the user-defined join function to get join keys and results
    * @param ttlMs  the ttl in ms for retaining messages in each stream
-   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
    * @param opId  the unique ID of the operator
    * @param <K>  the type of join key
    * @param <M>  the type of input message
    * @param <JM>  the type of message in the other join stream
-   * @param <RM>  the type of message in the join output
-   * @return  the {@link PartialJoinOperatorSpec}
+   * @param <RM>  the type of join result
+   * @return  the {@link JoinOperatorSpec}
    */
-  public static <K, M, JM, RM> PartialJoinOperatorSpec<K, M, JM, RM> createPartialJoinOperatorSpec(
-      PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn, PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
-      long ttlMs, MessageStreamImpl<RM> nextStream, int opId) {
-    return new PartialJoinOperatorSpec<>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, nextStream, opId);
+  public static <K, M, JM, RM> JoinOperatorSpec<K, M, JM, RM> createJoinOperatorSpec(
+      OperatorSpec<?, M> leftInputOpSpec, OperatorSpec<?, JM> rightInputOpSpec,
+      JoinFunction<K, M, JM, RM> joinFn, long ttlMs, int opId) {
+    return new JoinOperatorSpec<>(leftInputOpSpec, rightInputOpSpec, joinFn, ttlMs, opId);
   }
 
   /**
    * Creates a {@link StreamOperatorSpec} with a merger function.
    *
-   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
    * @param opId  the unique ID of the operator
    * @param <M>  the type of input message
    * @return  the {@link StreamOperatorSpec} for the merge
    */
-  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> nextStream, int opId) {
+  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(int opId) {
     return new StreamOperatorSpec<>(message ->
         new ArrayList<M>() {
           {
             this.add(message);
           }
         },
-        nextStream, OperatorSpec.OpCode.MERGE, opId);
+        OperatorSpec.OpCode.MERGE, opId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
new file mode 100644
index 0000000..e6767ec
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+
+/**
+ * The spec for an operator that outputs a {@link org.apache.samza.operators.MessageStream} to a
+ * {@link org.apache.samza.system.SystemStream}.
+ * <p>
+ * This is a terminal operator and does not allow further operator chaining.
+ *
+ * @param <M>  the type of input message
+ */
+public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
+
+  private OutputStreamImpl<?, ?, M> outputStream;
+
+
+  /**
+   * Constructs an {@link OutputOperatorSpec} to send messages to the provided {@code outputStream}.
+   *
+   * @param outputStream  the {@link OutputStreamImpl} to send messages to
+   * @param opCode the specific {@link OpCode} for this {@link OutputOperatorSpec}.
+   *               It could be either {@link OpCode#SEND_TO} or {@link OpCode#PARTITION_BY}.
+   * @param opId  the unique ID of this {@link OutputOperatorSpec} in the graph
+   */
+  OutputOperatorSpec(OutputStreamImpl<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
+    super(opCode, opId);
+    this.outputStream = outputStream;
+  }
+
+  /**
+   * The {@link OutputStreamImpl} that this operator is sending its output to.
+   * @return the {@link OutputStreamImpl} for this operator if any, else null.
+   */
+  public OutputStreamImpl<?, ?, M> getOutputStream() {
+    return this.outputStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
new file mode 100644
index 0000000..5506378
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.Function;
+
+public class OutputStreamImpl<K, V, M> implements OutputStream<K, V, M> {
+
+  private final StreamSpec streamSpec;
+  private final Function<M, K> keyExtractor;
+  private final Function<M, V> msgExtractor;
+
+  public OutputStreamImpl(StreamSpec streamSpec,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+    this.streamSpec = streamSpec;
+    this.keyExtractor = keyExtractor;
+    this.msgExtractor = msgExtractor;
+  }
+
+  public StreamSpec getStreamSpec() {
+    return streamSpec;
+  }
+
+  public Function<M, K> getKeyExtractor() {
+    return keyExtractor;
+  }
+
+  public Function<M, V> getMsgExtractor() {
+    return msgExtractor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
deleted file mode 100644
index 92b4170..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.util.OperatorJsonUtils;
-
-
-/**
- * Spec for the partial join operator that takes messages from one input stream, joins with buffered
- * messages from another stream, and produces join results to an output {@link MessageStreamImpl}.
- *
- * @param <K>  the type of join key
- * @param <M>  the type of input message
- * @param <JM>  the type of message in the other join stream
- * @param <RM>  the type of message in the join output stream
- */
-public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
-
-  private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
-  private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
-  private final long ttlMs;
-  private final MessageStreamImpl<RM> nextStream;
-  private final int opId;
-  private final String sourceLocation;
-
-  /**
-   * Default constructor for a {@link PartialJoinOperatorSpec}.
-   *
-   * @param thisPartialJoinFn  partial join function that provides state and the join logic for input messages of
-   *                           type {@code M} in this stream
-   * @param otherPartialJoinFn  partial join function that provides state for input messages of type {@code JM}
-   *                            in the other stream
-   * @param ttlMs  the ttl in ms for retaining messages in each stream
-   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
-   * @param opId  the unique ID for this operator
-   */
-  PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
-      PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn, long ttlMs,
-      MessageStreamImpl<RM> nextStream, int opId) {
-    this.thisPartialJoinFn = thisPartialJoinFn;
-    this.otherPartialJoinFn = otherPartialJoinFn;
-    this.ttlMs = ttlMs;
-    this.nextStream = nextStream;
-    this.opId = opId;
-    this.sourceLocation = OperatorJsonUtils.getSourceLocation();
-  }
-
-  @Override
-  public MessageStreamImpl<RM> getNextStream() {
-    return this.nextStream;
-  }
-
-  public PartialJoinFunction<K, M, JM, RM> getThisPartialJoinFn() {
-    return this.thisPartialJoinFn;
-  }
-
-  public PartialJoinFunction<K, JM, M, RM> getOtherPartialJoinFn() {
-    return this.otherPartialJoinFn;
-  }
-
-  public long getTtlMs() {
-    return ttlMs;
-  }
-
-  @Override
-  public OperatorSpec.OpCode getOpCode() {
-    return OpCode.JOIN;
-  }
-
-  @Override
-  public int getOpId() {
-    return this.opId;
-  }
-
-  @Override
-  public String getSourceLocation() {
-    return sourceLocation;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index afdd6b9..2b55d95 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -18,29 +18,19 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-import org.apache.samza.operators.util.OperatorJsonUtils;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * The spec for an operator that outputs a {@link MessageStreamImpl} to an external system.
+ * The spec for an operator that outputs a stream to an arbitrary external system.
+ * <p>
  * This is a terminal operator and does not allow further operator chaining.
  *
  * @param <M>  the type of input message
  */
-public class SinkOperatorSpec<M> implements OperatorSpec {
+public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
 
   private final SinkFunction<M> sinkFn;
-  private OutputStreamInternal<?, ?, M> outputStream; // may be null
-  private final OperatorSpec.OpCode opCode;
-  private final int opId;
-  private final String sourceLocation;
 
   /**
    * Constructs a {@link SinkOperatorSpec} with a user defined {@link SinkFunction}.
@@ -48,79 +38,14 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
    * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
    *                the output {@link org.apache.samza.task.MessageCollector} and the
    *                {@link org.apache.samza.task.TaskCoordinator}.
-   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}.
-   *                It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}.
    * @param opId  the unique ID of this {@link OperatorSpec} in the graph
    */
-  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
+  SinkOperatorSpec(SinkFunction<M> sinkFn, int opId) {
+    super(OpCode.SINK, opId);
     this.sinkFn = sinkFn;
-    this.opCode = opCode;
-    this.opId = opId;
-    this.sourceLocation = OperatorJsonUtils.getSourceLocation();
-  }
-
-  /**
-   * Constructs a {@link SinkOperatorSpec} to send messages to the provided {@code outStream}
-   * @param outputStream  the {@link OutputStreamInternal} to send messages to
-   * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
-   *               It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
-   * @param opId  the unique ID of this {@link SinkOperatorSpec} in the graph
-   */
-  SinkOperatorSpec(OutputStreamInternal<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
-    this(createSinkFn(outputStream), opCode, opId);
-    this.outputStream = outputStream;
-  }
-
-  /**
-   * This is a terminal operator and doesn't allow further operator chaining.
-   * @return  null
-   */
-  @Override
-  public MessageStreamImpl<M> getNextStream() {
-    return null;
-  }
-
-  /**
-   * The {@link OutputStreamInternal} that this operator is sending its output to.
-   * @return the {@link OutputStreamInternal} for this operator if any, else null.
-   */
-  public OutputStreamInternal<?, ?, M> getOutputStream() {
-    return this.outputStream;
   }
 
   public SinkFunction<M> getSinkFn() {
     return this.sinkFn;
   }
-
-  @Override
-  public OperatorSpec.OpCode getOpCode() {
-    return this.opCode;
-  }
-
-  @Override
-  public int getOpId() {
-    return this.opId;
-  }
-
-  @Override
-  public String getSourceLocation() {
-    return sourceLocation;
-  }
-
-  /**
-   * Creates a {@link SinkFunction} to send messages to the provided {@code output}.
-   * @param outputStream  the {@link OutputStreamInternal} to send messages to
-   * @param <M>  the type of input message
-   * @return  a {@link SinkFunction} that sends messages to the provided {@code output}
-   */
-  private static <M> SinkFunction<M> createSinkFn(OutputStreamInternal<?, ?, M> outputStream) {
-    return (M message, MessageCollector mc, TaskCoordinator tc) -> {
-      // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
-      SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
-          outputStream.getStreamSpec().getPhysicalName());
-      Object key = outputStream.getKeyExtractor().apply(message);
-      Object msg = outputStream.getMsgExtractor().apply(message);
-      mc.send(new OutgoingMessageEnvelope(systemStream, key, msg));
-    };
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index c53efae..1f2f683 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -18,63 +18,32 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.util.OperatorJsonUtils;
 
 
 /**
- * The spec for a linear stream operator that outputs 0 or more messages for each input message.
+ * The spec for a simple stream operator that outputs 0 or more messages for each input message.
  *
  * @param <M>  the type of input message
  * @param <OM>  the type of output message
  */
-public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
+public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
 
   private final FlatMapFunction<M, OM> transformFn;
-  private final MessageStreamImpl<OM> nextStream;
-  private final OperatorSpec.OpCode opCode;
-  private final int opId;
-  private final String sourceLocation;
 
   /**
-   * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
+   * Constructor for a {@link StreamOperatorSpec}.
    *
    * @param transformFn  the transformation function
-   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
    * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
-   * @param opId  the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
+   * @param opId  the unique ID for this {@link StreamOperatorSpec}
    */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> nextStream,
-      OperatorSpec.OpCode opCode, int opId) {
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, int opId) {
+    super(opCode, opId);
     this.transformFn = transformFn;
-    this.nextStream = nextStream;
-    this.opCode = opCode;
-    this.opId = opId;
-    this.sourceLocation = OperatorJsonUtils.getSourceLocation();
-  }
-
-  @Override
-  public MessageStreamImpl<OM> getNextStream() {
-    return this.nextStream;
   }
 
   public FlatMapFunction<M, OM> getTransformFn() {
     return this.transformFn;
   }
-
-  @Override
-  public OperatorSpec.OpCode getOpCode() {
-    return this.opCode;
-  }
-
-  @Override
-  public int getOpId() {
-    return this.opId;
-  }
-
-  @Override
-  public String getSourceLocation() {
-    return sourceLocation;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 3c2be0a..0937499 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,13 +19,11 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.triggers.AnyTrigger;
 import org.apache.samza.operators.triggers.RepeatingTrigger;
 import org.apache.samza.operators.triggers.TimeBasedTrigger;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.util.MathUtils;
-import org.apache.samza.operators.util.OperatorJsonUtils;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.slf4j.Logger;
@@ -37,58 +35,32 @@ import java.util.stream.Collectors;
 
 
 /**
- * Default window operator spec object
+ * The spec for an operator that groups messages into finite windows for processing
  *
  * @param <M>  the type of input message to the window
  * @param <WK>  the type of key of the window
  * @param <WV>  the type of aggregated value in the window output {@link WindowPane}
  */
-public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
+public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK, WV>> {
 
   private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorSpec.class);
   private final WindowInternal<M, WK, WV> window;
-  private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
-  private final int opId;
-  private final String sourceLocation;
 
   /**
    * Constructor for {@link WindowOperatorSpec}.
    *
    * @param window  the window function
-   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
    * @param opId  auto-generated unique ID of this operator
    */
-  WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
-    this.nextStream = nextStream;
+  WindowOperatorSpec(WindowInternal<M, WK, WV> window, int opId) {
+    super(OpCode.WINDOW, opId);
     this.window = window;
-    this.opId = opId;
-    this.sourceLocation = OperatorJsonUtils.getSourceLocation();
-  }
-
-  @Override
-  public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
-    return this.nextStream;
   }
 
   public WindowInternal<M, WK, WV> getWindow() {
     return window;
   }
 
-  @Override
-  public OpCode getOpCode() {
-    return OpCode.WINDOW;
-  }
-
-  @Override
-  public int getOpId() {
-    return this.opId;
-  }
-
-  @Override
-  public String getSourceLocation() {
-    return sourceLocation;
-  }
-
   /**
    * Get the default triggering interval for this {@link WindowOperatorSpec}
    *

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
deleted file mode 100644
index e67b326..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.BiFunction;
-
-/**
- * Internal representation of an input stream.
- *
- * @param <M> the type of messages in the input stream
- */
-@InterfaceStability.Unstable
-public interface InputStreamInternal<K, V, M> extends MessageStream<M> {
-
-  StreamSpec getStreamSpec();
-
-  BiFunction<K, V, M> getMsgBuilder();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
deleted file mode 100644
index c4337d0..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.BiFunction;
-
-public class InputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements InputStreamInternal<K, V, M> {
-
-  private final StreamSpec streamSpec;
-  private final BiFunction<K, V, M> msgBuilder;
-
-  public InputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder) {
-    super(graph);
-    this.streamSpec = streamSpec;
-    this.msgBuilder = msgBuilder;
-  }
-
-  public StreamSpec getStreamSpec() {
-    return this.streamSpec;
-  }
-
-  public BiFunction<K, V, M> getMsgBuilder() {
-    return this.msgBuilder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
new file mode 100644
index 0000000..f0bb1dc
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.system.StreamSpec;
+
+/**
+ * An intermediate stream is both an input and an output stream (e.g. a repartitioned stream).
+ * <p>
+ * This implementation accepts a pair of {@link InputOperatorSpec} and {@link OutputStreamImpl} associated
+ * with the same logical {@code streamId}. It provides access to its {@link OutputStreamImpl} for
+ * {@link MessageStreamImpl#partitionBy} to send messages out to. It's also a {@link MessageStreamImpl} with
+ * {@link InputOperatorSpec} as its operator spec, so that further operations can be chained on the
+ * {@link InputOperatorSpec}.
+ *
+ * @param <K> the type of key in the outgoing/incoming message
+ * @param <V> the type of message in the outgoing/incoming message
+ * @param <M> the type of message in the output {@link MessageStreamImpl}
+ */
+public class IntermediateMessageStreamImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStream<K, V, M> {
+
+  private final OutputStreamImpl<K, V, M> outputStream;
+
+  public IntermediateMessageStreamImpl(StreamGraphImpl graph, InputOperatorSpec<K, V, M> inputOperatorSpec,
+      OutputStreamImpl<K, V, M> outputStream) {
+    super(graph, inputOperatorSpec);
+    this.outputStream = outputStream;
+  }
+
+  public StreamSpec getStreamSpec() {
+    return this.outputStream.getStreamSpec();
+  }
+
+  public OutputStreamImpl<K, V, M> getOutputStream() {
+    return this.outputStream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
deleted file mode 100644
index 8f45f7a..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-public class IntermediateStreamInternalImpl<K, V, M> extends MessageStreamImpl<M>
-    implements InputStreamInternal<K, V, M>, OutputStreamInternal<K, V, M> {
-
-  private final StreamSpec streamSpec;
-  private final Function<M, K> keyExtractor;
-  private final Function<M, V> msgExtractor;
-  private final BiFunction<K, V, M> msgBuilder;
-
-  public IntermediateStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, Function<M, K> keyExtractor,
-      Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) {
-    super(graph);
-    this.streamSpec = streamSpec;
-    this.keyExtractor = keyExtractor;
-    this.msgExtractor = msgExtractor;
-    this.msgBuilder = msgBuilder;
-  }
-
-  public StreamSpec getStreamSpec() {
-    return this.streamSpec;
-  }
-
-  public Function<M, K> getKeyExtractor() {
-    return this.keyExtractor;
-  }
-
-  public Function<M, V> getMsgExtractor() {
-    return this.msgExtractor;
-  }
-
-  @Override
-  public BiFunction<K, V, M> getMsgBuilder() {
-    return this.msgBuilder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
deleted file mode 100644
index 48ce641..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.Function;
-
-
-/**
- * Internal representation of an output stream.
- *
- * @param <M> the type of messages in the output stream
- */
-@InterfaceStability.Unstable
-public interface OutputStreamInternal<K, V, M> extends OutputStream<K, V, M> {
-
-  StreamSpec getStreamSpec();
-
-  Function<M, K> getKeyExtractor();
-
-  Function<M, V> getMsgExtractor();
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
deleted file mode 100644
index a2d0cca..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.stream;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.system.StreamSpec;
-
-import java.util.function.Function;
-
-public class OutputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStreamInternal<K, V, M> {
-
-  private final StreamSpec streamSpec;
-  private final Function<M, K> keyExtractor;
-  private final Function<M, V> msgExtractor;
-
-  public OutputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec,
-      Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
-    super(graph);
-    this.streamSpec = streamSpec;
-    this.keyExtractor = keyExtractor;
-    this.msgExtractor = msgExtractor;
-  }
-
-  public StreamSpec getStreamSpec() {
-    return this.streamSpec;
-  }
-
-  public Function<M, K> getKeyExtractor() {
-    return this.keyExtractor;
-  }
-
-  public Function<M, V> getMsgExtractor() {
-    return this.msgExtractor;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java b/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
deleted file mode 100644
index b971607..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/util/OperatorJsonUtils.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.util;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.stream.OutputStreamInternal;
-
-public class OperatorJsonUtils {
-  private static final String OP_CODE = "opCode";
-  private static final String OP_ID = "opId";
-  private static final String SOURCE_LOCATION = "sourceLocation";
-  private static final String NEXT_OPERATOR_IDS = "nextOperatorIds";
-  private static final String OUTPUT_STREAM_ID = "outputStreamId";
-  private static final String TTL_MS = "ttlMs";
-
-  /**
-   * Format the operator properties into a map
-   * @param spec a {@link OperatorSpec} instance
-   * @return map of the operator properties
-   */
-  public static Map<String, Object> operatorToMap(OperatorSpec spec) {
-    Map<String, Object> map = new HashMap<>();
-    map.put(OP_CODE, spec.getOpCode().name());
-    map.put(OP_ID, spec.getOpId());
-    map.put(SOURCE_LOCATION, spec.getSourceLocation());
-
-    if (spec.getNextStream() != null) {
-      Collection<OperatorSpec> nextOperators = spec.getNextStream().getRegisteredOperatorSpecs();
-      map.put(NEXT_OPERATOR_IDS, nextOperators.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet()));
-    } else {
-      map.put(NEXT_OPERATOR_IDS, Collections.emptySet());
-    }
-
-    if (spec instanceof SinkOperatorSpec) {
-      OutputStreamInternal outputStream = ((SinkOperatorSpec) spec).getOutputStream();
-      if (outputStream != null) {
-        map.put(OUTPUT_STREAM_ID, outputStream.getStreamSpec().getId());
-      }
-    }
-
-    if (spec instanceof PartialJoinOperatorSpec) {
-      map.put(TTL_MS, ((PartialJoinOperatorSpec) spec).getTtlMs());
-    }
-
-    return map;
-  }
-
-  /**
-   * Return the location of source code that creates the operator.
-   * This function is invoked in the constructor of each operator.
-   * @return formatted source location including file and line number
-   */
-  public static String getSourceLocation() {
-    // The stack trace looks like:
-    // [0] Thread.getStackTrace()
-    // [1] OperatorJsonUtils.getSourceLocation()
-    // [2] SomeOperator.<init>()
-    // [3] OperatorSpecs.createSomeOperator()
-    // [4] MessageStreamImpl.someOperator()
-    // [5] User code that calls [2]
-    // we are only interested in [5] here
-    StackTraceElement location = Thread.currentThread().getStackTrace()[5];
-    return String.format("%s:%s", location.getFileName(), location.getLineNumber());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 50ae775..a77ef3b 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,22 +18,19 @@
  */
 package org.apache.samza.task;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
-import org.apache.samza.operators.impl.RootOperatorImpl;
-import org.apache.samza.operators.stream.InputStreamInternal;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 
-import java.util.HashMap;
-import java.util.Map;
-
 
 /**
  * A {@link StreamTask} implementation that brings all the operator API implementation components together and
@@ -47,7 +44,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
 
   private OperatorImplGraph operatorImplGraph;
   private ContextManager contextManager;
-  private Map<SystemStream, InputStreamInternal> inputSystemStreamToInputStream;
 
   /**
    * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
@@ -70,11 +66,8 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
    * <p>
    * Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets
    * the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs,
-   * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs.
-   *<p>
-   * It then uses the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical
-   * DAG. It also saves the mapping between input {@link SystemStream}s and their corresponding
-   * {@link InputStreamInternal}s for delivering incoming messages to the appropriate sub-DAG.
+   * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs. It then uses
+   * the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical DAG.
    *
    * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
    * @param context allows initializing and accessing contextual data of this StreamTask
@@ -93,20 +86,11 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
     }
 
     // create the operator impl DAG corresponding to the logical operator spec DAG
-    OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
-    operatorImplGraph.init(streamGraph, config, context);
-    this.operatorImplGraph = operatorImplGraph;
-
-    // TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
-    inputSystemStreamToInputStream = new HashMap<>();
-    streamGraph.getInputStreams().forEach((streamSpec, inputStream)-> {
-        SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
-        inputSystemStreamToInputStream.put(systemStream, inputStream);
-      });
+    this.operatorImplGraph = new OperatorImplGraph(streamGraph, config, context, clock);
   }
 
   /**
-   * Passes the incoming message envelopes along to the {@link org.apache.samza.operators.impl.RootOperatorImpl} node
+   * Passes the incoming message envelopes along to the {@link InputOperatorImpl} node
    * for the input {@link SystemStream}.
    * <p>
    * From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
@@ -119,20 +103,16 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
   @Override
   public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
     SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
-    InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
-    RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream);
-    if (rootOperatorImpl != null) {
-      // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde
-      // before applying the msgBuilder.
-      Object message = inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage());
-      rootOperatorImpl.onMessage(message, collector, coordinator);
+    InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
+    if (inputOpImpl != null) {
+      inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
     }
   }
 
   @Override
   public final void window(MessageCollector collector, TaskCoordinator coordinator)  {
-    operatorImplGraph.getAllRootOperators()
-        .forEach(rootOperator -> rootOperator.onTimer(collector, coordinator));
+    operatorImplGraph.getAllInputOperators()
+        .forEach(inputOperator -> inputOperator.onTimer(collector, coordinator));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index daa223a..2c8f682 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -224,12 +224,12 @@ public class TestExecutionPlanner {
     when(runner.getStreamSpec("output2")).thenReturn(output2);
 
     // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-0"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system"));
     when(runner.getStreamSpec("test-app-1-partition_by-1"))
         .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-4"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-3"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-8"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"));
   }
 
   @Test
@@ -272,7 +272,7 @@ public class TestExecutionPlanner {
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
-        assertTrue(edge.getPartitionCount() == 64);
+        assertEquals(64, edge.getPartitionCount());
       });
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index e53cd42..4bda86b 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -39,7 +39,7 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
 import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -50,13 +50,16 @@ public class TestJobGraphJsonGenerator {
   public void test() throws Exception {
 
     /**
-     * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
+     * the graph looks like the following.
+     * number in parentheses () indicates number of stream partitions.
+     * number in parentheses in quotes ("") indicates expected partition count.
+     * number in square brackets [] indicates operator ID.
      *
-     *                               input1 (64) -> map -> join -> output1 (8)
-     *                                                       |
-     *          input2 (16) -> partitionBy ("64") -> filter -|
-     *                                                       |
-     * input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16)
+     * input3 (32) -> filter [7] -> partitionBy [8] ("64") -> map [10] -> join [14] -> sendTo(output2) [15] (16)
+     *                                                                   |
+     *              input2 (16) -> partitionBy [3] ("64") -> filter [5] -| -> sink [13]
+     *                                                                   |
+     *                                         input1 (64) -> map [1] -> join [11] -> sendTo(output1) [12] (8)
      *
      */
 
@@ -80,12 +83,10 @@ public class TestJobGraphJsonGenerator {
     when(runner.getStreamSpec("output2")).thenReturn(output2);
 
     // intermediate streams used in tests
-    when(runner.getStreamSpec("test-app-1-partition_by-0"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-1"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
-    when(runner.getStreamSpec("test-app-1-partition_by-4"))
-        .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-3"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-8"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system"));
 
     // set up external partition count
     Map<String, Integer> system1Map = new HashMap<>();
@@ -124,10 +125,10 @@ public class TestJobGraphJsonGenerator {
     // deserialize
     ObjectMapper mapper = new ObjectMapper();
     JobGraphJsonGenerator.JobGraphJson nodes = mapper.readValue(json, JobGraphJsonGenerator.JobGraphJson.class);
-    assertTrue(nodes.jobs.get(0).operatorGraph.inputStreams.size() == 5);
-    assertTrue(nodes.jobs.get(0).operatorGraph.operators.size() == 13);
-    assertTrue(nodes.sourceStreams.size() == 3);
-    assertTrue(nodes.sinkStreams.size() == 2);
-    assertTrue(nodes.intermediateStreams.size() == 2);
+    assertEquals(5, nodes.jobs.get(0).operatorGraph.inputStreams.size());
+    assertEquals(11, nodes.jobs.get(0).operatorGraph.operators.size());
+    assertEquals(3, nodes.sourceStreams.size());
+    assertEquals(2, nodes.sinkStreams.size());
+    assertEquals(2, nodes.intermediateStreams.size());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c1c4289c/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 39745bf..0c41fb8 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -74,18 +74,18 @@ public class TestJoinOperator {
   public void testJoinFnInitAndClose() throws Exception {
     TestJoinFunction joinFn = new TestJoinFunction();
     StreamOperatorTask sot = createStreamOperatorTask(new SystemClock(), new TestJoinStreamApplication(joinFn));
-    assertEquals(joinFn.getNumInitCalls(), 1);
+    assertEquals(1, joinFn.getNumInitCalls());
     MessageCollector messageCollector = mock(MessageCollector.class);
 
     // push messages to first stream
     numbers.forEach(n -> sot.process(new FirstStreamIME(n, n), messageCollector, taskCoordinator));
 
     // close should not be called till now
-    assertEquals(joinFn.getNumCloseCalls(), 0);
+    assertEquals(0, joinFn.getNumCloseCalls());
     sot.close();
 
     // close should be called from sot.close()
-    assertEquals(joinFn.getNumCloseCalls(), 1);
+    assertEquals(1, joinFn.getNumCloseCalls());
   }
 
   @Test