You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/05 23:42:31 UTC

[3/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index c00f470..e2c4b9a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -19,21 +19,20 @@
 
 package org.apache.samza.operators.spec;
 
-import java.util.Collection;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.stream.OutputStreamInternal;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
 
 import java.util.ArrayList;
-import org.apache.samza.task.TaskContext;
+import java.util.Collection;
 
 
 /**
@@ -47,13 +46,14 @@ public class OperatorSpecs {
    * Creates a {@link StreamOperatorSpec} for {@link MapFunction}
    *
    * @param mapFn  the map function
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param output  the output {@link MessageStreamImpl} object
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @param <OM>  type of output message
    * @return  the {@link StreamOperatorSpec}
    */
-  public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+  public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
+      MapFunction<M, OM> mapFn, MessageStreamImpl<OM> nextStream, int opId) {
     return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
       @Override
       public Collection<OM> apply(M message) {
@@ -71,19 +71,20 @@ public class OperatorSpecs {
       public void init(Config config, TaskContext context) {
         mapFn.init(config, context);
       }
-    }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
+    }, nextStream, OperatorSpec.OpCode.MAP, opId);
   }
 
   /**
    * Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
    *
    * @param filterFn  the transformation function
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param output  the output {@link MessageStreamImpl} object
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @return  the {@link StreamOperatorSpec}
    */
-  public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
+  public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
+      FilterFunction<M> filterFn, MessageStreamImpl<M> nextStream, int opId) {
     return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
       @Override
       public Collection<M> apply(M message) {
@@ -100,77 +101,81 @@ public class OperatorSpecs {
       public void init(Config config, TaskContext context) {
         filterFn.init(config, context);
       }
-    }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
+    }, nextStream, OperatorSpec.OpCode.FILTER, opId);
   }
 
   /**
    * Creates a {@link StreamOperatorSpec}.
    *
    * @param transformFn  the transformation function
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param output  the output {@link MessageStreamImpl} object
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @param <OM>  type of output message
    * @return  the {@link StreamOperatorSpec}
    */
   public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
-      FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
-    return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
+      FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> nextStream, int opId) {
+    return new StreamOperatorSpec<>(transformFn, nextStream, OperatorSpec.OpCode.FLAT_MAP, opId);
   }
 
   /**
-   * Creates a {@link SinkOperatorSpec}.
+   * Creates a {@link SinkOperatorSpec} for the sink operator.
    *
-   * @param sinkFn  the sink function
+   * @param sinkFn  the sink function provided by the user
+   * @param opId  the unique ID of the operator
    * @param <M>  type of input message
-   * @param graph  the {@link StreamGraphImpl} object
-   * @return  the {@link SinkOperatorSpec}
+   * @return  the {@link SinkOperatorSpec} for the sink operator
    */
-  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
-    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId());
+  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, int opId) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, opId);
   }
 
   /**
-   * Creates a {@link SinkOperatorSpec}.
+   * Creates a {@link SinkOperatorSpec} for the sendTo operator.
    *
-   * @param sinkFn  the sink function
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param stream  the {@link OutputStream} where the message is sent to
-   * @param <M>  type of input message
-   * @return  the {@link SinkOperatorSpec}
+   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param opId  the unique ID of the operator
+   * @param <K> the type of key in the outgoing message
+   * @param <V> the type of message in the outgoing message
+   * @param <M> the type of message in the {@link OutputStreamInternal}
+   * @return  the {@link SinkOperatorSpec} for the sendTo operator
    */
-  public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
-    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream);
+  public static <K, V, M> SinkOperatorSpec<M> createSendToOperatorSpec(
+      OutputStreamInternal<K, V, M> outputStream, int opId) {
+    return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
   }
 
   /**
-   * Creates a {@link SinkOperatorSpec}.
+   * Creates a {@link SinkOperatorSpec} for the partitionBy operator.
    *
-   * @param sinkFn  the sink function
-   * @param stream  the {@link OutputStream} where the message is sent to
-   * @param opId operator ID
-   * @param <M>  type of input message
-   * @return  the {@link SinkOperatorSpec}
+   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param opId  the unique ID of the operator
+   * @param <K> the type of key in the outgoing message
+   * @param <V> the type of message in the outgoing message
+   * @param <M> the type of message in the {@link OutputStreamInternal}
+   * @return  the {@link SinkOperatorSpec} for the partitionBy operator
    */
-  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, OutputStream<M> stream, int opId) {
-    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, opId, stream);
+  public static <K, V, M> SinkOperatorSpec<M> createPartitionByOperatorSpec(
+      OutputStreamInternal<K, V, M> outputStream, int opId) {
+    return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
   }
 
   /**
    * Creates a {@link WindowOperatorSpec}.
    *
-   * @param window the description of the window.
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param wndOutput  the window output {@link MessageStreamImpl} object
-   * @param <M> the type of input message
-   * @param <WK> the type of key in the {@link WindowPane}
-   * @param <WV> the type of value in the window
+   * @param window  the description of the window.
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
+   * @param <M>  the type of input message
+   * @param <WK>  the type of key in the {@link WindowPane}
+   * @param <WV>  the type of value in the window
    * @return  the {@link WindowOperatorSpec}
    */
 
   public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
-      WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
-    return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
+      WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
+    return new WindowOperatorSpec<>(window, nextStream, opId);
   }
 
   /**
@@ -179,8 +184,8 @@ public class OperatorSpecs {
    * @param thisPartialJoinFn  the partial join function for this message stream
    * @param otherPartialJoinFn  the partial join function for the other message stream
    * @param ttlMs  the ttl in ms for retaining messages in each stream
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param joinOutput  the output {@link MessageStreamImpl}
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <K>  the type of join key
    * @param <M>  the type of input message
    * @param <JM>  the type of message in the other join stream
@@ -189,25 +194,25 @@ public class OperatorSpecs {
    */
   public static <K, M, JM, RM> PartialJoinOperatorSpec<K, M, JM, RM> createPartialJoinOperatorSpec(
       PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn, PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
-      long ttlMs, StreamGraphImpl graph, MessageStreamImpl<RM> joinOutput) {
-    return new PartialJoinOperatorSpec<K, M, JM, RM>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, joinOutput, graph.getNextOpId());
+      long ttlMs, MessageStreamImpl<RM> nextStream, int opId) {
+    return new PartialJoinOperatorSpec<K, M, JM, RM>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, nextStream, opId);
   }
 
   /**
    * Creates a {@link StreamOperatorSpec} with a merger function.
    *
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param mergeOutput  the output {@link MessageStreamImpl} from the merger
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <M>  the type of input message
    * @return  the {@link StreamOperatorSpec} for the merge
    */
-  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
+  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> nextStream, int opId) {
     return new StreamOperatorSpec<M, M>(message ->
         new ArrayList<M>() {
           {
             this.add(message);
           }
         },
-        mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
+        nextStream, OperatorSpec.OpCode.MERGE, opId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index 669895f..b1dc529 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -35,11 +35,10 @@ import org.apache.samza.task.TaskContext;
  */
 public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
 
-
   private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
   private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
   private final long ttlMs;
-  private final MessageStreamImpl<RM> joinOutput;
+  private final MessageStreamImpl<RM> nextStream;
   private final int opId;
 
   /**
@@ -50,22 +49,22 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
    * @param otherPartialJoinFn  partial join function that provides state for input messages of type {@code JM}
    *                            in the other stream
    * @param ttlMs  the ttl in ms for retaining messages in each stream
-   * @param joinOutput  the output {@link MessageStreamImpl} of the join results
+   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
    * @param opId  the unique ID for this operator
    */
   PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
       PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn, long ttlMs,
-      MessageStreamImpl<RM> joinOutput, int opId) {
+      MessageStreamImpl<RM> nextStream, int opId) {
     this.thisPartialJoinFn = thisPartialJoinFn;
     this.otherPartialJoinFn = otherPartialJoinFn;
     this.ttlMs = ttlMs;
-    this.joinOutput = joinOutput;
+    this.nextStream = nextStream;
     this.opId = opId;
   }
 
   @Override
   public MessageStreamImpl<RM> getNextStream() {
-    return this.joinOutput;
+    return this.nextStream;
   }
 
   public PartialJoinFunction<K, M, JM, RM> getThisPartialJoinFn() {
@@ -80,10 +79,12 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
     return ttlMs;
   }
 
+  @Override
   public OperatorSpec.OpCode getOpCode() {
     return OpCode.JOIN;
   }
 
+  @Override
   public int getOpId() {
     return this.opId;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index ba30d67..7de85f3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -20,69 +20,54 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.stream.OutputStreamInternal;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
- * system. This is a terminal operator and does allows further operator chaining.
+ * The spec for an operator that outputs a {@link MessageStreamImpl} to an external system.
+ * This is a terminal operator and does not allow further operator chaining.
  *
  * @param <M>  the type of input message
  */
 public class SinkOperatorSpec<M> implements OperatorSpec {
 
-  /**
-   * {@link OpCode} for this {@link SinkOperatorSpec}
-   */
+  private final SinkFunction<M> sinkFn;
+  private OutputStreamInternal<?, ?, M> outputStream; // may be null
   private final OperatorSpec.OpCode opCode;
-
-  /**
-   * The unique ID for this operator.
-   */
   private final int opId;
 
   /**
-   * The user-defined sink function
-   */
-  private final SinkFunction<M> sinkFn;
-
-  /**
-   * Potential output stream defined by the {@link SinkFunction}
-   */
-  private final OutputStream<M> outStream;
-
-  /**
-   * Default constructor for a {@link SinkOperatorSpec} w/o an output stream. (e.g. output is sent to remote database)
+   * Constructs a {@link SinkOperatorSpec} with a user defined {@link SinkFunction}.
    *
    * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
    *                the output {@link org.apache.samza.task.MessageCollector} and the
    *                {@link org.apache.samza.task.TaskCoordinator}.
-   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
-   *                or {@link OpCode#PARTITION_BY}
-   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}.
+   *                It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}.
+   * @param opId  the unique ID of this {@link OperatorSpec} in the graph
    */
   SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
-    this(sinkFn, opCode, opId, null);
+    this.sinkFn = sinkFn;
+    this.opCode = opCode;
+    this.opId = opId;
   }
 
   /**
-   * Default constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}
-   *
-   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
-   *                the output {@link org.apache.samza.task.MessageCollector} and the
-   *                {@link org.apache.samza.task.TaskCoordinator}.
-   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
-   *                or {@link OpCode#PARTITION_BY}
-   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
-   * @param opId  the {@link OutputStream} for this {@link SinkOperatorSpec}
+   * Constructs a {@link SinkOperatorSpec} to send messages to the provided {@code outputStream}.
+   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
+   *               It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
+   * @param opId  the unique ID of this {@link SinkOperatorSpec} in the graph
    */
-  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
-    this.sinkFn = sinkFn;
-    this.opCode = opCode;
-    this.opId = opId;
-    this.outStream = outStream;
+  SinkOperatorSpec(OutputStreamInternal<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
+    this(createSinkFn(outputStream), opCode, opId);
+    this.outputStream = outputStream;
   }
 
   /**
@@ -94,23 +79,47 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
     return null;
   }
 
+  /**
+   * The {@link OutputStreamInternal} that this operator is sending its output to.
+   * @return the {@link OutputStreamInternal} for this operator if any, else null.
+   */
+  public OutputStreamInternal<?, ?, M> getOutputStream() {
+    return this.outputStream;
+  }
+
   public SinkFunction<M> getSinkFn() {
     return this.sinkFn;
   }
 
+  @Override
   public OperatorSpec.OpCode getOpCode() {
     return this.opCode;
   }
 
+  @Override
   public int getOpId() {
     return this.opId;
   }
 
-  public OutputStream<M> getOutStream() {
-    return this.outStream;
+  @Override
+  public void init(Config config, TaskContext context) {
+    this.sinkFn.init(config, context);
   }
 
-  @Override public void init(Config config, TaskContext context) {
-    this.sinkFn.init(config, context);
+  /**
+   * Creates a {@link SinkFunction} to send messages to the provided {@code outputStream}.
+   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param <M>  the type of input message
+   * @return  a {@link SinkFunction} that sends messages to the provided {@code outputStream}
+   */
+  private static <M> SinkFunction<M> createSinkFn(OutputStreamInternal<?, ?, M> outputStream) {
+    return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+      // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
+      SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
+          outputStream.getStreamSpec().getPhysicalName());
+      Object key = outputStream.getKeyExtractor().apply(message);
+      Object msg = outputStream.getMsgExtractor().apply(message);
+      mc.send(new OutgoingMessageEnvelope(systemStream, key, msg));
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index d7813f7..3c427c7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -32,54 +32,42 @@ import org.apache.samza.task.TaskContext;
  */
 public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
 
-  /**
-   * {@link OpCode} for this {@link StreamOperatorSpec}
-   */
+  private final FlatMapFunction<M, OM> transformFn;
+  private final MessageStreamImpl<OM> nextStream;
   private final OperatorSpec.OpCode opCode;
-
-  /**
-   * The unique ID for this operator.
-   */
   private final int opId;
 
   /**
-   * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
-   */
-  private final MessageStreamImpl<OM> outputStream;
-
-  /**
-   * Transformation function applied in this {@link StreamOperatorSpec}
-   */
-  private final FlatMapFunction<M, OM> transformFn;
-
-  /**
    * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
    *
    * @param transformFn  the transformation function
-   * @param outputStream  the output {@link MessageStreamImpl}
+   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
    * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
    * @param opId  the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
    */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl outputStream, OperatorSpec.OpCode opCode, int opId) {
-    this.outputStream = outputStream;
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl nextStream,
+      OperatorSpec.OpCode opCode, int opId) {
     this.transformFn = transformFn;
+    this.nextStream = nextStream;
     this.opCode = opCode;
     this.opId = opId;
   }
 
   @Override
   public MessageStreamImpl<OM> getNextStream() {
-    return this.outputStream;
+    return this.nextStream;
   }
 
   public FlatMapFunction<M, OM> getTransformFn() {
     return this.transformFn;
   }
 
+  @Override
   public OperatorSpec.OpCode getOpCode() {
     return this.opCode;
   }
 
+  @Override
   public int getOpId() {
     return this.opId;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 6d948d7..9515e38 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -36,21 +36,18 @@ import org.apache.samza.task.TaskContext;
 public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
 
   private final WindowInternal<M, WK, WV> window;
-
-  private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
-
+  private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
   private final int opId;
 
-
   /**
    * Constructor for {@link WindowOperatorSpec}.
    *
    * @param window  the window function
-   * @param outputStream  the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
+   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
    * @param opId  auto-generated unique ID of this operator
    */
-  WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
-    this.outputStream = outputStream;
+  WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
+    this.nextStream = nextStream;
     this.window = window;
     this.opId = opId;
   }
@@ -64,17 +61,19 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
 
   @Override
   public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
-    return this.outputStream;
+    return this.nextStream;
   }
 
   public WindowInternal<M, WK, WV> getWindow() {
     return window;
   }
 
+  @Override
   public OpCode getOpCode() {
     return OpCode.WINDOW;
   }
 
+  @Override
   public int getOpId() {
     return this.opId;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
new file mode 100644
index 0000000..e67b326
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+
+/**
+ * Internal representation of an input stream.
+ *
+ * @param <M> the type of messages in the input stream
+ */
+@InterfaceStability.Unstable
+public interface InputStreamInternal<K, V, M> extends MessageStream<M> {
+
+  StreamSpec getStreamSpec();
+
+  BiFunction<K, V, M> getMsgBuilder();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
new file mode 100644
index 0000000..c4337d0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+
+public class InputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements InputStreamInternal<K, V, M> {
+
+  private final StreamSpec streamSpec;
+  private final BiFunction<K, V, M> msgBuilder;
+
+  public InputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder) {
+    super(graph);
+    this.streamSpec = streamSpec;
+    this.msgBuilder = msgBuilder;
+  }
+
+  public StreamSpec getStreamSpec() {
+    return this.streamSpec;
+  }
+
+  public BiFunction<K, V, M> getMsgBuilder() {
+    return this.msgBuilder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
new file mode 100644
index 0000000..a1bee6a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+/**
+ * A {@link MessageStreamImpl} that implements both {@link InputStreamInternal} and {@link OutputStreamInternal},
+ * exposing the output-side extractors and the input-side message builder for a single {@link StreamSpec}.
+ *
+ * @param <K> the type returned by the key extractor and accepted by the message builder
+ * @param <V> the type returned by the message extractor and accepted by the message builder
+ * @param <M> the type of messages in this stream
+ */
+public class IntermediateStreamInternalImpl<K, V, M> extends MessageStreamImpl<M>
+    implements InputStreamInternal<K, V, M>, OutputStreamInternal<K, V, M> {
+
+  private final StreamSpec streamSpec;
+  private final Function<M, K> keyExtractor;
+  private final Function<M, V> msgExtractor;
+  private final BiFunction<K, V, M> msgBuilder;
+
+  /**
+   * Creates the stream from its spec and conversion functions; all arguments are stored as given.
+   *
+   * @param graph the graph forwarded to the {@link MessageStreamImpl} constructor
+   * @param streamSpec the {@link StreamSpec} returned by {@link #getStreamSpec()}
+   * @param keyExtractor function from a message to its key
+   * @param msgExtractor function from a message to its value
+   * @param msgBuilder function building a message from a key and a value
+   */
+  public IntermediateStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) {
+    super(graph);
+    this.streamSpec = streamSpec;
+    this.keyExtractor = keyExtractor;
+    this.msgExtractor = msgExtractor;
+    this.msgBuilder = msgBuilder;
+  }
+
+  @Override
+  public StreamSpec getStreamSpec() {
+    return this.streamSpec;
+  }
+
+  @Override
+  public Function<M, K> getKeyExtractor() {
+    return this.keyExtractor;
+  }
+
+  @Override
+  public Function<M, V> getMsgExtractor() {
+    return this.msgExtractor;
+  }
+
+  @Override
+  public BiFunction<K, V, M> getMsgBuilder() {
+    return this.msgBuilder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
new file mode 100644
index 0000000..48ce641
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.Function;
+
+
+/**
+ * Internal representation of an output stream.
+ * {@code K} and {@code V} are the result types of the key and message extractors, respectively.
+ * @param <M> the type of messages in the output stream
+ */
+@InterfaceStability.Unstable
+public interface OutputStreamInternal<K, V, M> extends OutputStream<K, V, M> {
+
+  StreamSpec getStreamSpec(); // the StreamSpec associated with this output stream
+
+  Function<M, K> getKeyExtractor(); // function that derives a K (the key) from a message M
+
+  Function<M, V> getMsgExtractor(); // function that derives a V (the message value) from a message M
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
new file mode 100644
index 0000000..a2d0cca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.Function;
+
+/**
+ * A {@link MessageStreamImpl} that implements {@link OutputStreamInternal}, holding the
+ * {@link StreamSpec} and the key and message extractor functions of an output stream.
+ *
+ * @param <K> the type returned by the key extractor
+ * @param <V> the type returned by the message extractor
+ * @param <M> the type of messages in this stream
+ */
+public class OutputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStreamInternal<K, V, M> {
+
+  private final StreamSpec streamSpec;
+  private final Function<M, K> keyExtractor;
+  private final Function<M, V> msgExtractor;
+
+  /**
+   * Creates the stream from its spec and extractor functions; all arguments are stored as given.
+   *
+   * @param graph the graph forwarded to the {@link MessageStreamImpl} constructor
+   * @param streamSpec the {@link StreamSpec} returned by {@link #getStreamSpec()}
+   * @param keyExtractor function from a message to its key
+   * @param msgExtractor function from a message to its value
+   */
+  public OutputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+    super(graph);
+    this.streamSpec = streamSpec;
+    this.keyExtractor = keyExtractor;
+    this.msgExtractor = msgExtractor;
+  }
+
+  @Override
+  public StreamSpec getStreamSpec() {
+    return this.streamSpec;
+  }
+
+  @Override
+  public Function<M, K> getKeyExtractor() {
+    return this.keyExtractor;
+  }
+
+  @Override
+  public Function<M, V> getMsgExtractor() {
+    return this.msgExtractor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index cd6c492..4cb0d28 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -31,7 +31,6 @@ import org.apache.samza.execution.JobNode;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
@@ -135,7 +134,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
   private JobGraph getExecutionPlan(StreamApplication app) throws Exception {
     // build stream graph
-    StreamGraph streamGraph = new StreamGraphImpl(this, config);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(this, config);
     app.init(streamGraph, config);
 
     // create the physical execution plan

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index d4224c3..73bb53f 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,113 +18,114 @@
  */
 package org.apache.samza.task;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
+import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.operators.stream.InputStreamInternal;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
- * Execution of the logic sub-DAG
- *
- *
- * An {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
- * through the user's stream transformations defined in {@link StreamGraphImpl} using the
- * {@link org.apache.samza.operators.MessageStream} APIs.
- * <p>
- * This class brings all the operator API implementation components together and feeds the
- * {@link InputMessageEnvelope}s into the transformation chains.
- * <p>
- * It accepts an instance of the user implemented factory {@link StreamApplication} as input parameter of the constructor.
- * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl}
- * from the {@link StreamApplication}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
- * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
- * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
- * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}.
- * <p>
- * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input
- * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
- * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
- * root node of the DAG, which this class saves.
- * <p>
- * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it
- * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
- * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
- * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ * A {@link StreamTask} implementation that brings all the operator API implementation components together and
+ * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
  */
 public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
 
-  /**
-   * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
-   */
-  private final OperatorGraph operatorGraph;
-
-  private final StreamApplication graphBuilder;
-
+  private final StreamApplication streamApplication;
   private final ApplicationRunner runner;
-
   private final Clock clock;
 
+  private OperatorImplGraph operatorImplGraph;
   private ContextManager contextManager;
+  private Map<SystemStream, InputStreamInternal> inputSystemStreamToInputStream;
 
-  private Set<SystemStreamPartition> systemStreamPartitions;
-
-  public StreamOperatorTask(StreamApplication graphBuilder, ApplicationRunner runner) {
-    this(graphBuilder, SystemClock.instance(), runner);
+  /**
+   * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
+   * @param streamApplication the user-implemented {@link StreamApplication} that creates the logical DAG
+   * @param runner the {@link ApplicationRunner} to get the mapping between logical and physical streams
+   * @param clock the {@link Clock} to use for time-keeping
+   */
+  public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner runner, Clock clock) {
+    this.streamApplication = streamApplication;
+    this.runner = runner;
+    this.clock = clock;
   }
 
-  // purely for testing.
-  public StreamOperatorTask(StreamApplication graphBuilder, Clock clock, ApplicationRunner runner) {
-    this.graphBuilder = graphBuilder;
-    this.operatorGraph = new OperatorGraph(clock);
-    this.clock = clock;
-    this.runner = runner;
+  public StreamOperatorTask(StreamApplication application, ApplicationRunner runner) {
+    this(application, runner, SystemClock.instance());
   }
 
+  /**
+   * Initializes this task during startup.
+   * <p>
+   * Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets
+   * the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs,
+   * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs.
+   * <p>
+   * It then uses the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical
+   * DAG. It also saves the mapping between input {@link SystemStream}s and their corresponding
+   * {@link InputStreamInternal}s for delivering incoming messages to the appropriate sub-DAG.
+   *
+   * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
+   * @param context allows initializing and accessing contextual data of this StreamTask
+   * @throws Exception in case of initialization errors
+   */
   @Override
   public final void init(Config config, TaskContext context) throws Exception {
-    // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
-    StreamGraphImpl streamGraph = new StreamGraphImpl(this.runner, config);
-    this.graphBuilder.init(streamGraph, config);
-    // get the context manager of the {@link StreamGraph} and initialize the task-specific context
-    this.contextManager = streamGraph.getContextManager();
-    this.systemStreamPartitions = context.getSystemStreamPartitions();
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    // initialize the user-implemented stream application.
+    this.streamApplication.init(streamGraph, config);
 
-    Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
-    systemStreamPartitions.forEach(ssp -> {
-        if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
-          // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
-          inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streamGraph.getInputStream(ssp.getSystemStream()));
-        }
+    // get the user-implemented context manager and initialize the task-specific context.
+    this.contextManager = streamGraph.getContextManager();
+    TaskContext initializedTaskContext = this.contextManager.initTaskContext(config, context);
+
+    // create the operator impl DAG corresponding to the logical operator spec DAG
+    OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
+    operatorImplGraph.init(streamGraph, config, initializedTaskContext);
+    this.operatorImplGraph = operatorImplGraph;
+
+    // TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
+    inputSystemStreamToInputStream = new HashMap<>();
+    streamGraph.getInputStreams().forEach((streamSpec, inputStream)-> {
+        SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+        inputSystemStreamToInputStream.put(systemStream, inputStream);
       });
-    operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
   }
 
+  /**
+   * Passes the incoming message envelopes along to the {@link org.apache.samza.operators.impl.RootOperatorImpl} node
+   * for the input {@link SystemStream}.
+   * <p>
+   * From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
+   * its chained {@link org.apache.samza.operators.impl.OperatorImpl}s itself.
+   *
+   * @param ime incoming message envelope to process
+   * @param collector the collector to send messages with
+   * @param coordinator the coordinator to request commits or shutdown
+   */
   @Override
   public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
-    this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
-        .onNext(new InputMessageEnvelope(ime), collector, coordinator);
+    SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
+    InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
+    // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
+    operatorImplGraph.getRootOperator(systemStream)
+        .onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator);
   }
 
   @Override
   public final void window(MessageCollector collector, TaskCoordinator coordinator)  {
-    systemStreamPartitions.forEach(ssp -> {
-        this.operatorGraph.get(ssp.getSystemStream())
-          .onTick(collector, coordinator);
-      });
+    operatorImplGraph.getAllRootOperators()
+        .forEach(rootOperator -> rootOperator.onTick(collector, coordinator));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
new file mode 100644
index 0000000..a09247a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example implementation of a task that splits its input into multiple output streams.
+ */
+public class BroadcastExample implements StreamApplication {
+
+  /**
+   * Wires one input stream to three output streams; each output receives only the messages
+   * whose {@code key} equals "key1", "key2" or "key3" respectively.
+   *
+   * @param graph the {@link StreamGraph} used to obtain the input and output streams
+   * @param config the job configuration (not used by this example)
+   */
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> inputStream =
+        graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
+    OutputStream<String, PageViewEvent, PageViewEvent> outputStream1 =
+        graph.getOutputStream("outputStream1", m -> m.key, m -> m);
+    OutputStream<String, PageViewEvent, PageViewEvent> outputStream2 =
+        graph.getOutputStream("outputStream2", m -> m.key, m -> m);
+    OutputStream<String, PageViewEvent, PageViewEvent> outputStream3 =
+        graph.getOutputStream("outputStream3", m -> m.key, m -> m);
+
+    // Constant-first equals: a message whose key is null is filtered out instead of throwing an NPE.
+    inputStream.filter(m -> "key1".equals(m.key)).sendTo(outputStream1);
+    inputStream.filter(m -> "key2".equals(m.key)).sendTo(outputStream2);
+    inputStream.filter(m -> "key3".equals(m.key)).sendTo(outputStream3);
+  }
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new BroadcastExample());
+  }
+
+  /** Message type carried by the input and output streams of this example. */
+  class PageViewEvent {
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 5898f1f..6b913c4 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -18,55 +18,42 @@
  */
 package org.apache.samza.example;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.samza.operators.*;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 
 /**
  * Example code using {@link KeyValueStore} to implement event-time window
  */
 public class KeyValueStoreExample implements StreamApplication {
 
-  /**
-   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in local runner:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-   *     runner.run(new UserMainExample(), config)
-   *   }
-   *
-   */
   @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
+    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(
+        "pageViewEventStream", (k, v) -> (PageViewEvent) v);
+    OutputStream<String, StatsOutput, StatsOutput> pageViewEventPerMemberStream = graph.getOutputStream(
+        "pageViewEventPerMemberStream", statsOutput -> statsOutput.memberId, statsOutput -> statsOutput);
 
     pageViewEvents.
-        partitionBy(m -> m.getMessage().memberId).
+        partitionBy(m -> m.memberId).
         flatMap(new MyStatsCounter()).
-        sendTo(pageViewPerMemberCounters);
-
+        sendTo(pageViewEventPerMemberStream);
   }
 
-  // local program model
+  // local execution mode
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -88,8 +75,8 @@ public class KeyValueStoreExample implements StreamApplication {
     @Override
     public Collection<StatsOutput> apply(PageViewEvent message) {
       List<StatsOutput> outputStats = new ArrayList<>();
-      long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
-      String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
+      long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.timestamp) / 5) * 5;
+      String wndKey = String.format("%s-%d", message.memberId, wndTimestamp);
       StatsWindowState curState = this.statsStore.get(wndKey);
       curState.newCount++;
       long curTimeMs = System.currentTimeMillis();
@@ -97,7 +84,7 @@ public class KeyValueStoreExample implements StreamApplication {
         curState.timeAtLastOutput = curTimeMs;
         curState.lastCount += curState.newCount;
         curState.newCount = 0;
-        outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
+        outputStats.add(new StatsOutput(message.memberId, wndTimestamp, curState.lastCount));
       }
       // update counter w/o generating output
       this.statsStore.put(wndKey, curState);
@@ -110,11 +97,7 @@ public class KeyValueStoreExample implements StreamApplication {
     }
   }
 
-  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
-  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+  class PageViewEvent {
     String pageId;
     String memberId;
     long timestamp;
@@ -124,19 +107,9 @@ public class KeyValueStoreExample implements StreamApplication {
       this.memberId = memberId;
       this.timestamp = timestamp;
     }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
   }
 
-  class StatsOutput implements MessageEnvelope<String, StatsOutput> {
+  class StatsOutput {
     private String memberId;
     private long timestamp;
     private Integer count;
@@ -146,16 +119,5 @@ public class KeyValueStoreExample implements StreamApplication {
       this.timestamp = timestamp;
       this.count = count;
     }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public StatsOutput getMessage() {
-      return this;
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
deleted file mode 100644
index 516daf5..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * Example {@link StreamApplication} code to test the API methods
- */
-public class NoContextStreamExample implements StreamApplication {
-
-  StreamSpec input1 = new StreamSpec("inputStreamA", "PageViewEvent", "kafka");
-
-  StreamSpec input2 = new StreamSpec("inputStreamB", "RumLixEvent", "kafka");
-
-  StreamSpec output = new StreamSpec("joinedPageViewStream", "PageViewJoinRumLix", "kafka");
-
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
-    return new JsonMessageEnvelope(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getSystemStreamPartition());
-  }
-
-  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
-
-    @Override
-    public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1,
-        JsonMessageEnvelope m2) {
-      MessageType newJoinMsg = new MessageType();
-      newJoinMsg.joinKey = m1.getKey();
-      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-      return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
-    }
-
-    @Override
-    public String getFirstKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-  }
-
-
-  /**
-   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in local:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-   *     runner.run(new NoContextStreamExample(), config);
-   *   }
-   *
-   */
-  @Override public void init(StreamGraph graph, Config config) {
-    MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(
-        input1, null, null);
-    MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(
-        input2, null, null);
-    OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output,
-        new StringSerde("UTF-8"), new JsonSerde<>());
-
-    inputSource1.map(this::getInputMessage).
-        join(inputSource2.map(this::getInputMessage), new MyJoinFunction(), Duration.ofMinutes(1)).
-        sendTo(outStream);
-
-  }
-
-  // standalone local program model
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
-    localRunner.run(new NoContextStreamExample());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index a338f6b..80d0e16 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -18,17 +18,13 @@
  */
 package org.apache.samza.example;
 
+import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -38,29 +34,19 @@ import java.time.Duration;
  */
 public class OrderShipmentJoinExample implements StreamApplication {
 
-  /**
-   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in local runner:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-   *     runner.run(new UserMainExample(), config);
-   *   }
-   *
-   */
-  @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
-    orders.join(shipments, new MyJoinFunction(), Duration.ofMinutes(1)).sendTo(fulfilledOrders);
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<OrderRecord> orders = graph.getInputStream("orderStream", (k, m) -> (OrderRecord) m);
+    MessageStream<ShipmentRecord> shipments = graph.getInputStream("shipmentStream", (k, m) -> (ShipmentRecord) m);
+    OutputStream<String, FulFilledOrderRecord, FulFilledOrderRecord> joinedOrderShipmentStream =
+        graph.getOutputStream("joinedOrderShipmentStream", m -> m.orderId, m -> m);
 
+    orders
+        .join(shipments, new MyJoinFunction(), Duration.ofMinutes(1))
+        .sendTo(joinedOrderShipmentStream);
   }
 
-  // standalone local program model
+  // local execution mode
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -68,13 +54,24 @@ public class OrderShipmentJoinExample implements StreamApplication {
     localRunner.run(new OrderShipmentJoinExample());
   }
 
-  StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");
+  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+    @Override
+    public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+      return new FulFilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
+    }
 
-  StreamSpec input2 = new StreamSpec("shipmentStream", "ShipmentEvent", "kafka");
+    @Override
+    public String getFirstKey(OrderRecord message) {
+      return message.orderId;
+    }
 
-  StreamSpec output = new StreamSpec("joinedOrderShipmentStream", "OrderShipmentJoinEvent", "kafka");
+    @Override
+    public String getSecondKey(ShipmentRecord message) {
+      return message.orderId;
+    }
+  }
 
-  class OrderRecord implements MessageEnvelope<String, OrderRecord> {
+  class OrderRecord {
     String orderId;
     long orderTimeMs;
 
@@ -82,19 +79,9 @@ public class OrderShipmentJoinExample implements StreamApplication {
       this.orderId = orderId;
       this.orderTimeMs = timeMs;
     }
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public OrderRecord getMessage() {
-      return this;
-    }
   }
 
-  class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
+  class ShipmentRecord {
     String orderId;
     long shipTimeMs;
 
@@ -102,19 +89,9 @@ public class OrderShipmentJoinExample implements StreamApplication {
       this.orderId = orderId;
       this.shipTimeMs = timeMs;
     }
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public ShipmentRecord getMessage() {
-      return this;
-    }
   }
 
-  class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
+  class FulFilledOrderRecord {
     String orderId;
     long orderTimeMs;
     long shipTimeMs;
@@ -124,38 +101,5 @@ public class OrderShipmentJoinExample implements StreamApplication {
       this.orderTimeMs = orderTimeMs;
       this.shipTimeMs = shipTimeMs;
     }
-
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public FulFilledOrderRecord getMessage() {
-      return this;
-    }
-  }
-
-  FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
-    return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs);
-  }
-
-  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
-
-    @Override
-    public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
-      return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage);
-    }
-
-    @Override
-    public String getFirstKey(OrderRecord message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(ShipmentRecord message) {
-      return message.getKey();
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 6edf048..547cac6 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -18,18 +18,17 @@
  */
 package org.apache.samza.example;
 
-import org.apache.samza.operators.*;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -42,20 +41,22 @@ import java.util.function.Supplier;
 public class PageViewCounterExample implements StreamApplication {
 
   @Override public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
+    OutputStream<String, PageViewCount, PageViewCount> pageViewEventPerMemberStream = graph
+        .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
 
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
     Supplier<Integer> initialValue = () -> 0;
-
-    pageViewEvents.
-        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), initialValue, (m, c) -> c + 1).
-            setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
-            setAccumulationMode(AccumulationMode.DISCARDING)).
-        map(MyStreamOutput::new).
-        sendTo(pageViewPerMemberCounters);
-
+    FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
+    pageViewEvents
+        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn)
+            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+            .setAccumulationMode(AccumulationMode.DISCARDING))
+        .map(PageViewCount::new)
+        .sendTo(pageViewEventPerMemberStream);
   }
 
+  // local execution mode
   public static void main(String[] args) {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -63,11 +64,7 @@ public class PageViewCounterExample implements StreamApplication {
     localRunner.run(new PageViewCounterExample());
   }
 
-  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
-  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+  class PageViewEvent {
     String pageId;
     String memberId;
     long timestamp;
@@ -77,38 +74,17 @@ public class PageViewCounterExample implements StreamApplication {
       this.memberId = memberId;
       this.timestamp = timestamp;
     }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
   }
 
-  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+  class PageViewCount {
     String memberId;
     long timestamp;
     int count;
 
-    MyStreamOutput(WindowPane<String, Integer> m) {
+    PageViewCount(WindowPane<String, Integer> m) {
       this.memberId = m.getKey().getKey();
       this.timestamp = Long.valueOf(m.getKey().getPaneId());
       this.count = m.getMessage();
     }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public MyStreamOutput getMessage() {
-      return this;
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index e222fe4..37375cd 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -18,16 +18,14 @@
  */
 package org.apache.samza.example;
 
-import org.apache.samza.operators.*;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -39,47 +37,21 @@ import java.util.function.Supplier;
  */
 public class RepartitionExample implements StreamApplication {
 
-  /**
-   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in local runner:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-   *     runner.run(new UserMainExample(), config);
-   *   }
-   *
-   */
-
-  /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
-   *     remoteEnv.run(new UserMainExample(), config);
-   *   }
-   *
-   */
   @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
     Supplier<Integer> initialValue = () -> 0;
-
-    pageViewEvents.
-        partitionBy(m -> m.getMessage().memberId).
-        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
-            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1)).
-        map(MyStreamOutput::new).
-        sendTo(pageViewPerMemberCounters);
-
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
+    OutputStream<String, MyStreamOutput, MyStreamOutput> pageViewEventPerMemberStream = graph
+        .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
+
+    pageViewEvents
+        .partitionBy(m -> m.memberId)
+        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1))
+        .map(MyStreamOutput::new)
+        .sendTo(pageViewEventPerMemberStream);
   }
 
-  // standalone local program model
+  // local execution mode
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -87,11 +59,7 @@ public class RepartitionExample implements StreamApplication {
     localRunner.run(new RepartitionExample());
   }
 
-  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
-  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+  class PageViewEvent {
     String pageId;
     String memberId;
     long timestamp;
@@ -101,19 +69,9 @@ public class RepartitionExample implements StreamApplication {
       this.memberId = memberId;
       this.timestamp = timestamp;
     }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
   }
 
-  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+  class MyStreamOutput {
     String memberId;
     long timestamp;
     int count;
@@ -123,16 +81,5 @@ public class RepartitionExample implements StreamApplication {
       this.timestamp = Long.valueOf(m.getKey().getPaneId());
       this.count = m.getMessage();
     }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public MyStreamOutput getMessage() {
-      return this;
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
deleted file mode 100644
index 6975955..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import java.lang.reflect.Field;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.impl.OperatorGraph;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestBasicStreamGraphs {
-
-  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
-      for (int i = 0; i < 4; i++) {
-        this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
-      }
-    } };
-
-  private final ApplicationRunner runner = mock(ApplicationRunner.class);
-
-  @Test
-  public void testUserTask() throws Exception {
-    Config config = new MapConfig();
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask, runner);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(config, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-  @Test
-  public void testSplitTask() throws Exception {
-    Config config = new MapConfig();
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask, runner);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(config, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-  @Test
-  public void testJoinTask() throws Exception {
-    Config config = new MapConfig();
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask, runner);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(config, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-}