You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/05 23:42:32 UTC

[4/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

Removed the MessageEnvelope interface from public operator APIs and removed getSinkFunction() from the OutputStream interface.
Moved the creation of SinkFunction for output streams to SinkOperatorSpec.
Moved StreamSpec from a public API to an internal class.

Additionally,
1. Removed references to StreamGraph in OperatorSpecs. It was only being used to call getNextOpId(). MessageStreamImpl now gets the ID and passes it to the OperatorSpecs itself.
2. Updated and cleaned up the StreamGraphBuilder examples.
3. Renamed SinkOperatorSpec to OutputOperatorSpec since it's used by sink, sendTo and partitionBy.

nickpan47 and xinyuiscool, please take a look.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>, Yi Pan <ni...@gmail.com>

Closes #92 from prateekm/message-envelope-removal


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4bf8ab6e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4bf8ab6e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4bf8ab6e

Branch: refs/heads/master
Commit: 4bf8ab6ebdf95cdf78f07b81a3b450a7f3fd9d45
Parents: 65af13d
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Apr 5 16:42:15 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Wed Apr 5 16:42:15 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/operators/MessageStream.java   |  35 +-
 .../apache/samza/operators/OutputStream.java    |  18 +-
 .../org/apache/samza/operators/StreamGraph.java |  80 ++--
 .../operators/data/InputMessageEnvelope.java    |  63 ---
 .../apache/samza/operators/data/LongOffset.java |  80 ----
 .../samza/operators/data/MessageEnvelope.java   |  54 ---
 .../org/apache/samza/operators/data/Offset.java |  31 --
 .../samza/operators/functions/SinkFunction.java |   2 +-
 .../apache/samza/runtime/ApplicationRunner.java |   3 +-
 .../samza/operators/TestMessageEnvelope.java    |  61 ---
 .../operators/TestOutputMessageEnvelope.java    |  43 --
 .../data/TestIncomingSystemMessage.java         |  54 ---
 .../samza/operators/data/TestLongOffset.java    |  79 ----
 .../samza/execution/ExecutionPlanner.java       |  16 +-
 .../samza/operators/MessageStreamImpl.java      |  63 +--
 .../apache/samza/operators/StreamGraphImpl.java | 262 +++---------
 .../samza/operators/impl/OperatorGraph.java     | 185 ---------
 .../samza/operators/impl/OperatorImplGraph.java | 188 +++++++++
 .../samza/operators/impl/SinkOperatorImpl.java  |   2 +
 .../samza/operators/spec/OperatorSpec.java      |  19 +-
 .../samza/operators/spec/OperatorSpecs.java     | 113 ++---
 .../operators/spec/PartialJoinOperatorSpec.java |  13 +-
 .../samza/operators/spec/SinkOperatorSpec.java  |  95 +++--
 .../operators/spec/StreamOperatorSpec.java      |  30 +-
 .../operators/spec/WindowOperatorSpec.java      |  15 +-
 .../operators/stream/InputStreamInternal.java   |  39 ++
 .../stream/InputStreamInternalImpl.java         |  45 ++
 .../stream/IntermediateStreamInternalImpl.java  |  61 +++
 .../operators/stream/OutputStreamInternal.java  |  43 ++
 .../stream/OutputStreamInternalImpl.java        |  52 +++
 .../samza/runtime/RemoteApplicationRunner.java  |   3 +-
 .../apache/samza/task/StreamOperatorTask.java   | 145 +++----
 .../apache/samza/example/BroadcastExample.java  |  69 ++++
 .../samza/example/KeyValueStoreExample.java     |  80 +---
 .../samza/example/NoContextStreamExample.java   | 128 ------
 .../samza/example/OrderShipmentJoinExample.java | 112 ++---
 .../samza/example/PageViewCounterExample.java   |  62 +--
 .../samza/example/RepartitionExample.java       |  85 +---
 .../samza/example/TestBasicStreamGraphs.java    | 103 -----
 .../samza/example/TestBroadcastExample.java     | 107 -----
 .../apache/samza/example/TestExampleBase.java   |  46 ---
 .../apache/samza/example/TestJoinExample.java   | 116 ------
 .../apache/samza/example/TestWindowExample.java |  74 ----
 .../org/apache/samza/example/WindowExample.java |  78 ++++
 .../samza/execution/TestExecutionPlanner.java   | 125 +++---
 .../samza/operators/TestJoinOperator.java       |  31 +-
 .../samza/operators/TestMessageStreamImpl.java  |  34 +-
 .../samza/operators/TestWindowOperator.java     | 413 +++++++++++++++++++
 .../data/JsonIncomingSystemMessageEnvelope.java |  60 ---
 .../operators/data/TestMessageEnvelope.java     |  57 +++
 .../data/TestOutputMessageEnvelope.java         |  39 ++
 .../samza/operators/impl/TestOperatorImpl.java  |   4 +-
 .../samza/operators/impl/TestOperatorImpls.java |  35 +-
 .../operators/impl/TestSinkOperatorImpl.java    |   2 +-
 .../operators/impl/TestStreamOperatorImpl.java  |   4 +-
 .../samza/operators/spec/TestOperatorSpecs.java |  81 ++--
 .../samza/operators/triggers/TestClock.java     |  45 --
 .../operators/triggers/TestWindowOperator.java  | 389 -----------------
 .../org/apache/samza/testUtils/TestClock.java   |  45 ++
 59 files changed, 1762 insertions(+), 2654 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 16c5976..345bff0 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -75,22 +75,24 @@ public interface MessageStream<M> {
   MessageStream<M> filter(FilterFunction<M> filterFn);
 
   /**
-   * Allows sending messages in this {@link MessageStream} to an output using the provided {@link SinkFunction}.
+   * Allows sending messages in this {@link MessageStream} to an output system using the provided {@link SinkFunction}.
    *
-   * NOTE: the output may not be a {@link org.apache.samza.system.SystemStream}. It can be an external database, etc.
+   * NOTE: If the output is for a {@link org.apache.samza.system.SystemStream}, use
+   * {@link #sendTo(OutputStream)} instead. This transform should only be used to output to
+   * non-stream systems (e.g., an external database).
    *
-   * @param sinkFn  the function to send messages in this stream to output
+   * @param sinkFn the function to send messages in this stream to an external system
    */
   void sink(SinkFunction<M> sinkFn);
 
   /**
    * Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}.
    *
-   * NOTE: the {@code stream} has to be a {@link MessageStream}.
-   *
-   * @param stream  the output {@link MessageStream}
+   * @param outputStream the output stream to send messages to
+   * @param <K> the type of key in the outgoing message
+   * @param <V> the type of message in the outgoing message
    */
-  void sendTo(OutputStream<M> stream);
+  <K, V> void sendTo(OutputStream<K, V, M> outputStream);
 
   /**
    * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
@@ -128,19 +130,20 @@ public interface MessageStream<M> {
    * <p>
    * The merging streams must have the same messages of type {@code M}.
    *
-   * @param otherStreams  other {@link MessageStream}s to be merged with this {@link MessageStream}
-   * @return  the merged {@link MessageStream}
+   * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
+   * @return the merged {@link MessageStream}
    */
   MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);
 
   /**
-   * Send the input message to an output {@link org.apache.samza.system.SystemStream} and consume it as input {@link MessageStream} again.
-   *
-   * Note: this is an transform function only used in logic DAG. In a physical DAG, this is either translated to a NOOP function, or a {@code MessageStream#sendThrough} function.
+   * Sends the messages of type {@code M} in this {@link MessageStream} to a repartitioned output stream and consumes
+   * them as an input {@link MessageStream} again. Uses keys returned by the {@code keyExtractor} as the partition key.
    *
-   * @param parKeyExtractor  a {@link Function} that extract the partition key from a message in this {@link MessageStream}
-   * @param <K>  the type of partition key
-   * @return  a {@link MessageStream} object after the re-partition
+   * @param keyExtractor the {@link Function} to extract the output message key and partition key from
+   *                     the input message
+   * @param <K> the type of output message key and partition key
+   * @return the repartitioned {@link MessageStream}
    */
-  <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor);
+  <K> MessageStream<M> partitionBy(Function<M, K> keyExtractor);
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
index 179f0e7..7335d56 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
@@ -19,23 +19,15 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.functions.SinkFunction;
-
 
 /**
- * The interface class defining the specific {@link SinkFunction} for a system {@link OutputStream}.
+ * An output stream to send messages to.
  *
- * @param <M>  The type of message to be send to this output stream
+ * @param <K> the type of key in the outgoing message
+ * @param <V> the type of message in the outgoing message
+ * @param <M> the type of message in this {@link OutputStream}
  */
 @InterfaceStability.Unstable
-public interface OutputStream<M> {
+public interface OutputStream<K, V, M> {
 
-  /**
-   * Returns the specific {@link SinkFunction} for this output stream. The {@link OutputStream} is created
-   * via {@link StreamGraph#createOutStream(StreamSpec, Serde, Serde)} or {@link StreamGraph#createIntStream(StreamSpec, Serde, Serde)}.
-   * Hence, the proper types of serdes for key and value are instantiated and are used in the {@link SinkFunction} returned.
-   *
-   * @return  The pre-defined {@link SinkFunction} to apply proper serdes before sending the message to the output stream.
-   */
-  SinkFunction<M> getSinkFunction();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index 30c4576..ff1c580 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -19,76 +19,52 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 
-import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 
 /**
- * Job-level programming interface to create an operator DAG and run in various different runtime environments.
+ * Provides APIs for accessing {@link MessageStream}s to be used to create the DAG of transforms.
  */
 @InterfaceStability.Unstable
 public interface StreamGraph {
-  /**
-   * Method to add an input {@link MessageStream} from the system
-   *
-   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the input {@link MessageStream}
-   * @param keySerde  the serde used to serialize/deserialize the message key from the input {@link MessageStream}
-   * @param msgSerde  the serde used to serialize/deserialize the message body from the input {@link MessageStream}
-   * @param <K>  the type of key in the input message
-   * @param <V>  the type of message in the input message
-   * @param <M>  the type of {@link MessageEnvelope} in the input {@link MessageStream}
-   * @return   the input {@link MessageStream} object
-   */
-  <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
-
-  /**
-   * Method to add an output {@link MessageStream} from the system
-   *
-   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the output {@link MessageStream}
-   * @param keySerde  the serde used to serialize/deserialize the message key from the output {@link MessageStream}
-   * @param msgSerde  the serde used to serialize/deserialize the message body from the output {@link MessageStream}
-   * @param <K>  the type of key in the output message
-   * @param <V>  the type of message in the output message
-   * @param <M>  the type of {@link MessageEnvelope} in the output {@link MessageStream}
-   * @return   the output {@link MessageStream} object
-   */
-  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
-
-  /**
-   * Method to add an intermediate {@link MessageStream} from the system
-   *
-   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the intermediate {@link MessageStream}
-   * @param keySerde  the serde used to serialize/deserialize the message key from the intermediate {@link MessageStream}
-   * @param msgSerde  the serde used to serialize/deserialize the message body from the intermediate {@link MessageStream}
-   * @param <K>  the type of key in the intermediate message
-   * @param <V>  the type of message in the intermediate message
-   * @param <M>  the type of {@link MessageEnvelope} in the intermediate {@link MessageStream}
-   * @return   the intermediate {@link MessageStream} object
-   */
-  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
 
   /**
-   * Method to get the input {@link MessageStream}s
+   * Gets the input {@link MessageStream} corresponding to the logical {@code streamId}.
    *
+   * @param streamId the unique logical ID for the stream
+   * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
+   *                   in the input {@link MessageStream}
+   * @param <K> the type of key in the incoming message
+   * @param <V> the type of message in the incoming message
+   * @param <M> the type of message in the input {@link MessageStream}
    * @return the input {@link MessageStream}
    */
-  Map<StreamSpec, MessageStream> getInStreams();
+  <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<K, V, M> msgBuilder);
 
   /**
-   * Method to get the {@link OutputStream}s
+   * Gets the {@link OutputStream} corresponding to the logical {@code streamId}.
    *
-   * @return  the map of all {@link OutputStream}s
+   * @param streamId the unique logical ID for the stream
+   * @param keyExtractor the {@link Function} to extract the outgoing key from the output message
+   * @param msgExtractor the {@link Function} to extract the outgoing message from the output message
+   * @param <K> the type of key in the outgoing message
+   * @param <V> the type of message in the outgoing message
+   * @param <M> the type of message in the {@link OutputStream}
+   * @return the output {@link OutputStream}
    */
-  Map<StreamSpec, OutputStream> getOutStreams();
+  <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor);
 
   /**
-   * Method to set the {@link ContextManager} for this {@link StreamGraph}
+   * Sets the {@link ContextManager} for this {@link StreamGraph}.
+   *
+   * The provided {@code contextManager} will be initialized before the transformation functions
+   * and can be used to setup shared context between them.
    *
-   * @param manager  the {@link ContextManager} object
-   * @return  this {@link StreamGraph} object
+   * @param contextManager the {@link ContextManager} to use for the {@link StreamGraph}
+   * @return the {@link StreamGraph} with the {@code contextManager} as its {@link ContextManager}
    */
-  StreamGraph withContextManager(ContextManager manager);
+  StreamGraph withContextManager(ContextManager contextManager);
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
deleted file mode 100644
index 306145b..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition}
- * and its {@link Offset} within the {@link SystemStreamPartition}.
- * <p>
- * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
- */
-public class InputMessageEnvelope implements MessageEnvelope<Object, Object> {
-
-  private final IncomingMessageEnvelope ime;
-
-  /**
-   * Creates an {@code InputMessageEnvelope} from the {@link IncomingMessageEnvelope}.
-   *
-   * @param ime  the {@link IncomingMessageEnvelope} from the input system.
-   */
-  public InputMessageEnvelope(IncomingMessageEnvelope ime) {
-    this.ime = ime;
-  }
-
-  @Override
-  public Object getKey() {
-    return this.ime.getKey();
-  }
-
-  @Override
-  public Object getMessage() {
-    return this.ime.getMessage();
-  }
-
-  public Offset getOffset() {
-    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
-    // assuming incoming message envelope carries long value as offset (i.e. Kafka case)
-    return new LongOffset(this.ime.getOffset());
-  }
-
-  public SystemStreamPartition getSystemStreamPartition() {
-    return this.ime.getSystemStreamPartition();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java b/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
deleted file mode 100644
index 0b6c0fa..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.data;
-
-/**
- * An implementation of {@link org.apache.samza.operators.data.Offset}, w/ {@code long} value as the offset
- */
-public class LongOffset implements Offset {
-
-  /**
-   * The offset value in {@code long}
-   */
-  private final Long offset;
-
-  private LongOffset(long offset) {
-    this.offset = offset;
-  }
-
-  public LongOffset(String offset) {
-    this.offset = Long.valueOf(offset);
-  }
-
-  @Override
-  public int compareTo(Offset o) {
-    if (!(o instanceof LongOffset)) {
-      throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName());
-    }
-    LongOffset other = (LongOffset) o;
-    return this.offset.compareTo(other.offset);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof LongOffset)) {
-      return false;
-    }
-    LongOffset o = (LongOffset) other;
-    return this.offset.equals(o.offset);
-  }
-
-  @Override
-  public int hashCode() {
-    return offset.hashCode();
-  }
-
-  /**
-   * Helper method to get the minimum offset
-   *
-   * @return The minimum offset
-   */
-  public static LongOffset getMinOffset() {
-    return new LongOffset(Long.MIN_VALUE);
-  }
-
-  /**
-   * Helper method to get the maximum offset
-   *
-   * @return The maximum offset
-   */
-  public static LongOffset getMaxOffset() {
-    return new LongOffset(Long.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
deleted file mode 100644
index 703a44c..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.data;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s
- */
-@InterfaceStability.Unstable
-public interface MessageEnvelope<K, M> {
-
-  /**
-   * Get the key for this {@link MessageEnvelope}.
-   *
-   * @return  the key for this {@link MessageEnvelope}
-   */
-  K getKey();
-
-  /**
-   * Get the message in this {@link MessageEnvelope}.
-   *
-   * @return  the message in this {@link MessageEnvelope}
-   */
-  M getMessage();
-
-  /**
-   * Whether this {@link MessageEnvelope} indicates deletion of a previous message with this key.
-   *
-   * @return  true if the current {@link MessageEnvelope} indicates deletion of a previous message with this key
-   */
-  default boolean isDelete() {
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java b/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
deleted file mode 100644
index 5ac1ad7..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.data;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * A generic interface extending {@link java.lang.Comparable} to be used as {@code Offset} in a stream
- */
-@InterfaceStability.Unstable
-public interface Offset extends Comparable<Offset> {
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index 08e090a..1d140ee 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -37,7 +37,7 @@ public interface SinkFunction<M>  extends InitableFunction {
    * or shut the container down.
    *
    * @param message  the input message to be sent to an output {@link org.apache.samza.system.SystemStream}
-   * @param messageCollector  the {@link MessageCollector} to send the {@link org.apache.samza.operators.data.MessageEnvelope}
+   * @param messageCollector  the {@link MessageCollector} to send the message
    * @param taskCoordinator  the {@link TaskCoordinator} to request commits or shutdown
    */
   void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator);

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index e4e24b4..b761d86 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.runtime;
 
-import java.lang.reflect.Constructor;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
@@ -26,6 +25,8 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.system.StreamSpec;
 
+import java.lang.reflect.Constructor;
+
 
 /**
  * A physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
deleted file mode 100644
index dfa69ac..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-
-
-public class TestMessageEnvelope implements MessageEnvelope<String, TestMessageEnvelope.MessageType> {
-
-  private final String key;
-  private final MessageType value;
-
-  public TestMessageEnvelope(String key, String value, long eventTime) {
-    this.key = key;
-    this.value = new MessageType(value, eventTime);
-  }
-
-  @Override
-  public MessageType getMessage() {
-    return this.value;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key;
-  }
-
-  public class MessageType {
-    private final String value;
-    private final long eventTime;
-
-    public MessageType(String value, long eventTime) {
-      this.value = value;
-      this.eventTime = eventTime;
-    }
-
-    public long getEventTime() {
-      return eventTime;
-    }
-
-    public String getValue() {
-      return value;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
deleted file mode 100644
index 284b30b..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-
-
-public class TestOutputMessageEnvelope implements MessageEnvelope<String, Integer> {
-  private final String key;
-  private final Integer value;
-
-  public TestOutputMessageEnvelope(String key, Integer value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  @Override
-  public Integer getMessage() {
-    return this.value;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
deleted file mode 100644
index e3a1290..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestIncomingSystemMessage {
-
-  @Test
-  public void testConstructor() {
-    IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
-    InputMessageEnvelope ism = new InputMessageEnvelope(ime);
-
-    Object mockKey = mock(Object.class);
-    Object mockValue = mock(Object.class);
-    LongOffset testOffset = new LongOffset("12345");
-    SystemStreamPartition mockSsp = mock(SystemStreamPartition.class);
-
-    when(ime.getKey()).thenReturn(mockKey);
-    when(ime.getMessage()).thenReturn(mockValue);
-    when(ime.getSystemStreamPartition()).thenReturn(mockSsp);
-    when(ime.getOffset()).thenReturn("12345");
-
-    assertEquals(ism.getKey(), mockKey);
-    assertEquals(ism.getMessage(), mockValue);
-    assertEquals(ism.getSystemStreamPartition(), mockSsp);
-    assertEquals(ism.getOffset(), testOffset);
-    assertFalse(ism.isDelete());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
deleted file mode 100644
index 7838896..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-
-public class TestLongOffset {
-
-  @Test
-  public void testConstructor() throws Exception {
-    LongOffset o1 = new LongOffset("12345");
-    Field offsetField = LongOffset.class.getDeclaredField("offset");
-    offsetField.setAccessible(true);
-    Long x = (Long) offsetField.get(o1);
-    assertEquals(x.longValue(), 12345L);
-
-    o1 = new LongOffset("012345");
-    x = (Long) offsetField.get(o1);
-    assertEquals(x.longValue(), 12345L);
-
-    try {
-      o1 = new LongOffset("xyz");
-      fail("Constructor of LongOffset should have failed w/ mal-formatted numbers");
-    } catch (NumberFormatException nfe) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testComparator() {
-    LongOffset o1 = new LongOffset("11111");
-    Offset other = mock(Offset.class);
-    try {
-      o1.compareTo(other);
-      fail("compareTo() should have have failed when comparing to an object of a different class");
-    } catch (IllegalArgumentException iae) {
-      // expected
-    }
-
-    LongOffset o2 = new LongOffset("-10000");
-    assertEquals(o1.compareTo(o2), 1);
-    LongOffset o3 = new LongOffset("22222");
-    assertEquals(o1.compareTo(o3), -1);
-    LongOffset o4 = new LongOffset("11111");
-    assertEquals(o1.compareTo(o4), 0);
-  }
-
-  @Test
-  public void testEquals() {
-    LongOffset o1 = new LongOffset("12345");
-    Offset other = mock(Offset.class);
-    assertFalse(o1.equals(other));
-
-    LongOffset o2 = new LongOffset("0012345");
-    assertTrue(o1.equals(o2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 47deecd..be807e9 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -33,7 +33,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.system.StreamSpec;
@@ -57,7 +57,7 @@ public class ExecutionPlanner {
     this.streamManager = streamManager;
   }
 
-  public JobGraph plan(StreamGraph streamGraph) throws Exception {
+  public JobGraph plan(StreamGraphImpl streamGraph) throws Exception {
     // create physical job graph based on stream graph
     JobGraph jobGraph = createJobGraph(streamGraph);
 
@@ -72,10 +72,10 @@ public class ExecutionPlanner {
   /**
    * Create the physical graph from StreamGraph
    */
-  /* package private */ JobGraph createJobGraph(StreamGraph streamGraph) {
+  /* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
     JobGraph jobGraph = new JobGraph(streamGraph, config);
-    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
-    Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
+    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputStreams().keySet());
+    Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
     intStreams.retainAll(sinkStreams);
     sourceStreams.removeAll(intStreams);
@@ -103,7 +103,7 @@ public class ExecutionPlanner {
   /**
    * Figure out the number of partitions of all streams
    */
-  /* package private */ void calculatePartitions(StreamGraph streamGraph, JobGraph jobGraph) {
+  /* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
     // fetch the external streams partition info
     updateExistingPartitions(jobGraph, streamManager);
 
@@ -152,7 +152,7 @@ public class ExecutionPlanner {
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(StreamGraph streamGraph, JobGraph jobGraph) {
+  /* package private */ static void calculateJoinInputPartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
     // mapping from a source stream to all join specs reachable from it
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
@@ -166,7 +166,7 @@ public class ExecutionPlanner {
     // The visited set keeps track of the join specs that have been already inserted in the queue before
     Set<OperatorSpec> visited = new HashSet<>();
 
-    streamGraph.getInStreams().entrySet().forEach(entry -> {
+    streamGraph.getInputStreams().entrySet().forEach(entry -> {
         StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey());
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
         findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 339df7a..dfe231e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -28,6 +28,8 @@ import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.stream.OutputStreamInternal;
 import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
@@ -51,7 +53,7 @@ import java.util.function.Function;
  */
 public class MessageStreamImpl<M> implements MessageStream<M> {
   /**
-   * The {@link StreamGraphImpl} object that contains this {@link MessageStreamImpl}
+   * The {@link StreamGraphImpl} that contains this {@link MessageStreamImpl}
    */
   private final StreamGraphImpl graph;
 
@@ -65,52 +67,59 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
    *
    * @param graph the {@link StreamGraphImpl} object that this stream belongs to
    */
-  MessageStreamImpl(StreamGraphImpl graph) {
+  public MessageStreamImpl(StreamGraphImpl graph) {
     this.graph = graph;
   }
 
   @Override
   public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
-    OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, this.graph, new MessageStreamImpl<>(this.graph));
+    OperatorSpec<TM> op = OperatorSpecs.createMapOperatorSpec(
+        mapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
     return op.getNextStream();
   }
 
   @Override
   public MessageStream<M> filter(FilterFunction<M> filterFn) {
-    OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, this.graph, new MessageStreamImpl<>(this.graph));
+    OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(
+        filterFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
     return op.getNextStream();
   }
 
   @Override
   public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
-    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, this.graph, new MessageStreamImpl<>(this.graph));
+    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(
+        flatMapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
     return op.getNextStream();
   }
 
   @Override
   public void sink(SinkFunction<M> sinkFn) {
-    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph));
+    SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId());
+    this.registeredOperatorSpecs.add(op);
   }
 
   @Override
-  public void sendTo(OutputStream<M> stream) {
-    this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream));
+  public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
+    SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
+        (OutputStreamInternal<K, V, M>) outputStream, this.graph.getNextOpId());
+    this.registeredOperatorSpecs.add(op);
   }
 
   @Override
   public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
     OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
-        this.graph, new MessageStreamImpl<>(this.graph));
+        new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(wndOp);
     return wndOp.getNextStream();
   }
 
   @Override
-  public <K, JM, RM> MessageStream<RM> join(MessageStream<JM> otherStream, JoinFunction<K, M, JM, RM> joinFn, Duration ttl) {
-    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph);
+  public <K, JM, RM> MessageStream<RM> join(
+      MessageStream<JM> otherStream, JoinFunction<K, M, JM, RM> joinFn, Duration ttl) {
+    MessageStreamImpl<RM> nextStream = new MessageStreamImpl<>(this.graph);
 
     PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn = new PartialJoinFunction<K, M, JM, RM>() {
       private KeyValueStore<K, PartialJoinMessage<M>> thisStreamState;
@@ -163,32 +172,36 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
       }
     };
 
-    this.registeredOperatorSpecs.add(OperatorSpecs.<K, M, JM, RM>createPartialJoinOperatorSpec(
-        thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), this.graph, outputStream));
+    this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(
+        thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
 
-    ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs.<K, JM, M, RM>createPartialJoinOperatorSpec(
-        otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), this.graph, outputStream));
+    ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs
+        .add(OperatorSpecs.createPartialJoinOperatorSpec(
+            otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
 
-    return outputStream;
+    return nextStream;
   }
 
   @Override
   public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
-    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph);
+    MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);
 
     otherStreams.add(this);
     otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
-        add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream)));
-    return outputStream;
+        add(OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId())));
+    return nextStream;
   }
 
   @Override
-  public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
-    int opId = graph.getNextOpId();
-    MessageStreamImpl<M> intStream = this.graph.generateIntStreamFromOpId(opId, parKeyExtractor);
-    OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
-    this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(), outputStream, opId));
-    return intStream;
+  public <K> MessageStream<M> partitionBy(Function<M, K> keyExtractor) {
+    int opId = this.graph.getNextOpId();
+    String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
+    MessageStreamImpl<M> intermediateStream =
+        this.graph.<K, M, M>getIntermediateStream(opName, keyExtractor, m -> m, (k, m) -> m);
+    SinkOperatorSpec<M> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
+        (OutputStreamInternal<K, M, M>) intermediateStream, opId);
+    this.registeredOperatorSpecs.add(partitionByOperatorSpec);
+    return intermediateStream;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 6f7377b..a49b68e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,248 +18,110 @@
  */
 package org.apache.samza.operators;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Function;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.stream.InputStreamInternal;
+import org.apache.samza.operators.stream.InputStreamInternalImpl;
+import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
+import org.apache.samza.operators.stream.OutputStreamInternal;
+import org.apache.samza.operators.stream.OutputStreamInternalImpl;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 
 /**
- * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
- * create system input/output/intermediate streams.
+ * A {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to
+ * create the DAG of transforms.
  */
 public class StreamGraphImpl implements StreamGraph {
 
   /**
-   * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link MessageEnvelope}
-   * in the input {@link MessageStream}s.
+   * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph.
+   * Should only be accessed by {@link MessageStreamImpl} via {@link #getNextOpId()}.
    */
   private int opId = 0;
 
-  // TODO: SAMZA-1101: the instantiation of physical streams and the physical sink functions should be delayed
-  // after physical deployment. The input/output/intermediate stream creation should also be delegated to {@link ExecutionEnvironment}
-  // s.t. we can allow different physical instantiation of stream under different execution environment w/o code change.
-  private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
-    final StreamSpec spec;
-    final Serde<K> keySerde;
-    final Serde<V> msgSerde;
-
-    InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-      super(graph);
-      this.spec = streamSpec;
-      this.keySerde = keySerde;
-      this.msgSerde = msgSerde;
-    }
-
-    StreamSpec getSpec() {
-      return this.spec;
-    }
-
-  }
-
-  private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> implements OutputStream<M> {
-    final StreamSpec spec;
-    final Serde<K> keySerde;
-    final Serde<V> msgSerde;
-
-    OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-      this.spec = streamSpec;
-      this.keySerde = keySerde;
-      this.msgSerde = msgSerde;
-    }
-
-    StreamSpec getSpec() {
-      return this.spec;
-    }
-
-    @Override
-    public SinkFunction<M> getSinkFunction() {
-      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
-        // TODO: need to find a way to directly pass in the serde class names
-        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
-        //    message.getKey(), message.getKey(), message.getMessage()));
-        mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
-      };
-    }
-  }
-
-  private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> {
-    final Function<M, PK> parKeyFn;
-
-    /**
-     * Default constructor
-     *
-     * @param graph the {@link StreamGraphImpl} object that this stream belongs to
-     */
-    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-      this(graph, streamSpec, keySerde, msgSerde, null);
-    }
-
-    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde, Function<M, PK> parKeyFn) {
-      super(graph, streamSpec, keySerde, msgSerde);
-      this.parKeyFn = parKeyFn;
-    }
-
-    @Override
-    public SinkFunction<M> getSinkFunction() {
-      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
-        // TODO: need to find a way to directly pass in the serde class names
-        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
-        //    message.getKey(), message.getKey(), message.getMessage()));
-        if (this.parKeyFn == null) {
-          mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
-        } else {
-          // apply partition key function
-          mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
-        }
-      };
-    }
-  }
-
-  /**
-   * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
-   */
-  private final Map<String, MessageStream> inStreams = new HashMap<>();
-  private final Map<String, OutputStream> outStreams = new HashMap<>();
+  private final Map<StreamSpec, InputStreamInternal> inStreams = new HashMap<>();
+  private final Map<StreamSpec, OutputStreamInternal> outStreams = new HashMap<>();
   private final ApplicationRunner runner;
   private final Config config;
 
   private ContextManager contextManager = new ContextManager() { };
 
   public StreamGraphImpl(ApplicationRunner runner, Config config) {
+    // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphImpl once Systems
+    // can use streamId to send and receive messages.
     this.runner = runner;
     this.config = config;
   }
 
   @Override
-  public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.inStreams.containsKey(streamSpec.getId())) {
-      this.inStreams.putIfAbsent(streamSpec.getId(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
-    }
-    return this.inStreams.get(streamSpec.getId());
+  public <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<K, V, M> msgBuilder) {
+    return inStreams.computeIfAbsent(runner.getStreamSpec(streamId),
+        streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, msgBuilder));
   }
 
-  /**
-   * Helper method to be used by {@link MessageStreamImpl} class
-   *
-   * @param streamSpec  the {@link StreamSpec} object defining the {@link SystemStream} as the output
-   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link SystemStream}
-   * @return  the {@link MessageStreamImpl} object
-   */
   @Override
-  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.outStreams.containsKey(streamSpec.getId())) {
-      this.outStreams.putIfAbsent(streamSpec.getId(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
-    }
-    return this.outStreams.get(streamSpec.getId());
+  public <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+    return outStreams.computeIfAbsent(runner.getStreamSpec(streamId),
+        streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, keyExtractor, msgExtractor));
+  }
+
+  @Override
+  public StreamGraph withContextManager(ContextManager contextManager) {
+    this.contextManager = contextManager;
+    return this;
   }
 
   /**
-   * Helper method to be used by {@link MessageStreamImpl} class
+   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
+   * An intermediate {@link MessageStream} is both an output and an input stream.
    *
-   * @param streamSpec  the {@link StreamSpec} object defining the {@link SystemStream} as an intermediate {@link SystemStream}
-   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link SystemStream}
-   * @return  the {@link MessageStreamImpl} object
+   * @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the
+   *                   logical streamId.
+   * @param keyExtractor the {@link Function} to extract the outgoing key from the intermediate message
+   * @param msgExtractor the {@link Function} to extract the outgoing message from the intermediate message
+   * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
+   *                   in the intermediate {@link MessageStream}
+   * @param <K> the type of key in the intermediate message
+   * @param <V> the type of message in the intermediate message
+   * @param <M> the type of messages in the intermediate {@link MessageStream}
+   * @return  the intermediate {@link MessageStreamImpl}
    */
-  @Override
-  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec,
-      Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.inStreams.containsKey(streamSpec.getId())) {
-      this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
-    }
-    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getId());
-    if (!this.outStreams.containsKey(streamSpec.getId())) {
-      this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
-    }
+  <K, V, M> MessageStreamImpl<M> getIntermediateStream(String streamName,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) {
+    String streamId = String.format("%s-%s-%s",
+        config.get(JobConfig.JOB_NAME()),
+        config.get(JobConfig.JOB_ID(), "1"),
+        streamName);
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    IntermediateStreamInternalImpl<K, V, M> intStream =
+        (IntermediateStreamInternalImpl<K, V, M>) inStreams
+            .computeIfAbsent(streamSpec,
+                k -> new IntermediateStreamInternalImpl<>(this, streamSpec, keyExtractor, msgExtractor, msgBuilder));
+    outStreams.putIfAbsent(streamSpec, intStream);
     return intStream;
   }
 
-  @Override public Map<StreamSpec, MessageStream> getInStreams() {
-    Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>();
-    this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) entry).getSpec(), entry));
-    return Collections.unmodifiableMap(inStreamMap);
-  }
-
-  @Override public Map<StreamSpec, OutputStream> getOutStreams() {
-    Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
-    this.outStreams.forEach((ss, entry) -> {
-        StreamSpec streamSpec = (entry instanceof IntermediateStreamImpl) ?
-          ((IntermediateStreamImpl) entry).getSpec() :
-          ((OutputStreamImpl) entry).getSpec();
-        outStreamMap.put(streamSpec, entry);
-      });
-    return Collections.unmodifiableMap(outStreamMap);
-  }
-
-  @Override
-  public StreamGraph withContextManager(ContextManager manager) {
-    this.contextManager = manager;
-    return this;
+  public Map<StreamSpec, InputStreamInternal> getInputStreams() {
+    return Collections.unmodifiableMap(inStreams);
   }
 
-  public int getNextOpId() {
-    return this.opId++;
+  public Map<StreamSpec, OutputStreamInternal> getOutputStreams() {
+    return Collections.unmodifiableMap(outStreams);
   }
 
   public ContextManager getContextManager() {
     return this.contextManager;
   }
 
-  /**
-   * Helper method to be get the input stream via {@link SystemStream}
-   *
-   * @param sstream  the {@link SystemStream}
-   * @return  a {@link MessageStreamImpl} object corresponding to the {@code systemStream}
-   */
-  public MessageStreamImpl getInputStream(SystemStream sstream) {
-    for (MessageStream entry: this.inStreams.values()) {
-      if (((InputStreamImpl) entry).getSpec().getSystemName().equals(sstream.getSystem()) &&
-          ((InputStreamImpl) entry).getSpec().getPhysicalName().equals(sstream.getStream())) {
-        return (MessageStreamImpl) entry;
-      }
-    }
-    return null;
-  }
-
-  <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) {
-    if (this.outStreams.containsValue(intStream)) {
-      return (OutputStream<M>) intStream;
-    }
-    return null;
-  }
-
-  /**
-   * Method to generate intermediate stream from an operator ID.
-   *
-   * @param opId  operator ID
-   * @param parKeyFn  the function to extract the partition key from the input message
-   * @param <PK>  the type of partition key
-   * @param <M>  the type of input message
-   * @return  the {@link OutputStream} object for the re-partitioned stream
-   */
-  <PK, M> MessageStreamImpl<M> generateIntStreamFromOpId(int opId, Function<M, PK> parKeyFn) {
-    String opNameWithId = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
-    String streamId = String.format("%s-%s-%s",
-        config.get(JobConfig.JOB_NAME()),
-        config.get(JobConfig.JOB_ID(), "1"),
-        opNameWithId);
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-
-    this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
-    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
-    this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
-    return intStream;
+  /* package private */ int getNextOpId() {
+    return this.opId++;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
deleted file mode 100644
index ca8e34b..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.util.Clock;
-import org.apache.samza.util.SystemClock;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a
- * {@link MessageStreamImpl}
- */
-public class OperatorGraph {
-
-  /**
-   * A {@link Map} from {@link OperatorSpec} to {@link OperatorImpl}. This map registers all {@link OperatorImpl} in the DAG
-   * of {@link OperatorImpl} in a {@link org.apache.samza.container.TaskInstance}. Each {@link OperatorImpl} is created
-   * according to a single instance of {@link OperatorSpec}.
-   */
-  private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>();
-
-  /**
-   * This {@link Map} describes the DAG of {@link OperatorImpl} that are chained together to process the input messages.
-   */
-  private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>();
-
-  private final Clock clock;
-
-  public OperatorGraph(Clock clock) {
-    this.clock = clock;
-  }
-
-  public OperatorGraph() {
-    this(SystemClock.instance());
-  }
-
-  /**
-   * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}.
-   * This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and
-   * instantiate the corresponding {@link OperatorImpl} chains that take the {@link org.apache.samza.operators.MessageStream} as input.
-   *
-   * @param inputStreams  the map of input {@link org.apache.samza.operators.MessageStream}s
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   */
-  public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config config, TaskContext context) {
-    inputStreams.forEach((ss, mstream) -> this.operatorGraph.put(ss, this.createOperatorImpls(mstream, config, context)));
-  }
-
-  /**
-   * Get the {@link RootOperatorImpl} corresponding to the provided {@code ss}.
-   *
-   * @param ss  input {@link SystemStream}
-   * @return  the {@link RootOperatorImpl} that starts processing the input message
-   */
-  public RootOperatorImpl get(SystemStream ss) {
-    return this.operatorGraph.get(ss);
-  }
-
-  /**
-   * Get all {@link RootOperatorImpl}s for the graph.
-   *
-   * @return  an immutable view of all {@link RootOperatorImpl}s for the graph
-   */
-  public Collection<RootOperatorImpl> getAll() {
-    return Collections.unmodifiableCollection(this.operatorGraph.values());
-  }
-
-  /**
-   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
-   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
-   *
-   * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
-   * @param <M>  the type of messagess in the {@code source} {@link MessageStreamImpl}
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   * @return  root node for the {@link OperatorImpl} DAG
-   */
-  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config,
-      TaskContext context) {
-    // since the source message stream might have multiple operator specs registered on it,
-    // create a new root node as a single point of entry for the DAG.
-    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
-    // create the pipeline/topology starting from the source
-    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
-        // pass in the source and context s.t. stateful stream operators can initialize their stores
-        OperatorImpl<M, ?> operatorImpl =
-            this.createAndRegisterOperatorImpl(registeredOperator, source, config, context);
-        rootOperator.registerNextOperator(operatorImpl);
-      });
-    return rootOperator;
-  }
-
-  /**
-   * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
-   * {@link OperatorImpl}s.
-   *
-   * @param operatorSpec  the operatorSpec registered with the {@code source}
-   * @param source  the source {@link MessageStreamImpl}
-   * @param <M>  type of input message
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   * @return  the operator implementation for the operatorSpec
-   */
-  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
-      MessageStreamImpl<M> source, Config config, TaskContext context) {
-    if (!operators.containsKey(operatorSpec)) {
-      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
-      if (operators.putIfAbsent(operatorSpec, operatorImpl) == null) {
-        // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
-        // so traverse and initialize and register the rest of the DAG.
-        // initialize the corresponding operator function
-        operatorSpec.init(config, context);
-        MessageStreamImpl nextStream = operatorSpec.getNextStream();
-        if (nextStream != null) {
-          Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
-          registeredSpecs.forEach(registeredSpec -> {
-              OperatorImpl subImpl = this.createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
-              operatorImpl.registerNextOperator(subImpl);
-            });
-        }
-        return operatorImpl;
-      }
-    }
-
-    // the implementation corresponding to operatorSpec has already been instantiated
-    // and registered, so we do not need to traverse the DAG further.
-    return operators.get(operatorSpec);
-  }
-
-  /**
-   * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
-   *
-   * @param source  the source {@link MessageStreamImpl}
-   * @param <M>  type of input message
-   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   * @return  the {@link OperatorImpl} implementation instance
-   */
-  private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
-    if (operatorSpec instanceof StreamOperatorSpec) {
-      StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
-      return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
-    } else if (operatorSpec instanceof SinkOperatorSpec) {
-      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
-    } else if (operatorSpec instanceof WindowOperatorSpec) {
-      return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
-    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
-      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
-    }
-    throw new IllegalArgumentException(
-        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
new file mode 100644
index 0000000..709f2a0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for the input
+ * {@link MessageStreamImpl}s.
+ */
+public class OperatorImplGraph {
+
+  /**
+   * A mapping from {@link OperatorSpec}s to their {@link OperatorImpl}s in this graph. Used to avoid creating
+   * multiple {@link OperatorImpl}s for an {@link OperatorSpec}, e.g., when it's reached from different
+   * input {@link MessageStreamImpl}s.
+   */
+  private final Map<OperatorSpec, OperatorImpl> operatorImpls = new HashMap<>();
+
+  /**
+   * A mapping from input {@link SystemStream}s to the {@link RootOperatorImpl} of their sub-DAG in this graph.
+   */
+  private final Map<SystemStream, RootOperatorImpl> rootOperators = new HashMap<>();
+
+  private final Clock clock;
+
+  public OperatorImplGraph(Clock clock) {
+    this.clock = clock;
+  }
+
+  /* package private; uses SystemClock.instance() */ OperatorImplGraph() {
+    this(SystemClock.instance());
+  }
+
+  /**
+   * Initialize the DAG of {@link OperatorImpl}s for each input {@link MessageStreamImpl} in the provided
+   * {@link StreamGraphImpl}, keyed by the input's {@link SystemStream} (system name + physical name).
+   *
+   * @param streamGraph  the logical {@link StreamGraphImpl}
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   */
+  public void init(StreamGraphImpl streamGraph, Config config, TaskContext context) {
+    streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> {
+        SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+        this.rootOperators.put(systemStream, this.createOperatorImpls((MessageStreamImpl) inputStream, config, context));
+      });
+  }
+
+  /**
+   * Get the {@link RootOperatorImpl} corresponding to the provided input {@code systemStream}.
+   *
+   * @param systemStream  input {@link SystemStream}
+   * @return  the {@link RootOperatorImpl} that starts processing the input message, or {@code null} if none
+   */
+  public RootOperatorImpl getRootOperator(SystemStream systemStream) {
+    return this.rootOperators.get(systemStream);
+  }
+
+  /**
+   * Get all {@link RootOperatorImpl}s for the graph.
+   *
+   * @return  an unmodifiable view of all {@link RootOperatorImpl}s for the graph
+   */
+  public Collection<RootOperatorImpl> getAllRootOperators() {
+    return Collections.unmodifiableCollection(this.rootOperators.values());
+  }
+
+  /**
+   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
+   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
+   *
+   * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
+   * @param <M>  the type of messages in the {@code source} {@link MessageStreamImpl}
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  root node for the {@link OperatorImpl} DAG
+   */
+  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config, TaskContext context) {
+    // since the source message stream might have multiple operator specs registered on it,
+    // create a new root node as a single point of entry for the DAG.
+    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
+    // create the pipeline/topology starting from the source
+    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
+        // pass in the source and context so that stateful stream operators can initialize their stores
+        OperatorImpl<M, ?> operatorImpl =
+            createAndRegisterOperatorImpl(registeredOperator, source, config, context);
+        rootOperator.registerNextOperator(operatorImpl);
+      });
+    return rootOperator;
+  }
+
+  /**
+   * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
+   * {@link OperatorImpl}s. Each {@link OperatorSpec} is instantiated and initialized at most once per graph.
+   *
+   * @param operatorSpec  the operatorSpec registered with the {@code source}
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the (possibly previously created) {@link OperatorImpl} for the operatorSpec
+   */
+  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
+      MessageStreamImpl<M> source, Config config, TaskContext context) {
+    if (!operatorImpls.containsKey(operatorSpec)) {
+      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
+      if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) {
+        // first visit of this operatorSpec: initialize its operator function, then recursively create
+        // and register OperatorImpls for the specs registered on its next stream (if there is one).
+        // NOTE(review): the containsKey check above already makes this putIfAbsent check redundant.
+        operatorSpec.init(config, context);
+        MessageStreamImpl nextStream = operatorSpec.getNextStream();
+        if (nextStream != null) {
+          Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
+          registeredSpecs.forEach(registeredSpec -> {
+              OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
+              operatorImpl.registerNextOperator(subImpl);
+            });
+        }
+        return operatorImpl;
+      }
+    }
+
+    // the implementation corresponding to operatorSpec has already been instantiated
+    // and registered, so we do not need to traverse the DAG further.
+    return operatorImpls.get(operatorSpec);
+  }
+
+  /**
+   * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec} based on its concrete type.
+   *
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the {@link OperatorImpl} instance; unsupported spec types throw {@link IllegalArgumentException}
+   */
+  private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source,
+      OperatorSpec operatorSpec, Config config, TaskContext context) {
+    if (operatorSpec instanceof StreamOperatorSpec) {
+      StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
+      return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
+    } else if (operatorSpec instanceof SinkOperatorSpec) {
+      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
+    } else if (operatorSpec instanceof WindowOperatorSpec) {
+      return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
+    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index 41d1778..f92fbfb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -40,5 +40,7 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
   @Override
   public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
     this.sinkFn.apply(message, collector, coordinator);
+    // this is a terminal operator, so there should be no further chained operators;
+    // hence the result is not propagated to any downstream operators here.
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 5a125a2..18090e2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -25,8 +25,8 @@ import org.apache.samza.task.TaskContext;
 
 
 /**
- * A stateless serializable stream operator specification that holds all the information required
- * to transform the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
+ * A stream operator specification that holds all the information required to transform
+ * the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
  *
  * @param <OM>  the type of output message from the operator
  */
@@ -45,16 +45,21 @@ public interface OperatorSpec<OM> {
     PARTITION_BY
   }
 
-
   /**
-   * Get the output stream containing transformed messages produced by this operator.
-   * @return  the output stream containing transformed messages produced by this operator.
+   * Get the next {@link MessageStreamImpl} that receives the transformed messages produced by this operator.
+   * @return  the next {@link MessageStreamImpl}, or {@code null} if this operator has none
    */
   MessageStreamImpl<OM> getNextStream();
 
   /**
-   * Return the ID for this operator
-   * @return ID integer
+   * Get the {@link OpCode} identifying the kind of this operator.
+   * @return  the {@link OpCode} for this operator
+   */
+  OpCode getOpCode();
+
+  /**
+   * Get the unique ID of this operator in the {@link org.apache.samza.operators.StreamGraph}.
+   * @return  the unique operator ID
    */
   int getOpId();