You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 18:40:48 UTC

[05/14] samza git commit: SAMZA-1073: top-level fluent API

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index abed03f..41d1778 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -18,21 +18,22 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 
 /**
  * Implementation for {@link SinkOperatorSpec}
  */
-class SinkOperatorImpl<M extends MessageEnvelope> extends OperatorImpl<M, MessageEnvelope> {
+class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
 
   private final SinkFunction<M> sinkFn;
 
-  SinkOperatorImpl(SinkOperatorSpec<M> sinkOp) {
+  SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) {
     this.sinkFn = sinkOp.getSinkFn();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
index 3a5c56e..644de20 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
@@ -18,24 +18,26 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * A StreamOperator that accepts a 1:n transform function and applies it to each incoming {@link MessageEnvelope}.
+ * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message.
  *
- * @param <M>  type of {@link MessageEnvelope} in the input stream
- * @param <RM>  type of {@link MessageEnvelope} in the output stream
+ * @param <M>  type of message in the input stream
+ * @param <RM>  type of message in the output stream
  */
-class StreamOperatorImpl<M extends MessageEnvelope, RM extends MessageEnvelope> extends OperatorImpl<M, RM> {
+class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
 
   private final FlatMapFunction<M, RM> transformFn;
 
-  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec) {
+  StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
     this.transformFn = streamOperatorSpec.getTransformFn();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index a5b71a7..af00553 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -18,18 +18,21 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
-public class WindowOperatorImpl<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> extends OperatorImpl<M, WM> {
+public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> {
 
-  private final WindowInternal<M, K, WV> window;
+  private final WindowInternal<M, WK, WV> window;
 
-  public WindowOperatorImpl(WindowOperatorSpec spec) {
+  public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> source, Config config, TaskContext context) {
+    // TODO: source, config, and context are not used yet; they are meant for initializing the window kv-store
     window = spec.getWindow();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 8b75cdc..1444662 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -18,20 +18,45 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.task.TaskContext;
 
 
 /**
  * A stateless serializable stream operator specification that holds all the information required
- * to transform the input {@link MessageStream} and produce the output {@link MessageStream}.
+ * to transform the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
+ *
+ * @param <OM>  the type of output message from the operator
  */
-public interface OperatorSpec<OM extends MessageEnvelope> {
+@InterfaceStability.Unstable
+public interface OperatorSpec<OM> {
+
+  enum OpCode {
+    MAP,
+    FLAT_MAP,
+    FILTER,
+    SINK,
+    SEND_TO,
+    JOIN,
+    WINDOW,
+    MERGE,
+    PARTITION_BY
+  }
+
 
   /**
-   * Get the output stream containing transformed {@link MessageEnvelope} produced by this operator.
-   * @return  the output stream containing transformed {@link MessageEnvelope} produced by this operator.
+   * Get the output stream containing transformed messages produced by this operator.
+   * @return  the output stream containing transformed messages produced by this operator.
    */
-  MessageStream<OM> getOutputStream();
+  MessageStreamImpl<OM> getNextStream();
 
+  /**
+   * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index fc25929..d626852 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -19,16 +19,21 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.data.MessageEnvelope;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 
 import java.util.ArrayList;
-import java.util.UUID;
-import java.util.function.BiFunction;
+import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -38,80 +43,168 @@ public class OperatorSpecs {
 
   private OperatorSpecs() {}
 
-  private static String getOperatorId() {
-    // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts
-    return UUID.randomUUID().toString();
+  /**
+   * Creates a {@link StreamOperatorSpec} for {@link MapFunction}
+   *
+   * @param mapFn  the map function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param output  the output {@link MessageStreamImpl} object
+   * @param <M>  type of input message
+   * @param <OM>  type of output message
+   * @return  the {@link StreamOperatorSpec}
+   */
+  public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+    return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
+      @Override
+      public Collection<OM> apply(M message) {
+        return new ArrayList<OM>() {
+          {
+            OM r = mapFn.apply(message);
+            if (r != null) {
+              this.add(r);
+            }
+          }
+        };
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        mapFn.init(config, context);
+      }
+    }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
+   *
+   * @param filterFn  the transformation function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param output  the output {@link MessageStreamImpl} object
+   * @param <M>  type of input message
+   * @return  the {@link StreamOperatorSpec}
+   */
+  public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
+    return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
+      @Override
+      public Collection<M> apply(M message) {
+        return new ArrayList<M>() {
+          {
+            if (filterFn.apply(message)) {
+              this.add(message);
+            }
+          }
+        };
+      }
+
+      @Override
+      public void init(Config config, TaskContext context) {
+        filterFn.init(config, context);
+      }
+    }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
   }
 
   /**
    * Creates a {@link StreamOperatorSpec}.
    *
    * @param transformFn  the transformation function
-   * @param <M>  type of input {@link MessageEnvelope}
-   * @param <OM>  type of output {@link MessageEnvelope}
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param output  the output {@link MessageStreamImpl} object
+   * @param <M>  type of input message
+   * @param <OM>  type of output message
    * @return  the {@link StreamOperatorSpec}
    */
-  public static <M extends MessageEnvelope, OM extends MessageEnvelope> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
-      FlatMapFunction<M, OM> transformFn) {
-    return new StreamOperatorSpec<>(transformFn);
+  public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
+      FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+    return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec}.
+   *
+   * @param sinkFn  the sink function
+   * @param <M>  type of input message
+   * @param graph  the {@link StreamGraphImpl} object
+   * @return  the {@link SinkOperatorSpec}
+   */
+  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId());
+  }
+
+  /**
+   * Creates a {@link SinkOperatorSpec}.
+   *
+   * @param sinkFn  the sink function
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param stream  the {@link OutputStream} where the message is sent to
+   * @param <M>  type of input message
+   * @return  the {@link SinkOperatorSpec}
+   */
+  public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream);
   }
 
   /**
    * Creates a {@link SinkOperatorSpec}.
    *
    * @param sinkFn  the sink function
-   * @param <M>  type of input {@link MessageEnvelope}
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param stream  the {@link OutputStream} where the message is sent to
+   * @param <M>  type of input message
    * @return  the {@link SinkOperatorSpec}
    */
-  public static <M extends MessageEnvelope> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn) {
-    return new SinkOperatorSpec<>(sinkFn);
+  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
   }
 
   /**
    * Creates a {@link WindowOperatorSpec}.
    *
    * @param window the description of the window.
-   * @param <M> the type of input {@link MessageEnvelope}
-   * @param <K> the type of key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}. If a key is specified,
-   *            results are emitted per-key
+   * @param graph  the {@link StreamGraphImpl} object
+   * @param wndOutput  the window output {@link MessageStreamImpl} object
+   * @param <M> the type of input message
    * @param <WK> the type of key in the {@link WindowPane}
    * @param <WV> the type of value in the window
-   * @param <WM> the type of output {@link WindowPane}
    * @return  the {@link WindowOperatorSpec}
    */
 
-  public static <M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> WindowOperatorSpec<M, K, WK, WV, WM> createWindowOperatorSpec(WindowInternal<M, K, WV> window) {
-    return new WindowOperatorSpec<>(window, OperatorSpecs.getOperatorId());
+  public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
+      WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
+    return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
   }
 
   /**
    * Creates a {@link PartialJoinOperatorSpec}.
    *
    * @param partialJoinFn  the join function
+   * @param graph  the {@link StreamGraphImpl} object
    * @param joinOutput  the output {@link MessageStreamImpl}
-   * @param <M>  type of input {@link MessageEnvelope}
+   * @param <M>  type of input message
    * @param <K>  type of join key
-   * @param <JM>  the type of {@link MessageEnvelope} in the other join stream
-   * @param <OM>  the type of {@link MessageEnvelope} in the join output
+   * @param <JM>  the type of message in the other join stream
+   * @param <OM>  the type of message in the join output
    * @return  the {@link PartialJoinOperatorSpec}
    */
-  public static <M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
-      BiFunction<M, JM, OM> partialJoinFn, MessageStreamImpl<OM> joinOutput) {
-    return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, OperatorSpecs.getOperatorId());
+  public static <M, K, JM, OM> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
+      PartialJoinFunction<K, M, JM, OM> partialJoinFn, StreamGraphImpl graph, MessageStreamImpl<OM> joinOutput) {
+    return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, graph.getNextOpId());
   }
 
   /**
    * Creates a {@link StreamOperatorSpec} with a merger function.
    *
+   * @param graph  the {@link StreamGraphImpl} object
    * @param mergeOutput  the output {@link MessageStreamImpl} from the merger
-   * @param <M>  the type of input {@link MessageEnvelope}
+   * @param <M>  the type of input message
    * @return  the {@link StreamOperatorSpec} for the merge
    */
-  public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> mergeOutput) {
-    return new StreamOperatorSpec<M, M>(t ->
-      new ArrayList<M>() { {
-          this.add(t);
-        } },
-      mergeOutput);
+  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
+    return new StreamOperatorSpec<M, M>(message ->
+        new ArrayList<M>() {
+          {
+            this.add(message);
+          }
+        },
+        mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index e6d77f6..e057c2b 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -18,63 +18,69 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
-
-import java.util.function.BiFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.task.TaskContext;
 
 
 /**
- * Spec for the partial join operator that takes {@link MessageEnvelope}s from one input stream, joins with buffered
- * {@link MessageEnvelope}s from another stream, and produces join results to an output {@link MessageStreamImpl}.
+ * Spec for the partial join operator that takes messages from one input stream, joins with buffered
+ * messages from another stream, and produces join results to an output {@link MessageStreamImpl}.
  *
- * @param <M>  the type of input {@link MessageEnvelope}
+ * @param <M>  the type of input message
  * @param <K>  the type of join key
- * @param <JM>  the type of {@link MessageEnvelope} in the other join stream
- * @param <RM>  the type of {@link MessageEnvelope} in the join output stream
+ * @param <JM>  the type of message in the other join stream
+ * @param <RM>  the type of message in the join output stream
  */
-public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope>
-    implements OperatorSpec<RM> {
+public class PartialJoinOperatorSpec<M, K, JM, RM> implements OperatorSpec<RM> {
 
   private final MessageStreamImpl<RM> joinOutput;
 
   /**
-   * The transformation function of {@link PartialJoinOperatorSpec} that takes an input {@link MessageEnvelope} of
-   * type {@code M}, joins with a stream of buffered {@link MessageEnvelope}s of type {@code JM} from another stream,
-   * and generates a joined result {@link MessageEnvelope} of type {@code RM}.
+   * The transformation function of {@link PartialJoinOperatorSpec} that takes an input message of
+   * type {@code M}, joins with a stream of buffered messages of type {@code JM} from another stream,
+   * and generates a joined result message of type {@code RM}.
    */
-  private final BiFunction<M, JM, RM> transformFn;
+  private final PartialJoinFunction<K, M, JM, RM> transformFn;
 
 
   /**
    * The unique ID for this operator.
    */
-  private final String operatorId;
+  private final int opId;
 
   /**
    * Default constructor for a {@link PartialJoinOperatorSpec}.
    *
-   * @param partialJoinFn  partial join function that take type {@code M} of input {@link MessageEnvelope} and join
-   *                       w/ type {@code JM} of buffered {@link MessageEnvelope} from another stream
+   * @param partialJoinFn  partial join function that takes an input message of type {@code M} and joins it
+   *                       with buffered messages of type {@code JM} from another stream
    * @param joinOutput  the output {@link MessageStreamImpl} of the join results
    */
-  PartialJoinOperatorSpec(BiFunction<M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, String operatorId) {
+  PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, int opId) {
     this.joinOutput = joinOutput;
     this.transformFn = partialJoinFn;
-    this.operatorId = operatorId;
+    this.opId = opId;
   }
 
   @Override
-  public String toString() {
-    return this.operatorId;
-  }
-
-  @Override
-  public MessageStreamImpl<RM> getOutputStream() {
+  public MessageStreamImpl<RM> getNextStream() {
     return this.joinOutput;
   }
 
-  public BiFunction<M, JM, RM> getTransformFn() {
+  public PartialJoinFunction<K, M, JM, RM> getTransformFn() {
     return this.transformFn;
   }
+
+  public OperatorSpec.OpCode getOpCode() {
+    return OpCode.JOIN;
+  }
+
+  public int getOpId() {
+    return this.opId;
+  }
+
+  @Override public void init(Config config, TaskContext context) {
+    this.transformFn.init(config, context);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 4348bc0..ba30d67 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -18,18 +18,30 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.task.TaskContext;
 
 
 /**
  * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
  * system. This is a terminal operator and does not allow further operator chaining.
  *
- * @param <M>  the type of input {@link MessageEnvelope}
+ * @param <M>  the type of input message
  */
-public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec {
+public class SinkOperatorSpec<M> implements OperatorSpec {
+
+  /**
+   * {@link OpCode} for this {@link SinkOperatorSpec}
+   */
+  private final OperatorSpec.OpCode opCode;
+
+  /**
+   * The unique ID for this operator.
+   */
+  private final int opId;
 
   /**
    * The user-defined sink function
@@ -37,14 +49,40 @@ public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec
   private final SinkFunction<M> sinkFn;
 
   /**
-   * Default constructor for a {@link SinkOperatorSpec}.
+   * Potential output stream defined by the {@link SinkFunction}
+   */
+  private final OutputStream<M> outStream;
+
+  /**
+   * Default constructor for a {@link SinkOperatorSpec} w/o an output stream. (e.g. output is sent to remote database)
+   *
+   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
+   *                the output {@link org.apache.samza.task.MessageCollector} and the
+   *                {@link org.apache.samza.task.TaskCoordinator}.
+   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
+   *                or {@link OpCode#PARTITION_BY}
+   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+   */
+  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
+    this(sinkFn, opCode, opId, null);
+  }
+
+  /**
+   * Default constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}
    *
-   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output {@link MessageEnvelope},
+   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
    *                the output {@link org.apache.samza.task.MessageCollector} and the
    *                {@link org.apache.samza.task.TaskCoordinator}.
+   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
+   *                or {@link OpCode#PARTITION_BY}
+   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+   * @param outStream  the {@link OutputStream} for this {@link SinkOperatorSpec}
    */
-  SinkOperatorSpec(SinkFunction<M> sinkFn) {
+  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
     this.sinkFn = sinkFn;
+    this.opCode = opCode;
+    this.opId = opId;
+    this.outStream = outStream;
   }
 
   /**
@@ -52,11 +90,27 @@ public class SinkOperatorSpec<M extends MessageEnvelope> implements OperatorSpec
    * @return  null
    */
   @Override
-  public MessageStreamImpl getOutputStream() {
+  public MessageStreamImpl<M> getNextStream() {
     return null;
   }
 
   public SinkFunction<M> getSinkFn() {
     return this.sinkFn;
   }
+
+  public OperatorSpec.OpCode getOpCode() {
+    return this.opCode;
+  }
+
+  public int getOpId() {
+    return this.opId;
+  }
+
+  public OutputStream<M> getOutStream() {
+    return this.outStream;
+  }
+
+  @Override public void init(Config config, TaskContext context) {
+    this.sinkFn.init(config, context);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index ed18da4..d7813f7 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -18,50 +18,74 @@
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.task.TaskContext;
 
 
 /**
- * The spec for a linear stream operator that outputs 0 or more {@link MessageEnvelope}s for each input {@link MessageEnvelope}.
+ * The spec for a linear stream operator that outputs 0 or more messages for each input message.
  *
- * @param <M>  the type of input {@link MessageEnvelope}
- * @param <OM>  the type of output {@link MessageEnvelope}
+ * @param <M>  the type of input message
+ * @param <OM>  the type of output message
  */
-public class StreamOperatorSpec<M extends MessageEnvelope, OM extends MessageEnvelope> implements OperatorSpec<OM> {
+public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
 
-  private final MessageStreamImpl<OM> outputStream;
+  /**
+   * {@link OpCode} for this {@link StreamOperatorSpec}
+   */
+  private final OperatorSpec.OpCode opCode;
 
-  private final FlatMapFunction<M, OM> transformFn;
+  /**
+   * The unique ID for this operator.
+   */
+  private final int opId;
 
   /**
-   * Default constructor for a {@link StreamOperatorSpec}.
-   *
-   * @param transformFn  the transformation function that transforms each input {@link MessageEnvelope} into a collection
-   *                     of output {@link MessageEnvelope}s
+   * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
    */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn) {
-    this(transformFn, new MessageStreamImpl<>());
-  }
+  private final MessageStreamImpl<OM> outputStream;
+
+  /**
+   * Transformation function applied in this {@link StreamOperatorSpec}
+   */
+  private final FlatMapFunction<M, OM> transformFn;
 
   /**
    * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
    *
    * @param transformFn  the transformation function
    * @param outputStream  the output {@link MessageStreamImpl}
+   * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
+   * @param opId  the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
    */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> outputStream) {
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl outputStream, OperatorSpec.OpCode opCode, int opId) {
     this.outputStream = outputStream;
     this.transformFn = transformFn;
+    this.opCode = opCode;
+    this.opId = opId;
   }
 
   @Override
-  public MessageStreamImpl<OM> getOutputStream() {
+  public MessageStreamImpl<OM> getNextStream() {
     return this.outputStream;
   }
 
   public FlatMapFunction<M, OM> getTransformFn() {
     return this.transformFn;
   }
+
+  public OperatorSpec.OpCode getOpCode() {
+    return this.opCode;
+  }
+
+  public int getOpId() {
+    return this.opId;
+  }
+
+  @Override
+  public void init(Config config, TaskContext context) {
+    this.transformFn.init(config, context);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index cdc02a8..46417ed 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,29 +19,42 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 
-public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> implements OperatorSpec<WM> {
 
-  private final WindowInternal window;
+/**
+ * Default window operator spec object
+ *
+ * @param <M>  the type of input message to the window
+ * @param <WK>  the type of key of the window
+ * @param <WV>  the type of aggregated value in the window output {@link WindowPane}
+ */
+public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
+
+  private final WindowInternal<M, WK, WV> window;
 
-  private final MessageStreamImpl<WM> outputStream;
+  private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
 
-  private final String operatorId;
+  private final int opId;
 
 
-  public WindowOperatorSpec(WindowInternal window, String operatorId) {
+  /**
+   * Constructor for {@link WindowOperatorSpec}.
+   *
+   * @param window  the window function
+   * @param outputStream  the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
+   * @param opId  auto-generated unique ID of this operator
+   */
+  WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
+    this.outputStream = outputStream;
     this.window = window;
-    this.outputStream = new MessageStreamImpl<>();
-    this.operatorId = operatorId;
+    this.opId = opId;
   }
 
   @Override
-  public MessageStream<WM> getOutputStream() {
+  public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
     return this.outputStream;
   }
 
@@ -49,7 +62,11 @@ public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends
     return window;
   }
 
-  public String getOperatorId() {
-    return operatorId;
+  public OpCode getOpCode() {
+    return OpCode.WINDOW;
+  }
+
+  public int getOpId() {
+    return this.opId;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
index e9af043..53bca2e 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java
@@ -30,20 +30,16 @@ import org.apache.samza.annotation.InterfaceStability;
 @InterfaceStability.Unstable
 public interface WindowState<WV> {
   /**
-   * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope}
-   * in the window is received
+   * Method to get the system time when the first message in the window is received
    *
-   * @return  nano-second of system time for the first {@link org.apache.samza.operators.data.MessageEnvelope}
-   *          received in the window
+   * @return  nano-second of system time for the first message received in the window
    */
   long getFirstMessageTimeNs();
 
   /**
-   * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope}
-   * in the window is received
+   * Method to get the system time when the last message in the window is received
    *
-   * @return  nano-second of system time for the last {@link org.apache.samza.operators.data.MessageEnvelope}
-   *          received in the window
+   * @return  nano-second of system time for the last message received in the window
    */
   long getLastMessageTimeNs();
 
@@ -62,9 +58,9 @@ public interface WindowState<WV> {
   long getLatestEventTimeNs();
 
   /**
-   * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window
+   * Method to get the total number of messages received in the window
    *
-   * @return  number of {@link org.apache.samza.operators.data.MessageEnvelope}s in the window
+   * @return  number of messages in the window
    */
   long getNumberMessages();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
new file mode 100644
index 0000000..60a4c60
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+
+/**
+ * This class implements the {@link ExecutionEnvironment} that runs the applications in a YARN environment
+ */
+public class SingleJobExecutionEnvironment implements ExecutionEnvironment {
+
+  @Override public void run(StreamGraphFactory app, Config config) {
+    // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
+    // TODO: actually instantiate the tasks and run the job, i.e.
+    // 1. create all input/output/intermediate topics
+    // 2. create the single job configuration
+    // 3. execute JobRunner to submit the single job for the whole graph
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
new file mode 100644
index 0000000..f60ff82
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+
+
+/**
+ * This class implements the {@link ExecutionEnvironment} that runs the applications in a standalone environment
+ */
+public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
+
+  @Override public void run(StreamGraphFactory app, Config config) {
+    // 1. get logic graph for optimization
+    // StreamGraph logicGraph = app.create(config);
+    // 2. potential optimization....
+    // 3. create new instance of StreamGraphFactory that would generate the optimized graph
+    // 4. create all input/output/intermediate topics
+    // 5. create the configuration for StreamProcessor
+    // 6. start the StreamProcessor w/ optimized instance of StreamGraphFactory
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
new file mode 100644
index 0000000..fa7ec5e
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Execution of the logic sub-DAG
+ *
+ *
+ * A {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
+ * through the user's stream transformations defined in {@link StreamGraphImpl} using the
+ * {@link org.apache.samza.operators.MessageStream} APIs.
+ * <p>
+ * This class brings all the operator API implementation components together and feeds the
+ * {@link InputMessageEnvelope}s into the transformation chains.
+ * <p>
+ * It accepts an instance of the user implemented factory {@link StreamGraphFactory} as input parameter of the constructor.
+ * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiates a user-defined {@link StreamGraphImpl}
+ * from the {@link StreamGraphFactory}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
+ * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
+ * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
+ * corresponds to either an input stream or an intermediate stream in {@link StreamGraphImpl}.
+ * <p>
+ * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} once with all of the input
+ * {@link MessageStreamImpl}s, keyed by {@link SystemStream}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
+ * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG; the root node
+ * for each {@link SystemStream} is later looked up from the {@link OperatorGraph}.
+ * <p>
+ * With the root of the DAG available for each input {@link SystemStream}, it
+ * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
+ * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
+ * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ */
+public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+
+  /**
+   * The {@link OperatorGraph} holding, for each {@link SystemStream}, the root node of its operator chain DAG.
+   */
+  private final OperatorGraph operatorGraph = new OperatorGraph();
+
+  private final StreamGraphFactory graphFactory;
+
+  private ContextManager taskManager;
+
+  public StreamOperatorTask(StreamGraphFactory graphFactory) {
+    this.graphFactory = graphFactory;
+  }
+
+  @Override
+  public final void init(Config config, TaskContext context) throws Exception {
+    // create the StreamGraphImpl object and initialize the app-specific logic DAG within the task
+    StreamGraphImpl streams = (StreamGraphImpl) this.graphFactory.create(config);
+    this.taskManager = streams.getContextManager();
+
+    Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
+    context.getSystemStreamPartitions().forEach(ssp -> {
+        if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
+          inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream()));
+        }
+      });
+    operatorGraph.init(inputBySystemStream, config, this.taskManager.initTaskContext(config, context));
+  }
+
+  @Override
+  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+    this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
+        .onNext(new InputMessageEnvelope(ime), collector, coordinator);
+  }
+
+  @Override
+  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    // TODO: invoke timer based triggers
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.taskManager.finalizeTaskContext();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java b/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
new file mode 100644
index 0000000..a91ce09
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of split stream tasks
+ *
+ */
+public class BroadcastGraph implements StreamGraphFactory {
+
+  private final Set<SystemStreamPartition> inputs;
+
+  BroadcastGraph(Set<SystemStreamPartition> inputs) {
+    this.inputs = inputs;
+  }
+
+  class MessageType {
+    String field1;
+    String field2;
+    String field3;
+    String field4;
+    String parKey;
+    private long timestamp;
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public StreamGraph create(Config config) {
+    StreamGraphImpl graph = new StreamGraphImpl();
+
+    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
+    inputs.forEach(entry -> {
+        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+          @Override public SystemStream getSystemStream() {
+            return entry.getSystemStream();
+          }
+
+          @Override public Properties getProperties() {
+            return null;
+          }
+        }, null, null).
+            map(this::getInputMessage);
+
+        inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+        inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+        inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+      });
+    return graph;
+  }
+
+  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
+    return (JsonMessageEnvelope) m1.getMessage();
+  }
+
+  boolean myFilter1(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key1");
+  }
+
+  boolean myFilter2(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key2");
+  }
+
+  boolean myFilter3(JsonMessageEnvelope m1) {
+    return m1.getMessage().parKey.equals("key3");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java b/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
new file mode 100644
index 0000000..2313f63
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.KeyValueJoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of unique key-based stream-stream join tasks
+ *
+ */
+public class JoinGraph implements StreamGraphFactory {
+  private final Set<SystemStreamPartition> inputs;
+
+  JoinGraph(Set<SystemStreamPartition> inputs) {
+    this.inputs = inputs;
+  }
+
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  MessageStream<JsonMessageEnvelope> joinOutput = null;
+
+  @Override
+  public StreamGraph create(Config config) {
+    StreamGraphImpl graph = new StreamGraphImpl();
+
+    for (SystemStreamPartition input : inputs) {
+      MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
+          new StreamSpec() {
+            @Override public SystemStream getSystemStream() {
+              return input.getSystemStream();
+            }
+
+            @Override public Properties getProperties() {
+              return null;
+            }
+          }, null, null).map(this::getInputMessage);
+      if (joinOutput == null) {
+        joinOutput = newSource;
+      } else {
+        joinOutput = joinOutput.join(newSource,
+            (KeyValueJoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope>) this::myJoinResult);
+      }
+    }
+
+    joinOutput.sendTo(graph.createOutStream(new StreamSpec() {
+      @Override public SystemStream getSystemStream() {
+        return null;
+      }
+
+      @Override public Properties getProperties() {
+        return null;
+      }
+    }, new StringSerde("UTF-8"), new JsonSerde<>()));
+
+    return graph;
+  }
+
+  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+    return new JsonMessageEnvelope(
+        ((MessageType) ism.getMessage()).joinKey,
+        (MessageType) ism.getMessage(),
+        ism.getOffset(),
+        ism.getSystemStreamPartition());
+  }
+
+  JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
+    MessageType newJoinMsg = new MessageType();
+    newJoinMsg.joinKey = m1.getKey();
+    newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+    newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+    return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
new file mode 100644
index 0000000..ad6336a
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Example code using {@link KeyValueStore} to implement event-time window
+ */
+public class KeyValueStoreExample implements StreamGraphFactory {
+
+  /**
+   * Used by the remote execution environment to launch the job in a remote program. The remote program should follow a
+   * similar invoking context as in standalone:
+   *
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+   *     KeyValueStoreExample runnableApp = new KeyValueStoreExample();
+   *     remoteEnv.run(runnableApp, config);
+   *   }
+   *
+   */
+  @Override public StreamGraph create(Config config) {
+    StreamGraph graph = StreamGraph.fromConfig(config);
+
+    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
+
+    pageViewEvents.
+        partitionBy(m -> m.getMessage().memberId).
+        flatMap(new MyStatsCounter()).
+        sendTo(pageViewPerMemberCounters);
+
+    return graph;
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new KeyValueStoreExample(), config);
+  }
+
+  class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
+    private final int timeoutMs = 10 * 60 * 1000;
+
+    KeyValueStore<String, StatsWindowState> statsStore;
+
+    class StatsWindowState {
+      int lastCount = 0;
+      long timeAtLastOutput = 0;
+      int newCount = 0;
+    }
+
+    @Override
+    public Collection<StatsOutput> apply(PageViewEvent message) {
+      List<StatsOutput> outputStats = new ArrayList<>();
+      long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
+      String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
+      StatsWindowState storedState = this.statsStore.get(wndKey);  // may be null: no state stored yet for this key
+      StatsWindowState curState = storedState != null ? storedState : new StatsWindowState();
+      curState.newCount++;
+      long curTimeMs = System.currentTimeMillis();
+      if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
+        curState.timeAtLastOutput = curTimeMs;
+        curState.lastCount += curState.newCount;
+        curState.newCount = 0;
+        outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
+      }
+      this.statsStore.put(wndKey, curState);  // persist the counters whether or not an output was generated
+      return outputStats;
+    }
+
+    @Override
+    public void init(Config config, TaskContext context) {
+      this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
+    }
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getKey() {
+      return this.pageId;
+    }
+
+    @Override
+    public PageViewEvent getMessage() {
+      return this;
+    }
+  }
+
+  class StatsOutput implements MessageEnvelope<String, StatsOutput> {
+    private String memberId;
+    private long timestamp;
+    private Integer count;
+
+    StatsOutput(String key, long timestamp, Integer count) {
+      this.memberId = key;
+      this.timestamp = timestamp;
+      this.count = count;
+    }
+
+    @Override
+    public String getKey() {
+      return this.memberId;
+    }
+
+    @Override
+    public StatsOutput getMessage() {
+      return this;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
new file mode 100644
index 0000000..577d06f
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CommandLine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Example {@link StreamGraphFactory} code to test the API methods: maps two inputs, joins them, sends the result out
+ */
+public class NoContextStreamExample implements StreamGraphFactory {
+
+  // first join input: stream "input1" on system "kafka"
+  StreamSpec input1 = new StreamSpec() {
+    @Override public Properties getProperties() {
+      return null;
+    }
+
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "input1");
+    }
+  };
+
+  // second join input: stream "input2" on system "kafka"
+  StreamSpec input2 = new StreamSpec() {
+    @Override public Properties getProperties() {
+      return null;
+    }
+
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "input2");
+    }
+  };
+
+  // destination of the joined messages: stream "output" on system "kafka"
+  StreamSpec output = new StreamSpec() {
+    @Override public Properties getProperties() {
+      return null;
+    }
+
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "output");
+    }
+  };
+
+  // payload carried by the JSON envelopes of this example
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+    // cast the payload once; its joinKey becomes the key of the new envelope
+    MessageType payload = (MessageType) ism.getMessage();
+    return new JsonMessageEnvelope(payload.joinKey, payload, ism.getOffset(), ism.getSystemStreamPartition());
+  }
+
+  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
+    @Override public String getFirstKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+
+    @Override public String getSecondKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+
+    /** Merges the join fields of both messages (m1's first) into a new envelope with null offset and partition. */
+    @Override
+    public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
+      MessageType first = m1.getMessage();
+      MessageType merged = new MessageType();
+      merged.joinKey = m1.getKey();
+      merged.joinFields.addAll(first.joinFields);
+      merged.joinFields.addAll(m2.getMessage().joinFields);
+      return new JsonMessageEnvelope(first.joinKey, merged, null, null);
+    }
+  }
+
+  /**
+   * Builds the map/join/sendTo graph of this example. Used by the remote execution environment to launch the job in
+   * a remote program; the remote program should follow an invocation similar to the standalone {@code main}:
+   * <pre>{@code
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
+   *     remoteEnv.run(new NoContextStreamExample(), config);
+   *   }
+   * }</pre>
+   */
+  @Override public StreamGraph create(Config config) {
+    StreamGraph graph = StreamGraph.fromConfig(config);
+    MessageStream<InputMessageEnvelope> firstInput = graph.<Object, Object, InputMessageEnvelope>createInStream(
+        input1, null, null);
+    MessageStream<InputMessageEnvelope> secondInput = graph.<Object, Object, InputMessageEnvelope>createInStream(
+        input2, null, null);
+    OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> joinedOutput = graph.createOutStream(output,
+        new StringSerde("UTF-8"), new JsonSerde<>());
+
+    firstInput.map(this::getInputMessage).
+        join(secondInput.map(this::getInputMessage), new MyJoinFunction()).
+        sendTo(joinedOutput);
+
+    return graph;
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine commandLine = new CommandLine();
+    Config jobConfig = commandLine.loadConfig(commandLine.parser().parse(args));
+    ExecutionEnvironment localEnv = ExecutionEnvironment.getLocalEnvironment(jobConfig);
+    localEnv.run(new NoContextStreamExample(), jobConfig);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
new file mode 100644
index 0000000..ad433b6
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Simple 2-way stream-to-stream join example: orders are joined with shipments into fulfilled-order records that
+ * are sent to an output stream
+ */
+public class OrderShipmentJoinExample implements StreamGraphFactory {
+
+  // join input carrying the orders: stream "Orders" on system "kafka"
+  StreamSpec input1 = new StreamSpec() {
+    @Override public Properties getProperties() {
+      return null;
+    }
+
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "Orders");
+    }
+  };
+
+  // join input carrying the shipments: stream "Shipment" on system "kafka"
+  StreamSpec input2 = new StreamSpec() {
+    @Override public Properties getProperties() {
+      return null;
+    }
+
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "Shipment");
+    }
+  };
+
+  // destination of the join results: stream "FulfilledOrders" on system "kafka"
+  StreamSpec output = new StreamSpec() {
+    @Override public Properties getProperties() {
+      return null;
+    }
+
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "FulfilledOrders");
+    }
+  };
+
+  /** Order that serves as its own {@link MessageEnvelope}; the key is the order id. */
+  class OrderRecord implements MessageEnvelope<String, OrderRecord> {
+    String orderId;
+    long orderTimeMs;
+
+    OrderRecord(String orderId, long timeMs) {
+      this.orderTimeMs = timeMs;
+      this.orderId = orderId;
+    }
+
+    @Override public OrderRecord getMessage() {
+      return this;
+    }
+
+    @Override public String getKey() {
+      return orderId;
+    }
+  }
+
+  /** Shipment that serves as its own {@link MessageEnvelope}; the key is the order id. */
+  class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
+    String orderId;
+    long shipTimeMs;
+
+    ShipmentRecord(String orderId, long timeMs) {
+      this.shipTimeMs = timeMs;
+      this.orderId = orderId;
+    }
+
+    @Override public ShipmentRecord getMessage() {
+      return this;
+    }
+
+    @Override public String getKey() {
+      return orderId;
+    }
+  }
+
+  /** Join result that serves as its own {@link MessageEnvelope}; the key is the order id. */
+  class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
+    String orderId;
+    long orderTimeMs;
+    long shipTimeMs;
+
+    FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+      this.shipTimeMs = shipTimeMs;
+      this.orderTimeMs = orderTimeMs;
+      this.orderId = orderId;
+    }
+
+    @Override public FulFilledOrderRecord getMessage() {
+      return this;
+    }
+
+    @Override public String getKey() {
+      return orderId;
+    }
+  }
+
+  // builds the join result from the order's key and order time plus the shipment's ship time
+  FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
+    String orderId = m1.getKey();
+    return new FulFilledOrderRecord(orderId, m1.orderTimeMs, m2.shipTimeMs);
+  }
+
+  /** Uses the envelope keys of both sides as join keys and delegates the result creation to myJoinResult. */
+  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+    @Override public String getFirstKey(OrderRecord message) {
+      return message.getKey();
+    }
+
+    @Override public String getSecondKey(ShipmentRecord message) {
+      return message.getKey();
+    }
+
+    @Override public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+      return myJoinResult(message, otherMessage);
+    }
+  }
+
+  /**
+   * Builds the order/shipment join graph. Used by the remote execution environment to launch the job in a remote
+   * program; the remote program should follow an invocation similar to the standalone {@code main}:
+   * <pre>{@code
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+   *     remoteEnv.run(new OrderShipmentJoinExample(), config);
+   *   }
+   * }</pre>
+   * @param config the configuration passed to {@code StreamGraph.fromConfig}
+   * @return the graph joining {@code input1} with {@code input2} into {@code output}
+   */
+  @Override public StreamGraph create(Config config) {
+    StreamGraph graph = StreamGraph.fromConfig(config);
+    MessageStream<OrderRecord> orderStream = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    MessageStream<ShipmentRecord> shipmentStream = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<FulFilledOrderRecord> fulfilledOut = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    orderStream.
+        join(shipmentStream, new MyJoinFunction()).
+        sendTo(fulfilledOut);
+    return graph;
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine commandLine = new CommandLine();
+    Config jobConfig = commandLine.loadConfig(commandLine.parser().parse(args));
+    ExecutionEnvironment localEnv = ExecutionEnvironment.getLocalEnvironment(jobConfig);
+    localEnv.run(new OrderShipmentJoinExample(), jobConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
new file mode 100644
index 0000000..1502aa2
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.Properties;
+
+
+/**
+ * Example code to implement a window-based counter
+ */
+public class PageViewCounterExample implements StreamGraphFactory {
+
+  /** Builds the graph: page views are counted per member id in tumbling windows and sent to {@code output}. */
+  @Override public StreamGraph create(Config config) {
+    StreamGraph graph = StreamGraph.fromConfig(config);
+    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    // NOTE(review): windows are 10 seconds long although the output stream is named "PageViewPerMember5min" - confirm
+    pageViewEvents.
+        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
+            setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
+            setAccumulationMode(AccumulationMode.DISCARDING)).
+        map(MyStreamOutput::new).
+        sendTo(pageViewPerMemberCounters);
+    return graph;
+  }
+
+  // standalone local program model
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new PageViewCounterExample(), config);
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  /** Page view event that serves as its own {@link MessageEnvelope}; the key is the page id. */
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override public String getKey() {
+      return this.pageId;
+    }
+
+    @Override public PageViewEvent getMessage() {
+      return this;
+    }
+  }
+
+  /** Counter record built from a window pane; serves as its own {@link MessageEnvelope}, keyed by member id. */
+  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      // parse straight to a primitive long (no intermediate Long box); throws NumberFormatException on a bad id
+      this.timestamp = Long.parseLong(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+
+    @Override public String getKey() {
+      return this.memberId;
+    }
+
+    @Override public MyStreamOutput getMessage() {
+      return this;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
new file mode 100644
index 0000000..f15e514
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.*;
+
+
+/**
+ * Example {@link StreamGraphFactory} code to test the API methods with re-partition operator
+ */
+public class RepartitionExample implements StreamGraphFactory {
+
+  /**
+   * Builds the re-partition graph. Used by the remote execution environment to launch the job in a remote program;
+   * the remote program should follow an invocation similar to the standalone {@code main}:
+   * <pre>{@code
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+   *     remoteEnv.run(new RepartitionExample(), config);
+   *   }
+   * }</pre>
+   */
+  @Override public StreamGraph create(Config config) {
+    StreamGraph graph = StreamGraph.fromConfig(config);
+
+    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    // re-partition by member id, then count page views per member id in 5-minute tumbling windows
+    pageViewEvents.
+        partitionBy(m -> m.getMessage().memberId).
+        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
+            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)).
+        map(MyStreamOutput::new).
+        sendTo(pageViewPerMemberCounters);
+
+    return graph;
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new RepartitionExample(), config);
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  /** Page view event that serves as its own {@link MessageEnvelope}; the key is the page id. */
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override public String getKey() {
+      return this.pageId;
+    }
+
+    @Override public PageViewEvent getMessage() {
+      return this;
+    }
+  }
+
+  /** Counter record built from a window pane; serves as its own {@link MessageEnvelope}, keyed by member id. */
+  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      // parse straight to a primitive long (no intermediate Long box); throws NumberFormatException on a bad id
+      this.timestamp = Long.parseLong(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+
+    @Override public String getKey() {
+      return this.memberId;
+    }
+
+    @Override public MyStreamOutput getMessage() {
+      return this;
+    }
+  }
+
+}