You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 18:18:24 UTC

[3/4] samza git commit: SAMZA-1073: top-level fluent API

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index f7e1f36..73fb5c8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -20,7 +20,6 @@
 package org.apache.samza.operators.windows;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.triggers.TimeTrigger;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.triggers.Triggers;
@@ -34,11 +33,11 @@ import java.util.function.Function;
 /**
  * APIs for creating different types of {@link Window}s.
  *
- * Groups the incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
+ * Groups the incoming messages in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing.
  *
  * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s
  * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more
- * {@link MessageEnvelope}s in the window and is called a {@link WindowPane}.
+ * messages in the window and is called a {@link WindowPane}.
  *
  * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window
  * has arrived or late triggers that allow handling of late data arrivals.
@@ -74,14 +73,14 @@ import java.util.function.Function;
  *   <li>
  *     Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions.
  *     A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
- *     The boundary for a session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that that arrive within
+ *     The boundary for a session is defined by a {@code sessionGap}. All messages that that arrive within
  *     the gap are grouped into the same session.
  *   <li>
  *     Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}.
  *     An early trigger must be specified when defining a global window.
  * </ul>
  *
- * <p> A {@link Window} is defined as "keyed" when the incoming {@link MessageEnvelope}s are first grouped based on their key
+ * <p> A {@link Window} is defined as "keyed" when the incoming messages are first grouped based on their key
  * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window
  * types.
  *
@@ -92,7 +91,7 @@ public final class Windows {
   private Windows() { }
 
   /**
-   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping processing
+   * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping processing
    * time based windows based on the provided keyFn and applies the provided fold function to them.
    *
    * <p>The below example computes the maximum value per-key over fixed size 10 second windows.
@@ -101,29 +100,29 @@ public final class Windows {
    *    MessageStream<UserClick> stream = ...;
    *    Function<UserClick, String> keyFn = ...;
    *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
-   *    MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream = stream.window(
-   *    Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
+   *    MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
+   *        Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator));
    * }
    * </pre>
    *
-   * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
+   * @param keyFn the function to extract the window key from a message
    * @param interval the duration in processing time
-   * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
-   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param foldFn the function to aggregate messages in the {@link WindowPane}
+   * @param <M> the type of the input message
    * @param <WV> the type of the {@link WindowPane} output value
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function.
    */
-  public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>>
+  public static <M, K, WV> Window<M, K, WV>
     keyedTumblingWindow(Function<M, K> keyFn, Duration interval, BiFunction<M, WV, WV> foldFn) {
 
-    Trigger defaultTrigger = new TimeTrigger(interval);
+    Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
     return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null);
   }
 
 
   /**
-   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping
+   * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
    * processing time based windows using the provided keyFn.
    *
    * <p>The below example groups the stream into fixed-size 10 second windows for each key.
@@ -131,19 +130,18 @@ public final class Windows {
    * <pre> {@code
    *    MessageStream<UserClick> stream = ...;
    *    Function<UserClick, String> keyFn = ...;
-   *    MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window(
-   *    Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
+   *    MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
+   *        Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
    * }
    * </pre>
    *
-   * @param keyFn function to extract key from the {@link MessageEnvelope}
+   * @param keyFn function to extract key from the message
    * @param interval the duration in processing time
-   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param <M> the type of the input message
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>>
-    keyedTumblingWindow(Function<M, K> keyFn, Duration interval) {
+  public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval) {
     BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
       c.add(m);
       return c;
@@ -160,25 +158,25 @@ public final class Windows {
    * <pre> {@code
    *    MessageStream<String> stream = ...;
    *    BiFunction<String, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
-   *    MessageStream<WindowOutput<WindowKey, Integer>> windowedStream = stream.window(
-   *    Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
+   *    MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
+   *        Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator));
    * }
    * </pre>
    *
    * @param duration the duration in processing time
-   * @param foldFn to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
-   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param foldFn to aggregate messages in the {@link WindowPane}
+   * @param <M> the type of the input message
    * @param <WV> the type of the {@link WindowPane} output value
    * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, WV> Window<M, Void, WV, WindowPane<Void, WV>>
+  public static <M, WV> Window<M, Void, WV>
     tumblingWindow(Duration duration, BiFunction<M, WV, WV> foldFn) {
-    Trigger defaultTrigger = Triggers.repeat(new TimeTrigger(duration));
-    return new WindowInternal<M, Void, WV>(defaultTrigger, foldFn, null, null);
+    Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration));
+    return new WindowInternal<>(defaultTrigger, foldFn, null, null);
   }
 
   /**
-   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping
+   * Creates a {@link Window} that groups incoming messages into fixed-size, non-overlapping
    * processing time based windows.
    *
    * <p>The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile.
@@ -187,16 +185,16 @@ public final class Windows {
    *    MessageStream<Long> stream = ...;
    *    Function<Collection<Long, Long>> percentile99 = ..
    *
-   *    MessageStream<WindowOutput<WindowKey, Collection<Long>>> windowedStream = integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
+   *    MessageStream<WindowPane<Void, Collection<Long>>> windowedStream = integerStream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
    *    MessageStream<Long> windowedPercentiles = windowed.map(windowedOutput -> percentile99(windowedOutput.getMessage());
    * }
    * </pre>
    *
    * @param duration the duration in processing time
-   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param <M> the type of the input message
    * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, WindowPane<Void, Collection<M>>> tumblingWindow(Duration duration) {
+  public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) {
     BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
       c.add(m);
       return c;
@@ -205,11 +203,11 @@ public final class Windows {
   }
 
   /**
-   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into sessions per-key based on the provided {@code sessionGap}
+   * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}
    * and applies the provided fold function to them.
    *
    * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
-   * A session is considered complete when no new messages arrive within the {@code sessionGap}. All {@link MessageEnvelope}s that arrive within
+   * A session is considered complete when no new messages arrive within the {@code sessionGap}. All messages that arrive within
    * the gap are grouped into the same session.
    *
    * <p>The below example computes the maximum value per-key over a session window of gap 10 seconds.
@@ -218,29 +216,29 @@ public final class Windows {
    *    MessageStream<UserClick> stream = ...;
    *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c);
    *    Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
-   *    MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream = stream.window(
-   *    Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator));
+   *    MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
+   *        Windows.keyedSessionWindow(userIdExtractor, Duration.minute(1), maxAggregator));
    * }
    * </pre>
    *
-   * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
+   * @param keyFn the function to extract the window key from a message
    * @param sessionGap the timeout gap for defining the session
-   * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
-   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param foldFn the function to aggregate messages in the {@link WindowPane}
+   * @param <M> the type of the input message
    * @param <K> the type of the key in the {@link Window}
    * @param <WV> the type of the output value in the {@link WindowPane}
    * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) {
-    Trigger defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
-    return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null);
+  public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) {
+    Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
+    return new WindowInternal<>(defaultTrigger, foldFn, keyFn, null);
   }
 
   /**
-   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into sessions per-key based on the provided {@code sessionGap}.
+   * Creates a {@link Window} that groups incoming messages into sessions per-key based on the provided {@code sessionGap}.
    *
    * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The
-   * boundary for the session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that that arrive within
+   * boundary for the session is defined by a {@code sessionGap}. All messages that that arrive within
    * the gap are grouped into the same session.
    *
    * <p>The below example groups the stream into per-key session windows of gap 10 seconds.
@@ -249,18 +247,18 @@ public final class Windows {
    *    MessageStream<UserClick> stream = ...;
    *    BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseIntField(m), c);
    *    Function<UserClick, String> userIdExtractor = m -> m.getUserId()..;
-   *    MessageStream<WindowOutput<WindowKey<String>, Collection<M>>> windowedStream = stream.window(
-   *    Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
+   *    MessageStream<WindowPane<String>, Collection<M>> windowedStream = stream.window(
+   *        Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));
    * }
    * </pre>
    *
-   * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
+   * @param keyFn the function to extract the window key from a message}
    * @param sessionGap the timeout gap for defining the session
-   * @param <M> the type of the input {@link MessageEnvelope}
+   * @param <M> the type of the input message
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) {
+  public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) {
 
     BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
       c.add(m);
@@ -271,7 +269,7 @@ public final class Windows {
 
 
   /**
-   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into a single global window. This window does not have a
+   * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a
    * default trigger. The triggering behavior must be specified by setting an early trigger.
    *
    * <p>The below example computes the maximum value over a count based window. The window emits {@link WindowPane}s when
@@ -280,36 +278,36 @@ public final class Windows {
    * <pre> {@code
    *    MessageStream<Long> stream = ...;
    *    BiFunction<Long, Long, Long> maxAggregator = (m, c)-> Math.max(m, c);
-   *    MessageStream<WindowOutput<WindowKey, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator)
+   *    MessageStream<WindowPane<Void, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator)
    *      .setEarlyTriggers(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
    * }
    * </pre>
    *
-   * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
-   * @param <M> the type of {@link MessageEnvelope}
+   * @param foldFn the function to aggregate messages in the {@link WindowPane}
+   * @param <M> the type of message
    * @param <WV> type of the output value in the {@link WindowPane}
    * @return the created {@link Window} function.
    */
-  public static <M extends MessageEnvelope, WV> Window<M, Void, WV, WindowPane<Void, WV>> globalWindow(BiFunction<M, WV, WV> foldFn) {
-    return new WindowInternal<M, Void, WV>(null, foldFn, null, null);
+  public static <M, WV> Window<M, Void, WV> globalWindow(BiFunction<M, WV, WV> foldFn) {
+    return new WindowInternal<>(null, foldFn, null, null);
   }
 
   /**
-   * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into a single global window. This window does not have a
+   * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a
    * default trigger. The triggering behavior must be specified by setting an early trigger.
    *
    * The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes.
    * <pre> {@code
    *    MessageStream<Long> stream = ...;
-   *    MessageStream<WindowOutput<WindowKey, Collection<Long>>> windowedStream = stream.window(Windows.globalWindow()
+   *    MessageStream<WindowPane<Void, Collection<Long>> windowedStream = stream.window(Windows.globalWindow()
    *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
    * }
    * </pre>
    *
-   * @param <M> the type of {@link MessageEnvelope}
+   * @param <M> the type of message
    * @return the created {@link Window} function.
    */
-  public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, WindowPane<Void, Collection<M>>> globalWindow() {
+  public static <M> Window<M, Void, Collection<M>> globalWindow() {
     BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
       c.add(m);
       return c;
@@ -318,7 +316,7 @@ public final class Windows {
   }
 
   /**
-   * Returns a global {@link Window} that groups incoming {@link MessageEnvelope}s using the provided keyFn.
+   * Returns a global {@link Window} that groups incoming messages using the provided keyFn.
    * The window does not have a default trigger. The triggering behavior must be specified by setting an early
    * trigger.
    *
@@ -329,24 +327,24 @@ public final class Windows {
    *    MessageStream<UserClick> stream = ...;
    *    BiFunction<UserClick, Long, Long> maxAggregator = (m, c)-> Math.max(parseLongField(m), c);
    *    Function<UserClick, String> keyFn = ...;
-   *    MessageStream<WindowOutput<WindowKey<String>, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
+   *    MessageStream<WindowPane<String, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
    *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
    * }
    * </pre>
    *
-   * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
-   * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane}
-   * @param <M> the type of {@link MessageEnvelope}
+   * @param keyFn the function to extract the window key from a message
+   * @param foldFn the function to aggregate messages in the {@link WindowPane}
+   * @param <M> the type of message
    * @param <K> type of the key in the {@link Window}
    * @param <WV> the type of the output value in the {@link WindowPane}
    * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) {
+  public static <M, K, WV> Window<M, K, WV> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) {
     return new WindowInternal<M, K, WV>(null, foldFn, keyFn, null);
   }
 
   /**
-   * Returns a global {@link Window} that groups incoming {@link MessageEnvelope}s using the provided keyFn.
+   * Returns a global {@link Window} that groups incoming messages using the provided keyFn.
    * The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger.
    *
    * <p> The below example groups the stream per-key into count based windows. The window triggers every 50 messages or
@@ -355,17 +353,17 @@ public final class Windows {
    * <pre> {@code
    *    MessageStream<UserClick> stream = ...;
    *    Function<UserClick, String> keyFn = ...;
-   *    MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
+   *    MessageStream<WindowPane<String, Collection<UserClick>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
    *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
    * }
    * </pre>
    *
-   * @param keyFn the function to extract the window key from a {@link MessageEnvelope}
-   * @param <M> the type of {@link MessageEnvelope}
+   * @param keyFn the function to extract the window key from a message
+   * @param <M> the type of message
    * @param <K> the type of the key in the {@link Window}
    * @return the created {@link Window} function
    */
-  public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> keyedGlobalWindow(Function<M, K> keyFn) {
+  public static <M, K> Window<M, K, Collection<M>> keyedGlobalWindow(Function<M, K> keyFn) {
     BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
       c.add(m);
       return c;

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
index 8825867..9479eea 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
@@ -18,11 +18,9 @@
  */
 package org.apache.samza.operators.windows.internal;
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.triggers.Trigger;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.Window;
-import org.apache.samza.operators.windows.WindowPane;
 
 import java.util.function.BiFunction;
 import java.util.function.Function;
@@ -32,9 +30,13 @@ import java.util.function.Function;
  *  and whether to accumulate or discard previously emitted panes.
  *
  *  Note: This class is meant to be used internally by Samza, and is not to be instantiated by programmers.
+ *
+ * @param <M>  the type of input message
+ * @param <K>  the type of key for the window
+ * @param <WV>  the type of aggregated value in the window output
  */
 @InterfaceStability.Unstable
-public final class WindowInternal<M extends MessageEnvelope, K, WV> implements Window<M, K, WV, WindowPane<K, WV>> {
+public final class WindowInternal<M, K, WV> implements Window<M, K, WV> {
 
   private final Trigger defaultTrigger;
 
@@ -67,19 +69,19 @@ public final class WindowInternal<M extends MessageEnvelope, K, WV> implements W
   }
 
   @Override
-  public Window<M, K, WV, WindowPane<K, WV>> setEarlyTrigger(Trigger trigger) {
+  public Window<M, K, WV> setEarlyTrigger(Trigger trigger) {
     this.earlyTrigger = trigger;
     return this;
   }
 
   @Override
-  public Window<M, K, WV, WindowPane<K, WV>> setLateTrigger(Trigger trigger) {
+  public Window<M, K, WV> setLateTrigger(Trigger trigger) {
     this.lateTrigger = trigger;
     return this;
   }
 
   @Override
-  public Window<M, K, WV, WindowPane<K, WV>> setAccumulationMode(AccumulationMode mode) {
+  public Window<M, K, WV> setAccumulationMode(AccumulationMode mode) {
     this.mode = mode;
     return this;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
new file mode 100644
index 0000000..ad37eb3
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+
+
+/**
+ * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
+ */
+@InterfaceStability.Unstable
+public interface ExecutionEnvironment {
+
+  String ENVIRONMENT_CONFIG = "job.execution.environment.class";
+  String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment";
+
+  /**
+   * Static method to load the local standalone environment
+   *
+   * @param config  configuration passed in to initialize the Samza standalone process
+   * @return  the standalone {@link ExecutionEnvironment} to run the user-defined stream applications
+   */
+  static ExecutionEnvironment getLocalEnvironment(Config config) {
+    return null;
+  }
+
+  /**
+   * Static method to load the non-standalone environment.
+   *
+   * @param config  configuration passed in to initialize the Samza processes
+   * @return  the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications
+   */
+  static ExecutionEnvironment fromConfig(Config config) {
+    try {
+      if (ExecutionEnvironment.class.isAssignableFrom(Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)))) {
+        return (ExecutionEnvironment) Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS)).newInstance();
+      }
+    } catch (Exception e) {
+      throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
+    }
+    throw new ConfigException(String.format(
+        "Class %s does not implement interface ExecutionEnvironment properly",
+        config.get(ENVIRONMENT_CONFIG)));
+  }
+
+  /**
+   * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}
+   *
+   * @param graphBuilder  the user-defined {@link StreamGraphBuilder} object
+   * @param config  the {@link Config} object for this job
+   */
+  void run(StreamGraphBuilder graphBuilder, Config config);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index a85e0b4..128cff1 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -54,4 +54,14 @@ public interface TaskContext {
    *
    */
   void setStartingOffset(SystemStreamPartition ssp, String offset);
+
+  /**
+   * Method to allow user to return customized context
+   *
+   * @param <T>  the type of user-defined task context
+   * @return  user-defined task context object
+   */
+  default <T> T getUserDefinedContext() {
+    return null;
+  };
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
index 7bd62a7..e3a1290 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
@@ -33,7 +33,7 @@ public class TestIncomingSystemMessage {
   @Test
   public void testConstructor() {
     IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
-    IncomingSystemMessageEnvelope ism = new IncomingSystemMessageEnvelope(ime);
+    InputMessageEnvelope ism = new InputMessageEnvelope(ime);
 
     Object mockKey = mock(Object.class);
     Object mockValue = mock(Object.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
deleted file mode 100644
index 9679e1d..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.windows;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-
-public class TestWindowOutput {
-  @Test
-  public void testConstructor() {
-    WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey("testMsg", null), 10);
-    assertEquals(wndOutput.getKey().getKey(), "testMsg");
-    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
-    assertFalse(wndOutput.isDelete());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
new file mode 100644
index 0000000..54d0b2f
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestWindowPane {
+  @Test
+  public void testConstructor() {
+    WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey<>("testMsg", null), 10);
+    assertEquals(wndOutput.getKey().getKey(), "testMsg");
+    assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 286893c..830e4a5 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -19,107 +19,156 @@
 
 package org.apache.samza.operators;
 
-import org.apache.samza.operators.data.MessageEnvelope;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.BiFunction;
+import org.apache.samza.task.TaskContext;
 
 
 /**
  * The implementation for input/output {@link MessageStream}s to/from the operators.
  * Users use the {@link MessageStream} API methods to describe and chain the operators specs.
  *
- * @param <M>  type of {@link MessageEnvelope}s in this {@link MessageStream}
+ * @param <M>  type of messages in this {@link MessageStream}
  */
-public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStream<M> {
+public class MessageStreamImpl<M> implements MessageStream<M> {
+  /**
+   * The {@link StreamGraphImpl} object that contains this {@link MessageStreamImpl}
+   */
+  private final StreamGraphImpl graph;
 
   /**
-   * The set of operators that consume the {@link MessageEnvelope}s in this {@link MessageStream}
+   * The set of operators that consume the messages in this {@link MessageStream}
    */
   private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
 
-  @Override
-  public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> mapFn) {
-    OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperatorSpec(m -> new ArrayList<OM>() { {
-        OM r = mapFn.apply(m);
-        if (r != null) {
-          this.add(r);
-        }
-      } });
+  /**
+   * Default constructor
+   *
+   * @param graph the {@link StreamGraphImpl} object that this stream belongs to
+   */
+  MessageStreamImpl(StreamGraphImpl graph) {
+    this.graph = graph;
+  }
+
+  @Override public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
+    OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, this.graph, new MessageStreamImpl<>(this.graph));
     this.registeredOperatorSpecs.add(op);
-    return op.getOutputStream();
+    return op.getNextStream();
   }
 
-  @Override
-  public <OM extends MessageEnvelope> MessageStream<OM> flatMap(FlatMapFunction<M, OM> flatMapFn) {
-    OperatorSpec<OM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn);
+  @Override public MessageStream<M> filter(FilterFunction<M> filterFn) {
+    OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, this.graph, new MessageStreamImpl<>(this.graph));
     this.registeredOperatorSpecs.add(op);
-    return op.getOutputStream();
+    return op.getNextStream();
   }
 
   @Override
-  public MessageStream<M> filter(FilterFunction<M> filterFn) {
-    OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperatorSpec(t -> new ArrayList<M>() { {
-        if (filterFn.apply(t)) {
-          this.add(t);
-        }
-      } });
+  public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
+    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, this.graph, new MessageStreamImpl<>(this.graph));
     this.registeredOperatorSpecs.add(op);
-    return op.getOutputStream();
+    return op.getNextStream();
   }
 
   @Override
   public void sink(SinkFunction<M> sinkFn) {
-    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn));
+    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph));
   }
 
-  @Override
-  public <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window(
-      Window<M, K, WV, WM> window) {
-    OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<MessageEnvelope, K, WV>) window);
-    this.registeredOperatorSpecs.add(wndOp);
-    return wndOp.getOutputStream();
+  @Override public void sendTo(OutputStream<M> stream) {
+    this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream));
   }
 
   @Override
-  public <K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(
-      MessageStream<JM> otherStream, JoinFunction<M, JM, RM> joinFn) {
-    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>();
+  public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
+    OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
+        this.graph, new MessageStreamImpl<>(this.graph));
+    this.registeredOperatorSpecs.add(wndOp);
+    return wndOp.getNextStream();
+  }
 
-    BiFunction<M, JM, RM> parJoin1 = joinFn::apply;
-    BiFunction<JM, M, RM> parJoin2 = (m, t1) -> joinFn.apply(t1, m);
+  @Override public <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn) {
+    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph);
+
+    PartialJoinFunction<K, M, OM, RM> parJoin1 = new PartialJoinFunction<K, M, OM, RM>() {
+      @Override
+      public RM apply(M m1, OM om) {
+        return joinFn.apply(m1, om);
+      }
+
+      @Override
+      public K getKey(M message) {
+        return joinFn.getFirstKey(message);
+      }
+
+      @Override
+      public K getOtherKey(OM message) {
+        return joinFn.getSecondKey(message);
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        joinFn.init(config, context);
+      }
+    };
+
+    PartialJoinFunction<K, OM, M, RM> parJoin2 = new PartialJoinFunction<K, OM, M, RM>() {
+      @Override
+      public RM apply(OM m1, M m) {
+        return joinFn.apply(m, m1);
+      }
+
+      @Override
+      public K getKey(OM message) {
+        return joinFn.getSecondKey(message);
+      }
+
+      @Override
+      public K getOtherKey(M message) {
+        return joinFn.getFirstKey(message);
+      }
+    };
 
     // TODO: need to add default store functions for the two partial join functions
 
-    ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin2, outputStream));
-    this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin1, outputStream));
+    ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(
+        OperatorSpecs.<OM, K, M, RM>createPartialJoinOperatorSpec(parJoin2, this.graph, outputStream));
+    this.registeredOperatorSpecs.add(OperatorSpecs.<M, K, OM, RM>createPartialJoinOperatorSpec(parJoin1, this.graph, outputStream));
     return outputStream;
   }
 
   @Override
   public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
-    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>();
+    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph);
 
     otherStreams.add(this);
-    otherStreams.forEach(other ->
-        ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperatorSpec(outputStream)));
+    otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
+        add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream)));
     return outputStream;
   }
 
+  @Override
+  public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
+    MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
+    OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
+    this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
+        this.graph, outputStream));
+    return intStream;
+  }
   /**
    * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
    * should not be exposed to users.
@@ -129,4 +178,5 @@ public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStre
   public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
     return Collections.unmodifiableSet(this.registeredOperatorSpecs);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
new file mode 100644
index 0000000..dca3469
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
+ * create system input/output/intermediate streams.
+ */
+public class StreamGraphImpl implements StreamGraph {
+
+  /**
+   * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link MessageEnvelope}
+   * in the input {@link MessageStream}s.
+   */
+  private int opId = 0;
+
+  private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
+    final StreamSpec spec;
+    final Serde<K> keySerde;
+    final Serde<V> msgSerde;
+
+    InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      super(graph);
+      this.spec = streamSpec;
+      this.keySerde = keySerde;
+      this.msgSerde = msgSerde;
+    }
+
+    StreamSpec getSpec() {
+      return this.spec;
+    }
+
+  }
+
+  private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> implements OutputStream<M> {
+    final StreamSpec spec;
+    final Serde<K> keySerde;
+    final Serde<V> msgSerde;
+
+    OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      this.spec = streamSpec;
+      this.keySerde = keySerde;
+      this.msgSerde = msgSerde;
+    }
+
+    StreamSpec getSpec() {
+      return this.spec;
+    }
+
+    @Override
+    public SinkFunction<M> getSinkFunction() {
+      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+        // TODO: need to find a way to directly pass in the serde class names
+        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
+        //    message.getKey(), message.getKey(), message.getMessage()));
+        mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+      };
+    }
+  }
+
+  private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> {
+    final Function<M, PK> parKeyFn;
+
+    /**
+     * Default constructor
+     *
+     * @param graph the {@link StreamGraphImpl} object that this stream belongs to
+     */
+    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+      this(graph, streamSpec, keySerde, msgSerde, null);
+    }
+
+    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde, Function<M, PK> parKeyFn) {
+      super(graph, streamSpec, keySerde, msgSerde);
+      this.parKeyFn = parKeyFn;
+    }
+
+    @Override
+    public SinkFunction<M> getSinkFunction() {
+      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+        // TODO: need to find a way to directly pass in the serde class names
+        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
+        //    message.getKey(), message.getKey(), message.getMessage()));
+        if (this.parKeyFn == null) {
+          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
+        } else {
+          // apply partition key function
+          mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
+        }
+      };
+    }
+  }
+
+  /**
+   * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
+   */
+  private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
+  private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
+
+  private ContextManager contextManager = new ContextManager() { };
+
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    return this.inStreams.get(streamSpec.getSystemStream());
+  }
+
+  /**
+   * Helper method to be used by {@link MessageStreamImpl} class
+   *
+   * @param streamSpec  the {@link StreamSpec} object defining the {@link SystemStream} as the output
+   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link SystemStream}
+   * @return  the {@link MessageStreamImpl} object
+   */
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    return this.outStreams.get(streamSpec.getSystemStream());
+  }
+
+  /**
+   * Helper method to be used by {@link MessageStreamImpl} class
+   *
+   * @param streamSpec  the {@link StreamSpec} object defining the {@link SystemStream} as an intermediate {@link SystemStream}
+   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link SystemStream}
+   * @return  the {@link MessageStreamImpl} object
+   */
+  @Override
+  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
+    }
+    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    }
+    return intStream;
+  }
+
+  @Override public Map<StreamSpec, MessageStream> getInStreams() {
+    Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>();
+    this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) entry).getSpec(), entry));
+    return Collections.unmodifiableMap(inStreamMap);
+  }
+
+  @Override public Map<StreamSpec, OutputStream> getOutStreams() {
+    Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
+    this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry));
+    return Collections.unmodifiableMap(outStreamMap);
+  }
+
+  @Override
+  public StreamGraph withContextManager(ContextManager manager) {
+    this.contextManager = manager;
+    return this;
+  }
+
+  public int getNextOpId() {
+    return this.opId++;
+  }
+
+  public ContextManager getContextManager() {
+    return this.contextManager;
+  }
+
+  /**
+   * Helper method to be get the input stream via {@link SystemStream}
+   *
+   * @param systemStream  the {@link SystemStream}
+   * @return  a {@link MessageStreamImpl} object corresponding to the {@code systemStream}
+   */
+  public MessageStreamImpl getInputStream(SystemStream systemStream) {
+    if (this.inStreams.containsKey(systemStream)) {
+      return (MessageStreamImpl) this.inStreams.get(systemStream);
+    }
+    return null;
+  }
+
+  <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) {
+    if (this.outStreams.containsValue(intStream)) {
+      return (OutputStream<M>) intStream;
+    }
+    return null;
+  }
+
+  <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
+    if (this.inStreams.containsValue(outStream)) {
+      return (MessageStream<M>) outStream;
+    }
+    return null;
+  }
+
+  /**
+   * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method.
+   *
+   * @param parKeyFn  the function to extract the partition key from the input message
+   * @param <PK>  the type of partition key
+   * @param <M>  the type of input message
+   * @return  the {@link OutputStream} object for the re-partitioned stream
+   */
+  <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
+    // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
+    StreamSpec streamSpec = new StreamSpec() {
+      @Override
+      public SystemStream getSystemStream() {
+        // TODO: should auto-generate intermedaite stream name here
+        return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId));
+      }
+
+      @Override
+      public Properties getProperties() {
+        return null;
+      }
+    };
+
+    if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
+      this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
+    }
+    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream());
+    if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
+      this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
+    }
+    return intStream;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
deleted file mode 100644
index 152cd92..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.operators.impl.OperatorImpls;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.InitableTask;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.task.WindowableTask;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * An {@link StreamTask} implementation that receives {@link IncomingSystemMessageEnvelope}s and propagates them
- * through the user's stream transformations defined in {@link StreamOperatorTask#transform(Map)} using the
- * {@link MessageStream} APIs.
- * <p>
- * This class brings all the operator API implementation components together and feeds the
- * {@link IncomingSystemMessageEnvelope}s into the transformation chains.
- * <p>
- * It accepts an instance of the user implemented {@link StreamOperatorTask}. When its own {@link #init(Config, TaskContext)}
- * method is called during startup, it creates a {@link MessageStreamImpl} corresponding to each of its input
- * {@link SystemStreamPartition}s and then calls the user's {@link StreamOperatorTask#transform(Map)} method.
- * <p>
- * When users invoke the methods on the {@link MessageStream} API to describe their stream transformations in the
- * {@link StreamOperatorTask#transform(Map)} method, the underlying {@link MessageStreamImpl} creates the
- * corresponding {@link org.apache.samza.operators.spec.OperatorSpec} to record information about the desired
- * transformation, and returns the output {@link MessageStream} to allow further transform chaining.
- * <p>
- * Once the user's transformation DAGs have been described for all {@link MessageStream}s (i.e., when the
- * {@link StreamOperatorTask#transform(Map)} call returns), it calls
- * {@link OperatorImpls#createOperatorImpls(MessageStreamImpl, TaskContext)} for each of the input
- * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
- * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
- * root node of the DAG, which this class saves.
- * <p>
- * Now that it has the root for the DAG corresponding to each {@link SystemStreamPartition}, it can pass the message
- * envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} along
- * to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
- * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
- */
-public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask {
-
-  /**
-   * A mapping from each {@link SystemStreamPartition} to the root node of its operator chain DAG.
-   */
-  private final Map<SystemStreamPartition, OperatorImpl<IncomingSystemMessageEnvelope, ? extends MessageEnvelope>> operatorChains = new HashMap<>();
-
-  private final StreamOperatorTask userTask;
-
-  public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
-    this.userTask = userTask;
-  }
-
-  @Override
-  public final void init(Config config, TaskContext context) throws Exception {
-    if (this.userTask instanceof InitableTask) {
-      ((InitableTask) this.userTask).init(config, context);
-    }
-    Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams = new HashMap<>();
-    context.getSystemStreamPartitions().forEach(ssp -> messageStreams.put(ssp, new MessageStreamImpl<>()));
-    this.userTask.transform(messageStreams);
-    messageStreams.forEach((ssp, ms) ->
-        operatorChains.put(ssp, OperatorImpls.createOperatorImpls((MessageStreamImpl<IncomingSystemMessageEnvelope>) ms, context)));
-  }
-
-  @Override
-  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
-    this.operatorChains.get(ime.getSystemStreamPartition())
-        .onNext(new IncomingSystemMessageEnvelope(ime), collector, coordinator);
-  }
-
-  @Override
-  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
-    if (this.userTask instanceof WindowableTask) {
-      ((WindowableTask) this.userTask).window(collector, coordinator);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
new file mode 100644
index 0000000..809a70a
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * This defines the interface function a two-way join functions that takes input messages from two input
+ * {@link org.apache.samza.operators.MessageStream}s and merge them into a single output joined message in the join output
+ */
+@InterfaceStability.Unstable
+public interface PartialJoinFunction<K, M, OM, RM> extends InitableFunction {
+
+  /**
+   * Method to perform join method on the two input messages
+   *
+   * @param m1  message from the first input stream
+   * @param om  message from the second input stream
+   * @return  the joined message in the output stream
+   */
+  RM apply(M m1, OM om);
+
+  /**
+   * Method to get the key from the input message
+   *
+   * @param message  the input message from the first strean
+   * @return  the join key in the {@code message}
+   */
+  K getKey(M message);
+
+  /**
+   * Method to get the key from the input message in the other stream
+   *
+   * @param message  the input message from the other stream
+   * @return  the join key in the {@code message}
+   */
+  K getOtherKey(OM message);
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
new file mode 100644
index 0000000..66336f8
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a
+ * {@link MessageStreamImpl}
+ */
+public class OperatorGraph {
+
+  /**
+   * A {@link Map} from {@link OperatorSpec} to {@link OperatorImpl}. This map registers all {@link OperatorImpl} in the DAG
+   * of {@link OperatorImpl} in a {@link org.apache.samza.container.TaskInstance}. Each {@link OperatorImpl} is created
+   * according to a single instance of {@link OperatorSpec}.
+   */
+  private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>();
+
+  /**
+   * This {@link Map} describes the DAG of {@link OperatorImpl} that are chained together to process the input messages.
+   */
+  private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>();
+
+  /**
+   * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}.
+   * This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and
+   * instantiate the corresponding {@link OperatorImpl} chains that take the {@link org.apache.samza.operators.MessageStream} as input.
+   *
+   * @param inputStreams  the map of input {@link org.apache.samza.operators.MessageStream}s
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   */
+  public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config config, TaskContext context) {
+    inputStreams.forEach((ss, mstream) -> this.operatorGraph.put(ss, this.createOperatorImpls(mstream, config, context)));
+  }
+
+  /**
+   * Method to get the corresponding {@link RootOperatorImpl}
+   *
+   * @param ss  input {@link SystemStream}
+   * @param <M>  the type of input message
+   * @return  the {@link OperatorImpl} that starts processing the input message
+   */
+  public <M> OperatorImpl<M, M> get(SystemStream ss) {
+    return this.operatorGraph.get(ss);
+  }
+
+  /**
+   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
+   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
+   *
+   * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
+   * @param <M>  the type of messagess in the {@code source} {@link MessageStreamImpl}
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  root node for the {@link OperatorImpl} DAG
+   */
+  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config,
+      TaskContext context) {
+    // since the source message stream might have multiple operator specs registered on it,
+    // create a new root node as a single point of entry for the DAG.
+    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
+    // create the pipeline/topology starting from the source
+    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
+        // pass in the source and context s.t. stateful stream operators can initialize their stores
+        OperatorImpl<M, ?> operatorImpl =
+            this.createAndRegisterOperatorImpl(registeredOperator, source, config, context);
+        rootOperator.registerNextOperator(operatorImpl);
+      });
+    return rootOperator;
+  }
+
+  /**
+   * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
+   * {@link OperatorImpl}s.
+   *
+   * @param operatorSpec  the operatorSpec registered with the {@code source}
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the operator implementation for the operatorSpec
+   */
+  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
+      MessageStreamImpl<M> source, Config config, TaskContext context) {
+    if (!operators.containsKey(operatorSpec)) {
+      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
+      if (operators.putIfAbsent(operatorSpec, operatorImpl) == null) {
+        // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
+        // so traverse and initialize and register the rest of the DAG.
+        // initialize the corresponding operator function
+        operatorSpec.init(config, context);
+        MessageStreamImpl nextStream = operatorSpec.getNextStream();
+        if (nextStream != null) {
+          Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
+          registeredSpecs.forEach(registeredSpec -> {
+              OperatorImpl subImpl = this.createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
+              operatorImpl.registerNextOperator(subImpl);
+            });
+        }
+        return operatorImpl;
+      }
+    }
+
+    // the implementation corresponding to operatorSpec has already been instantiated
+    // and registered, so we do not need to traverse the DAG further.
+    return operators.get(operatorSpec);
+  }
+
+  /**
+   * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
+   *
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the {@link OperatorImpl} implementation instance
+   */
+  private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
+    if (operatorSpec instanceof StreamOperatorSpec) {
+      StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
+      return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
+    } else if (operatorSpec instanceof SinkOperatorSpec) {
+      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
+    } else if (operatorSpec instanceof WindowOperatorSpec) {
+      return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context);
+    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index c77914e..abb1fa9 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,10 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 import java.util.HashSet;
@@ -31,32 +28,24 @@ import java.util.Set;
 /**
  * Abstract base class for all stream operator implementations.
  */
-public abstract class OperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> {
+public abstract class OperatorImpl<M, RM> {
 
-  private final Set<OperatorImpl<RM, ? extends MessageEnvelope>> nextOperators = new HashSet<>();
+  private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>();
 
   /**
    * Register the next operator in the chain that this operator should propagate its output to.
    * @param nextOperator  the next operator in the chain.
    */
-  void registerNextOperator(OperatorImpl<RM, ? extends MessageEnvelope> nextOperator) {
+  void registerNextOperator(OperatorImpl<RM, ?> nextOperator) {
     nextOperators.add(nextOperator);
   }
 
   /**
-   * Initialize the initial state for stateful operators.
-   *
-   * @param source  the source that this {@link OperatorImpl} operator is registered with
-   * @param context  the task context to initialize the operator implementation
-   */
-  public void init(MessageStream<M> source, TaskContext context) {}
-
-  /**
    * Perform the transformation required for this operator and call the downstream operators.
    *
    * Must call {@link #propagateResult} to propage the output to registered downstream operators correctly.
    *
-   * @param message  the input {@link MessageEnvelope}
+   * @param message  the input message
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
    */
@@ -67,11 +56,12 @@ public abstract class OperatorImpl<M extends MessageEnvelope, RM extends Message
    *
    * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly.
    *
-   * @param outputMessage  output {@link MessageEnvelope}
+   * @param outputMessage  output message
    * @param collector  the {@link MessageCollector} in the context
    * @param coordinator  the {@link TaskCoordinator} in the context
    */
   void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
     nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
   }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
deleted file mode 100644
index 02095cb..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.task.TaskContext;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a
- * {@link MessageStreamImpl}
- */
-public class OperatorImpls {
-
-  /**
-   * Holds the mapping between the {@link OperatorSpec} and {@link OperatorImpl}s instances.
-   */
-  private static final Map<OperatorSpec, OperatorImpl> OPERATOR_IMPLS = new ConcurrentHashMap<>();
-
-  /**
-   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
-   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
-   *
-   * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
-   * @param <M>  the type of {@link MessageEnvelope}s in the {@code source} {@link MessageStream}
-   * @param context  the {@link TaskContext} required to instantiate operators
-   * @return  root node for the {@link OperatorImpl} DAG
-   */
-  public static <M extends MessageEnvelope> RootOperatorImpl createOperatorImpls(MessageStreamImpl<M> source, TaskContext context) {
-    // since the source message stream might have multiple operator specs registered on it,
-    // create a new root node as a single point of entry for the DAG.
-    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
-    // create the pipeline/topology starting from the source
-    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
-        // pass in the source and context s.t. stateful stream operators can initialize their stores
-        OperatorImpl<M, ? extends MessageEnvelope> operatorImpl =
-            createAndRegisterOperatorImpl(registeredOperator, source, context);
-        rootOperator.registerNextOperator(operatorImpl);
-      });
-    return rootOperator;
-  }
-
-  /**
-   * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
-   * {@link OperatorImpl}s.
-   *
-   * @param operatorSpec  the operatorSpec registered with the {@code source}
-   * @param source  the source {@link MessageStreamImpl}
-   * @param context  the context of the task
-   * @return  the operator implementation for the operatorSpec
-   */
-  private static <M extends MessageEnvelope> OperatorImpl<M, ? extends MessageEnvelope> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
-      MessageStream source, TaskContext context) {
-    if (!OPERATOR_IMPLS.containsKey(operatorSpec)) {
-      OperatorImpl<M, ? extends MessageEnvelope> operatorImpl = createOperatorImpl(operatorSpec);
-      if (OPERATOR_IMPLS.putIfAbsent(operatorSpec, operatorImpl) == null) {
-        // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
-        // so traverse and initialize and register the rest of the DAG.
-        MessageStream<? extends MessageEnvelope> outStream = operatorSpec.getOutputStream();
-        Collection<OperatorSpec> registeredSpecs = ((MessageStreamImpl) outStream).getRegisteredOperatorSpecs();
-        registeredSpecs.forEach(registeredSpec -> {
-            OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, outStream, context);
-            operatorImpl.registerNextOperator(subImpl);
-          });
-        operatorImpl.init(source, context);
-        return operatorImpl;
-      }
-    }
-
-    // the implementation corresponding to operatorSpec has already been instantiated
-    // and registered, so we do not need to traverse the DAG further.
-    return OPERATOR_IMPLS.get(operatorSpec);
-  }
-
-  /**
-   * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
-   *
-   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
-   * @param <M>  type of input {@link MessageEnvelope}
-   * @return  the {@link OperatorImpl} implementation instance
-   */
-  protected static <M extends MessageEnvelope> OperatorImpl<M, ? extends MessageEnvelope> createOperatorImpl(OperatorSpec operatorSpec) {
-    if (operatorSpec instanceof StreamOperatorSpec) {
-      return new StreamOperatorImpl<>((StreamOperatorSpec<M, ? extends MessageEnvelope>) operatorSpec);
-    } else if (operatorSpec instanceof SinkOperatorSpec) {
-      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec);
-    } else if (operatorSpec instanceof WindowOperatorSpec) {
-      return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?, ?, ? extends WindowPane>) operatorSpec);
-    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
-      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec);
-    }
-    throw new IllegalArgumentException(
-        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index 90569b4..c8515e1 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -18,9 +18,11 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 
@@ -28,14 +30,13 @@ import org.apache.samza.task.TaskCoordinator;
  * Implementation of a {@link PartialJoinOperatorSpec}. This class implements function
  * that only takes in one input stream among all inputs to the join and generate the join output.
  *
- * @param <M>  type of {@link MessageEnvelope}s in the input stream
- * @param <JM>  type of {@link MessageEnvelope}s in the stream to join with
- * @param <RM>  type of {@link MessageEnvelope}s in the joined stream
+ * @param <M>  type of messages in the input stream
+ * @param <JM>  type of messages in the stream to join with
+ * @param <RM>  type of messages in the joined stream
  */
-class PartialJoinOperatorImpl<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope>
-    extends OperatorImpl<M, RM> {
+class PartialJoinOperatorImpl<M, K, JM, RM> extends OperatorImpl<M, RM> {
 
-  PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp) {
+  PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp, MessageStreamImpl<M> source, Config config, TaskContext context) {
     // TODO: implement PartialJoinOperatorImpl constructor
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
index 7132b86..4b30a5d 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
@@ -18,16 +18,15 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * A no-op operator implementation that forwards incoming {@link MessageEnvelope}s to all of its subscribers.
- * @param <M>  type of incoming {@link MessageEnvelope}s
+ * A no-op operator implementation that forwards incoming messages to all of its subscribers.
+ * @param <M>  type of incoming messages
  */
-final class RootOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, M> {
+final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
 
   @Override
   public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {

http://git-wip-us.apache.org/repos/asf/samza/blob/c249443b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
new file mode 100644
index 0000000..2bb362c
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Default implementation class of a {@link WindowOperatorSpec} for a session window.
+ *
+ * @param <M>  the type of input message
+ * @param <RK>  the type of window key
+ * @param <WV>  the type of window state
+ */
+class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> {
+
+  private final WindowOperatorSpec<M, RK, WV> windowSpec;
+
+  SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+    this.windowSpec = windowSpec;
+  }
+
+  @Override
+  public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+  }
+
+  public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+    // This is to periodically check the timeout triggers to get the list of window states to be updated
+  }
+}