You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 18:40:46 UTC

[03/14] samza git commit: SAMZA-1073: Addressing review feedbacks. Change StreamGraphFactory to StreamGraphBuilder.

SAMZA-1073: Addressing review feedbacks. Change StreamGraphFactory to StreamGraphBuilder.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4bde68b4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4bde68b4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4bde68b4

Branch: refs/heads/samza-fluent-api-v1
Commit: 4bde68b4a542ac465e4f536bc548c2d6c4366fae
Parents: b3dd886
Author: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Authored: Tue Feb 14 01:33:23 2017 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Wed Feb 15 15:13:09 2017 -0800

----------------------------------------------------------------------
 .../apache/samza/operators/MessageStream.java   |  16 +--
 .../org/apache/samza/operators/StreamGraph.java |  24 ----
 .../samza/operators/StreamGraphBuilder.java     |  38 ++++++
 .../samza/operators/StreamGraphFactory.java     |  38 ------
 .../operators/functions/FilterFunction.java     |  12 +-
 .../operators/functions/FlatMapFunction.java    |  12 +-
 .../samza/operators/functions/InitFunction.java |  38 ------
 .../operators/functions/InitableFunction.java   |  40 ++++++
 .../samza/operators/functions/JoinFunction.java |  11 +-
 .../functions/KeyValueJoinFunction.java         |  44 -------
 .../samza/operators/functions/MapFunction.java  |  12 +-
 .../samza/operators/functions/SinkFunction.java |  14 +-
 .../apache/samza/operators/windows/Window.java  |   2 +-
 .../samza/system/ExecutionEnvironment.java      |   6 +-
 .../java/org/apache/samza/task/TaskContext.java |   2 +-
 .../samza/operators/windows/TestWindowPane.java |   2 +-
 .../samza/operators/MessageStreamImpl.java      |   5 -
 .../functions/PartialJoinFunction.java          |  11 +-
 .../system/RemoteExecutionEnvironment.java      |  37 ++++++
 .../system/SingleJobExecutionEnvironment.java   |  37 ------
 .../system/StandaloneExecutionEnvironment.java  |  19 ++-
 .../apache/samza/task/StreamOperatorTask.java   |  25 ++--
 .../apache/samza/example/BroadcastGraph.java    | 121 -----------------
 .../org/apache/samza/example/JoinGraph.java     | 118 -----------------
 .../samza/example/KeyValueStoreExample.java     |  14 +-
 .../samza/example/NoContextStreamExample.java   |  15 +--
 .../samza/example/OrderShipmentJoinExample.java |   8 +-
 .../samza/example/PageViewCounterExample.java   |  14 +-
 .../samza/example/RepartitionExample.java       |  15 +--
 .../samza/example/TestBasicStreamGraphs.java    |  99 ++++++++++++++
 .../samza/example/TestBroadcastExample.java     | 113 ++++++++++++++++
 .../apache/samza/example/TestExampleBase.java   |  46 +++++++
 .../samza/example/TestFluentStreamTasks.java    |  99 --------------
 .../apache/samza/example/TestJoinExample.java   | 129 +++++++++++++++++++
 .../apache/samza/example/TestWindowExample.java |  81 ++++++++++++
 .../org/apache/samza/example/WindowGraph.java   |  87 -------------
 .../samza/operators/TestMessageStreamImpl.java  |  19 ++-
 .../samza/operators/impl/TestOperatorImpls.java |  21 ++-
 38 files changed, 687 insertions(+), 757 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 87a9fd3..adeb4c8 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -42,7 +42,7 @@ import java.util.function.Function;
 public interface MessageStream<M> {
 
   /**
-   * Applies the provided 1:1 {@link Function} to messages in this {@link MessageStream} and returns the
+   * Applies the provided 1:1 function to messages in this {@link MessageStream} and returns the
    * transformed {@link MessageStream}.
    *
    * @param mapFn the function to transform a message to another message
@@ -52,7 +52,7 @@ public interface MessageStream<M> {
   <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn);
 
   /**
-   * Applies the provided 1:n {@link Function} to transform a message in this {@link MessageStream}
+   * Applies the provided 1:n function to transform a message in this {@link MessageStream}
    * to n messages in the transformed {@link MessageStream}
    *
    * @param flatMapFn the function to transform a message to zero or more messages
@@ -62,7 +62,7 @@ public interface MessageStream<M> {
   <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn);
 
   /**
-   * Applies the provided {@link Function} to messages in this {@link MessageStream} and returns the
+   * Applies the provided function to messages in this {@link MessageStream} and returns the
    * transformed {@link MessageStream}.
    * <p>
    * The {@link Function} is a predicate which determines whether a message in this {@link MessageStream}
@@ -92,16 +92,6 @@ public interface MessageStream<M> {
   void sendTo(OutputStream<M> stream);
 
   /**
-   * Allows sending messages to an intermediate {@link MessageStream}.
-   *
-   * NOTE: the {@code stream} has to be a {@link MessageStream}.
-   *
-   * @param stream  the intermediate {@link MessageStream} to send the message to
-   * @return  the intermediate {@link MessageStream} to consume the messages sent
-   */
-  MessageStream<M> sendThrough(OutputStream<M> stream);
-
-  /**
    * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
    * (e.g. tumbling, sliding or session windows) and returns the transformed {@link MessageStream} of
    * {@link WindowPane}s.

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index 9e6644b..abc9861 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -19,8 +19,6 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.serializers.Serde;
 
@@ -93,26 +91,4 @@ public interface StreamGraph {
    */
   StreamGraph withContextManager(ContextManager manager);
 
-  String GRAPH_CONFIG = "job.stream.graph.impl.class";
-  String DEFAULT_GRAPH_IMPL_CLASS = "org.apache.samza.operators.StreamGraphImpl";
-
-  /**
-   * Static method to instantiate the implementation class of {@link StreamGraph}.
-   *
-   * @param config  the {@link Config} object for this job
-   * @return  the {@link StreamGraph} object created
-   */
-  static StreamGraph fromConfig(Config config) {
-
-    try {
-      if (StreamGraph.class.isAssignableFrom(Class.forName(config.get(GRAPH_CONFIG, DEFAULT_GRAPH_IMPL_CLASS)))) {
-        return (StreamGraph) Class.forName(config.get(GRAPH_CONFIG, DEFAULT_GRAPH_IMPL_CLASS)).newInstance();
-      }
-    } catch (Exception e) {
-      throw new ConfigException(String.format("Problem in loading StreamGraphImpl class %s", config.get(GRAPH_CONFIG)), e);
-    }
-    throw new ConfigException(String.format(
-        "Class %s does not implement interface StreamGraphBuilder properly",
-        config.get(GRAPH_CONFIG)));
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
new file mode 100644
index 0000000..b415cf8
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+
+/**
+ * This interface defines a factory class that user will implement to create user-defined operator DAG in a {@link StreamGraph} object.
+ */
+@InterfaceStability.Unstable
+public interface StreamGraphBuilder {
+  /**
+   * Users are required to implement this abstract method to initialize the processing logic of the application, in terms
+   * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
+   *
+   * @param graph  an empty {@link StreamGraph} object to be initialized
+   * @param config  the {@link Config} of the application
+   */
+  void init(StreamGraph graph, Config config);
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
deleted file mode 100644
index c292363..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraphFactory.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-
-
-/**
- * This interface defines a factory class that user will implement to create user-defined operator DAG in a {@link StreamGraph} object.
- */
-@InterfaceStability.Unstable
-public interface StreamGraphFactory {
-  /**
-   * Users are required to implement this abstract method to initialize the processing logic of the application, in terms
-   * of a DAG of {@link org.apache.samza.operators.MessageStream}s and operators
-   *
-   * @param config  the {@link Config} of the application
-   * @return  the {@link StreamGraph} object which contains user-defined processing logic of the application
-   */
-  StreamGraph create(Config config);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index 73c5c9d..58479d6 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -19,8 +19,6 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -28,7 +26,8 @@ import org.apache.samza.task.TaskContext;
  * @param <M>  type of the input message
  */
 @InterfaceStability.Unstable
-public interface FilterFunction<M> extends InitFunction {
+@FunctionalInterface
+public interface FilterFunction<M> extends InitableFunction {
 
   /**
    * Returns a boolean indicating whether this message should be retained or filtered out.
@@ -37,11 +36,4 @@ public interface FilterFunction<M> extends InitFunction {
    */
   boolean apply(M message);
 
-  /**
-   * Init method to initialize the context for this {@link FilterFunction}. The default implementation is NO-OP.
-   *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
-   */
-  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
index f8458f2..bbbddeb 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FlatMapFunction.java
@@ -19,8 +19,6 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
 
 import java.util.Collection;
 
@@ -32,7 +30,8 @@ import java.util.Collection;
  * @param <OM>  type of the transformed messages
  */
 @InterfaceStability.Unstable
-public interface FlatMapFunction<M, OM>  extends InitFunction {
+@FunctionalInterface
+public interface FlatMapFunction<M, OM>  extends InitableFunction {
 
   /**
    * Transforms the provided message into a collection of 0 or more messages.
@@ -41,11 +40,4 @@ public interface FlatMapFunction<M, OM>  extends InitFunction {
    */
   Collection<OM> apply(M message);
 
-  /**
-   * Init method to initialize the context for this {@link FlatMapFunction}. The default implementation is NO-OP.
-   *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
-   */
-  default void init(Config config, TaskContext context) { };
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
deleted file mode 100644
index eec56df..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/InitFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.functions;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * interface defined to initalize the context of message transformation functions
- */
-@InterfaceStability.Unstable
-public interface InitFunction {
-  /**
-   * Interface method to initialize the context for a specific message transformation function.
-   *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
-   */
-  void init(Config config, TaskContext context);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
new file mode 100644
index 0000000..2f738da
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * interface defined to initalize the context of message transformation functions
+ */
+@InterfaceStability.Unstable
+public interface InitableFunction {
+
+  /**
+   * Interface method to initialize the context for a specific message transformation function.
+   *
+   * @param config  the {@link Config} object for this task
+   * @param context  the {@link TaskContext} object for this task
+   */
+  default void init(Config config, TaskContext context) { }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
index afc92ee..fc38177 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/JoinFunction.java
@@ -19,8 +19,6 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -32,7 +30,7 @@ import org.apache.samza.task.TaskContext;
  * @param <RM>  type of the joined message
  */
 @InterfaceStability.Unstable
-public interface JoinFunction<K, M, JM, RM>  extends InitFunction {
+public interface JoinFunction<K, M, JM, RM>  extends InitableFunction {
 
   /**
    * Join the provided input messages and produces the joined messages.
@@ -58,11 +56,4 @@ public interface JoinFunction<K, M, JM, RM>  extends InitFunction {
    */
   K getSecondKey(JM message);
 
-  /**
-   * Init method to initialize the context for this {@link JoinFunction}. The default implementation is NO-OP.
-   *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
-   */
-  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
deleted file mode 100644
index b651b3d..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/KeyValueJoinFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.functions;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
-
-/**
- * A specific {@link JoinFunction} that joins {@link MessageEnvelope}s from two {@link org.apache.samza.operators.MessageStream}s and produces
- * a joined message.
- *
- * @param <K>  type of the join key
- * @param <M>  type of the input {@link MessageEnvelope}
- * @param <JM>  type of the {@link MessageEnvelope} to join with
- * @param <RM>  type of the joined message
- */
-@InterfaceStability.Unstable
-@FunctionalInterface
-public interface KeyValueJoinFunction<K, M extends MessageEnvelope<K, ?>, JM extends MessageEnvelope<K, ?>, RM> extends JoinFunction<K, M, JM, RM> {
-
-  default K getFirstKey(M message) {
-    return message.getKey();
-  }
-
-  default K getSecondKey(JM message) {
-    return message.getKey();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index a051914..05a554f 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -19,8 +19,6 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -29,7 +27,8 @@ import org.apache.samza.task.TaskContext;
  * @param <OM>  type of the transformed message
  */
 @InterfaceStability.Unstable
-public interface MapFunction<M, OM>  extends InitFunction {
+@FunctionalInterface
+public interface MapFunction<M, OM>  extends InitableFunction {
 
   /**
    * Transforms the provided message into another message
@@ -38,11 +37,4 @@ public interface MapFunction<M, OM>  extends InitFunction {
    */
   OM apply(M message);
 
-  /**
-   * Init method to initialize the context for this {@link MapFunction}. The default implementation is NO-OP.
-   *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
-   */
-  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index 1050771..08e090a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -19,9 +19,7 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
 import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 
 
@@ -30,7 +28,8 @@ import org.apache.samza.task.TaskCoordinator;
  * @param <M>  type of the input message
  */
 @InterfaceStability.Unstable
-public interface SinkFunction<M>  extends InitFunction {
+@FunctionalInterface
+public interface SinkFunction<M>  extends InitableFunction {
 
   /**
    * Allows sending the provided message to an output {@link org.apache.samza.system.SystemStream} using
@@ -38,16 +37,9 @@ public interface SinkFunction<M>  extends InitFunction {
    * or shut the container down.
    *
    * @param message  the input message to be sent to an output {@link org.apache.samza.system.SystemStream}
-   * @param messageCollector  the {@link MessageCollector} to use to send the {@link org.apache.samza.operators.data.MessageEnvelope}
+   * @param messageCollector  the {@link MessageCollector} to send the {@link org.apache.samza.operators.data.MessageEnvelope}
    * @param taskCoordinator  the {@link TaskCoordinator} to request commits or shutdown
    */
   void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator);
 
-  /**
-   * Init method to initialize the context for this {@link MapFunction}. The default implementation is NO-OP.
-   *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
-   */
-  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
index 8aa665a..9609292 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java
@@ -29,7 +29,7 @@ import org.apache.samza.operators.triggers.Trigger;
  * that determine when results from the {@link Window} are emitted.
  *
  * <p> Each emitted result contains one or more messages in the window and is called a {@link WindowPane}.
- * A pane can include all messagess collected for the window so far or only the new messages
+ * A pane can include all messages collected for the window so far or only the new messages
  * since the last emitted pane. (as determined by the {@link AccumulationMode})
  *
  * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
index d0c5985..ad37eb3 100644
--- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
+++ b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
@@ -20,7 +20,7 @@ package org.apache.samza.system;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
 
 
@@ -65,9 +65,9 @@ public interface ExecutionEnvironment {
   /**
    * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}
    *
-   * @param graphFactory  the user-defined {@link StreamGraphFactory} object
+   * @param graphBuilder  the user-defined {@link StreamGraphBuilder} object
    * @param config  the {@link Config} object for this job
    */
-  void run(StreamGraphFactory graphFactory, Config config);
+  void run(StreamGraphBuilder graphBuilder, Config config);
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
index 5779071..128cff1 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskContext.java
@@ -61,7 +61,7 @@ public interface TaskContext {
    * @param <T>  the type of user-defined task context
    * @return  user-defined task context object
    */
-  default <T extends TaskContext> T getUserDefinedContext() {
+  default <T> T getUserDefinedContext() {
     return null;
   };
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
index 809c5b4..54d0b2f 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals;
 public class TestWindowPane {
   @Test
   public void testConstructor() {
-    WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey("testMsg", null), 10);
+    WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey<>("testMsg", null), 10);
     assertEquals(wndOutput.getKey().getKey(), "testMsg");
     assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index d85d488..830e4a5 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -93,11 +93,6 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
     this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream));
   }
 
-  @Override public MessageStream<M> sendThrough(OutputStream<M> stream) {
-    this.sendTo(stream);
-    return this.graph.getIntStream(stream);
-  }
-
   @Override
   public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
     OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
index 3583b92..809a70a 100644
--- a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
+++ b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
@@ -19,8 +19,6 @@
 package org.apache.samza.operators.functions;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.task.TaskContext;
 
 
 /**
@@ -28,7 +26,7 @@ import org.apache.samza.task.TaskContext;
  * {@link org.apache.samza.operators.MessageStream}s and merge them into a single output joined message in the join output
  */
 @InterfaceStability.Unstable
-public interface PartialJoinFunction<K, M, OM, RM> extends InitFunction {
+public interface PartialJoinFunction<K, M, OM, RM> extends InitableFunction {
 
   /**
    * Method to perform join method on the two input messages
@@ -55,11 +53,4 @@ public interface PartialJoinFunction<K, M, OM, RM> extends InitFunction {
    */
   K getOtherKey(OM message);
 
-  /**
-   * Init method to initialize the context for this {@link PartialJoinFunction}. The default implementation is NO-OP.
-   *
-   * @param config  the {@link Config} object for this task
-   * @param context  the {@link TaskContext} object for this task
-   */
-  default void init(Config config, TaskContext context) { }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
new file mode 100644
index 0000000..fafa2cb
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+
+/**
+ * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
+ */
+public class RemoteExecutionEnvironment implements ExecutionEnvironment {
+
+  @Override public void run(StreamGraphBuilder app, Config config) {
+    // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
+    // TODO: actually instantiate the tasks and run the job, i.e.
+    // 1. create all input/output/intermediate topics
+    // 2. create the single job configuration
+    // 3. execute JobRunner to submit the single job for the whole graph
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
deleted file mode 100644
index 60a4c60..0000000
--- a/samza-operator/src/main/java/org/apache/samza/system/SingleJobExecutionEnvironment.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraphFactory;
-import org.apache.samza.config.Config;
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
- */
-public class SingleJobExecutionEnvironment implements ExecutionEnvironment {
-
-  @Override public void run(StreamGraphFactory app, Config config) {
-    // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
-    // TODO: actually instantiate the tasks and run the job, i.e.
-    // 1. create all input/output/intermediate topics
-    // 2. create the single job configuration
-    // 3. execute JobRunner to submit the single job for the whole graph
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
index f60ff82..f0f6ef2 100644
--- a/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ b/samza-operator/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -19,8 +19,10 @@
 
 package org.apache.samza.system;
 
-import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
 
 
 /**
@@ -28,14 +30,21 @@ import org.apache.samza.config.Config;
  */
 public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
 
-  @Override public void run(StreamGraphFactory app, Config config) {
+  // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
+  StreamGraph createGraph(StreamGraphBuilder app, Config config) {
+    StreamGraphImpl graph = new StreamGraphImpl();
+    app.init(graph, config);
+    return graph;
+  }
+
+  @Override public void run(StreamGraphBuilder app, Config config) {
     // 1. get logic graph for optimization
-    // StreamGraph logicGraph = app.create(config);
+    // StreamGraph logicGraph = this.createGraph(app, config);
     // 2. potential optimization....
-    // 3. create new instance of StreamGraphFactory that would generate the optimized graph
+    // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
     // 4. create all input/output/intermediate topics
     // 5. create the configuration for StreamProcessor
-    // 6. start the StreamProcessor w/ optimized instance of StreamGraphFactory
+    // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index fa7ec5e..b007e3c 100644
--- a/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -21,7 +21,7 @@ package org.apache.samza.task;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.impl.OperatorGraph;
@@ -43,9 +43,9 @@ import java.util.Map;
  * This class brings all the operator API implementation components together and feeds the
  * {@link InputMessageEnvelope}s into the transformation chains.
  * <p>
- * It accepts an instance of the user implemented factory {@link StreamGraphFactory} as input parameter of the constructor.
+ * It accepts an instance of the user implemented factory {@link StreamGraphBuilder} as input parameter of the constructor.
  * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl}
- * from the {@link StreamGraphFactory}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
+ * from the {@link StreamGraphBuilder}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
  * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
  * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
  * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}.
@@ -67,27 +67,30 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
    */
   private final OperatorGraph operatorGraph = new OperatorGraph();
 
-  private final StreamGraphFactory graphFactory;
+  private final StreamGraphBuilder graphBuilder;
 
-  private ContextManager taskManager;
+  private ContextManager contextManager;
 
-  public StreamOperatorTask(StreamGraphFactory graphFactory) {
-    this.graphFactory = graphFactory;
+  public StreamOperatorTask(StreamGraphBuilder graphBuilder) {
+    this.graphBuilder = graphBuilder;
   }
 
   @Override
   public final void init(Config config, TaskContext context) throws Exception {
     // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
-    StreamGraphImpl streams = (StreamGraphImpl) this.graphFactory.create(config);
-    this.taskManager = streams.getContextManager();
+    StreamGraphImpl streams = new StreamGraphImpl();
+    this.graphBuilder.init(streams, config);
+    // get the context manager of the {@link StreamGraph} and initialize the task-specific context
+    this.contextManager = streams.getContextManager();
 
     Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
     context.getSystemStreamPartitions().forEach(ssp -> {
         if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
+          // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
           inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream()));
         }
       });
-    operatorGraph.init(inputBySystemStream, config, this.taskManager.initTaskContext(config, context));
+    operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
   }
 
   @Override
@@ -103,6 +106,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
 
   @Override
   public void close() throws Exception {
-    this.taskManager.finalizeTaskContext();
+    this.contextManager.finalizeTaskContext();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java b/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
deleted file mode 100644
index a91ce09..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/BroadcastGraph.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamGraphFactory;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.function.BiFunction;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class BroadcastGraph implements StreamGraphFactory {
-
-  private final Set<SystemStreamPartition> inputs;
-
-  BroadcastGraph(Set<SystemStreamPartition> inputs) {
-    this.inputs = inputs;
-  }
-
-  class MessageType {
-    String field1;
-    String field2;
-    String field3;
-    String field4;
-    String parKey;
-    private long timestamp;
-
-    public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override
-  public StreamGraph create(Config config) {
-    StreamGraphImpl graph = new StreamGraphImpl();
-
-    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
-    inputs.forEach(entry -> {
-        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
-          @Override public SystemStream getSystemStream() {
-            return entry.getSystemStream();
-          }
-
-          @Override public Properties getProperties() {
-            return null;
-          }
-        }, null, null).
-            map(this::getInputMessage);
-
-        inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-        inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-        inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-      });
-    return graph;
-  }
-
-  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
-    return (JsonMessageEnvelope) m1.getMessage();
-  }
-
-  boolean myFilter1(JsonMessageEnvelope m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key1");
-  }
-
-  boolean myFilter2(JsonMessageEnvelope m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key2");
-  }
-
-  boolean myFilter3(JsonMessageEnvelope m1) {
-    return m1.getMessage().parKey.equals("key3");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java b/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
deleted file mode 100644
index 2313f63..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/JoinGraph.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamGraphFactory;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.StreamSpec;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.KeyValueJoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Example implementation of unique key-based stream-stream join tasks
- *
- */
-public class JoinGraph implements StreamGraphFactory {
-  private final Set<SystemStreamPartition> inputs;
-
-  JoinGraph(Set<SystemStreamPartition> inputs) {
-    this.inputs = inputs;
-  }
-
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  MessageStream<JsonMessageEnvelope> joinOutput = null;
-
-  @Override
-  public StreamGraph create(Config config) {
-    StreamGraphImpl graph = new StreamGraphImpl();
-
-    for (SystemStreamPartition input : inputs) {
-      MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
-          new StreamSpec() {
-            @Override public SystemStream getSystemStream() {
-              return input.getSystemStream();
-            }
-
-            @Override public Properties getProperties() {
-              return null;
-            }
-          }, null, null).map(this::getInputMessage);
-      if (joinOutput == null) {
-        joinOutput = newSource;
-      } else {
-        joinOutput = joinOutput.join(newSource,
-            (KeyValueJoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope>) this::myJoinResult);
-      }
-    }
-
-    joinOutput.sendTo(graph.createOutStream(new StreamSpec() {
-      @Override public SystemStream getSystemStream() {
-        return null;
-      }
-
-      @Override public Properties getProperties() {
-        return null;
-      }
-    }, new StringSerde("UTF-8"), new JsonSerde<>()));
-
-    return graph;
-  }
-
-  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
-    return new JsonMessageEnvelope(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getSystemStreamPartition());
-  }
-
-  JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
-    MessageType newJoinMsg = new MessageType();
-    newJoinMsg.joinKey = m1.getKey();
-    newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-    newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-    return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index ad6336a..85ebc6c 100644
--- a/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-operator/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -22,12 +22,10 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphFactory;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.serializers.JsonSerde;
@@ -44,7 +42,7 @@ import java.util.Properties;
 /**
  * Example code using {@link KeyValueStore} to implement event-time window
  */
-public class KeyValueStoreExample implements StreamGraphFactory {
+public class KeyValueStoreExample implements StreamGraphBuilder {
 
   /**
    * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
@@ -59,8 +57,7 @@ public class KeyValueStoreExample implements StreamGraphFactory {
    *   }
    *
    */
-  @Override public StreamGraph create(Config config) {
-    StreamGraph graph = StreamGraph.fromConfig(config);
+  @Override public void init(StreamGraph graph, Config config) {
 
     MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
     OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
@@ -70,7 +67,6 @@ public class KeyValueStoreExample implements StreamGraphFactory {
         flatMap(new MyStatsCounter()).
         sendTo(pageViewPerMemberCounters);
 
-    return graph;
   }
 
   // standalone local program model

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
index 577d06f..c6d2e6e 100644
--- a/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ b/samza-operator/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -18,12 +18,9 @@
  */
 package org.apache.samza.example;
 
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.InputMessageEnvelope;
 import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
 import org.apache.samza.operators.data.Offset;
@@ -41,9 +38,9 @@ import java.util.Properties;
 
 
 /**
- * Example {@link StreamGraphFactory} code to test the API methods
+ * Example {@link StreamGraphBuilder} code to test the API methods
  */
-public class NoContextStreamExample implements StreamGraphFactory {
+public class NoContextStreamExample implements StreamGraphBuilder {
 
   StreamSpec input1 = new StreamSpec() {
     @Override public SystemStream getSystemStream() {
@@ -129,8 +126,7 @@ public class NoContextStreamExample implements StreamGraphFactory {
    *   }
    *
    */
-  @Override public StreamGraph create(Config config) {
-    StreamGraph graph = StreamGraph.fromConfig(config);
+  @Override public void init(StreamGraph graph, Config config) {
     MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(
         input1, null, null);
     MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(
@@ -142,7 +138,6 @@ public class NoContextStreamExample implements StreamGraphFactory {
         join(inputSource2.map(this::getInputMessage), new MyJoinFunction()).
         sendTo(outStream);
 
-    return graph;
   }
 
   // standalone local program model

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index ad433b6..0477066 100644
--- a/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-operator/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -20,7 +20,7 @@ package org.apache.samza.example;
 
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.StreamSpec;
@@ -38,7 +38,7 @@ import java.util.Properties;
 /**
  * Simple 2-way stream-to-stream join example
  */
-public class OrderShipmentJoinExample implements StreamGraphFactory {
+public class OrderShipmentJoinExample implements StreamGraphBuilder {
 
   /**
    * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
@@ -53,8 +53,7 @@ public class OrderShipmentJoinExample implements StreamGraphFactory {
    *   }
    *
    */
-  @Override public StreamGraph create(Config config) {
-    StreamGraph graph = StreamGraph.fromConfig(config);
+  @Override public void init(StreamGraph graph, Config config) {
 
     MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
     MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
@@ -62,7 +61,6 @@ public class OrderShipmentJoinExample implements StreamGraphFactory {
 
     orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders);
 
-    return graph;
   }
 
   // standalone local program model

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 1502aa2..f7d8bda 100644
--- a/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-operator/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -18,12 +18,9 @@
  */
 package org.apache.samza.example;
 
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
@@ -42,10 +39,9 @@ import java.util.Properties;
 /**
  * Example code to implement window-based counter
  */
-public class PageViewCounterExample implements StreamGraphFactory {
+public class PageViewCounterExample implements StreamGraphBuilder {
 
-  @Override public StreamGraph create(Config config) {
-    StreamGraph graph = StreamGraph.fromConfig(config);
+  @Override public void init(StreamGraph graph, Config config) {
 
     MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
     OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
@@ -56,7 +52,7 @@ public class PageViewCounterExample implements StreamGraphFactory {
             setAccumulationMode(AccumulationMode.DISCARDING)).
         map(MyStreamOutput::new).
         sendTo(pageViewPerMemberCounters);
-    return graph;
+
   }
 
   public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
index f15e514..6994ac4 100644
--- a/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-operator/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -18,12 +18,9 @@
  */
 package org.apache.samza.example;
 
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamSpec;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
@@ -38,9 +35,9 @@ import java.util.*;
 
 
 /**
- * Example {@link StreamGraphFactory} code to test the API methods with re-partition operator
+ * Example {@link StreamGraphBuilder} code to test the API methods with re-partition operator
  */
-public class RepartitionExample implements StreamGraphFactory {
+public class RepartitionExample implements StreamGraphBuilder {
 
   /**
    * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
@@ -54,8 +51,7 @@ public class RepartitionExample implements StreamGraphFactory {
    *   }
    *
    */
-  @Override public StreamGraph create(Config config) {
-    StreamGraph graph = StreamGraph.fromConfig(config);
+  @Override public void init(StreamGraph graph, Config config) {
 
     MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
     OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
@@ -67,7 +63,6 @@ public class RepartitionExample implements StreamGraphFactory {
         map(MyStreamOutput::new).
         sendTo(pageViewPerMemberCounters);
 
-    return graph;
   }
 
   // standalone local program model

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
new file mode 100644
index 0000000..8ecd44f
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.lang.reflect.Field;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit test for {@link StreamOperatorTask}
+ */
+public class TestBasicStreamGraphs {
+
+  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
+      for (int i = 0; i < 4; i++) {
+        this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
+      }
+    } };
+
+  @Test
+  public void testUserTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream()));
+      });
+  }
+
+  @Test
+  public void testSplitTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream()));
+      });
+  }
+
+  @Test
+  public void testJoinTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream()));
+      });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
new file mode 100644
index 0000000..d22324b
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of split stream tasks
+ *
+ */
+public class TestBroadcastExample extends TestExampleBase {
+
+  TestBroadcastExample(Set<SystemStreamPartition> inputs) {
+    super(inputs);
+  }
+
+  class MessageType {
+    String field1;
+    String field2;
+    String field3;
+    String field4;
+    String parKey;
+    private long timestamp;
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
+    inputs.keySet().forEach(entry -> {
+        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+          @Override public SystemStream getSystemStream() {
+            return entry;
+          }
+
+          @Override public Properties getProperties() {
+            return null;
+          }
+        }, null, null).map(this::getInputMessage);
+
+        inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+        inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+        inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+      });
+  }
+
+  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
+    return (JsonMessageEnvelope) m1.getMessage();
+  }
+
+  boolean myFilter1(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key1");
+  }
+
+  boolean myFilter2(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key2");
+  }
+
+  boolean myFilter3(JsonMessageEnvelope m1) {
+    return m1.getMessage().parKey.equals("key3");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
new file mode 100644
index 0000000..c4df9d4
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestExampleBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for test examples
+ *
+ */
+public abstract class TestExampleBase implements StreamGraphBuilder {
+
+  protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
+
+  TestExampleBase(Set<SystemStreamPartition> inputs) {
+    this.inputs = new HashMap<>();
+    for (SystemStreamPartition input : inputs) {
+      this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>());
+      this.inputs.get(input.getSystemStream()).add(input);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
deleted file mode 100644
index 5f659ba..0000000
--- a/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import java.lang.reflect.Field;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.impl.OperatorGraph;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestFluentStreamTasks {
-
-  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
-      for (int i = 0; i < 4; i++) {
-        this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
-      }
-    } };
-
-  @Test
-  public void testUserTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    WindowGraph userTask = new WindowGraph(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-  @Test
-  public void testSplitTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    BroadcastGraph splitTask = new BroadcastGraph(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-  @Test
-  public void testJoinTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    JoinGraph joinTask = new JoinGraph(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bde68b4/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
new file mode 100644
index 0000000..fe6e7e7
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of unique key-based stream-stream join tasks
+ *
+ */
+public class TestJoinExample  extends TestExampleBase {
+
+  TestJoinExample(Set<SystemStreamPartition> inputs) {
+    super(inputs);
+  }
+
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  MessageStream<JsonMessageEnvelope> joinOutput = null;
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    for (SystemStream input : inputs.keySet()) {
+      MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
+          new StreamSpec() {
+            @Override public SystemStream getSystemStream() {
+              return input;
+            }
+
+            @Override public Properties getProperties() {
+              return null;
+            }
+          }, null, null).map(this::getInputMessage);
+      if (joinOutput == null) {
+        joinOutput = newSource;
+      } else {
+        joinOutput = joinOutput.join(newSource, new MyJoinFunction());
+      }
+    }
+
+    joinOutput.sendTo(graph.createOutStream(new StreamSpec() {
+      @Override public SystemStream getSystemStream() {
+        return null;
+      }
+
+      @Override public Properties getProperties() {
+        return null;
+      }
+    }, new StringSerde("UTF-8"), new JsonSerde<>()));
+
+  }
+
+  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+    return new JsonMessageEnvelope(
+        ((MessageType) ism.getMessage()).joinKey,
+        (MessageType) ism.getMessage(),
+        ism.getOffset(),
+        ism.getSystemStreamPartition());
+  }
+
+  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
+    JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
+      MessageType newJoinMsg = new MessageType();
+      newJoinMsg.joinKey = m1.getKey();
+      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+      return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
+    }
+
+    @Override
+    public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
+      return this.myJoinResult(message, otherMessage);
+    }
+
+    @Override
+    public String getFirstKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+
+    @Override
+    public String getSecondKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+  }
+}