You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 22:56:56 UTC

[4/5] samza git commit: SAMZA-1073: Remove operator module. Move all classes into samza-core

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
new file mode 100644
index 0000000..f0f6ef2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+
+
+/**
+ * This class implements the {@link ExecutionEnvironment} that runs applications in a standalone environment.
+ */
+public class StandaloneExecutionEnvironment implements ExecutionEnvironment {
+
+  // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
+  StreamGraph createGraph(StreamGraphBuilder app, Config config) {
+    StreamGraphImpl graph = new StreamGraphImpl();
+    app.init(graph, config);
+    return graph;
+  }
+
+  @Override public void run(StreamGraphBuilder app, Config config) {
+    // 1. get logic graph for optimization
+    // StreamGraph logicGraph = this.createGraph(app, config);
+    // 2. potential optimization....
+    // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
+    // 4. create all input/output/intermediate topics
+    // 5. create the configuration for StreamProcessor
+    // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
new file mode 100644
index 0000000..b007e3c
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.task;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Execution of the logic sub-DAG
+ *
+ *
+ * A {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
+ * through the user's stream transformations defined in {@link StreamGraphImpl} using the
+ * {@link org.apache.samza.operators.MessageStream} APIs.
+ * <p>
+ * This class brings all the operator API implementation components together and feeds the
+ * {@link InputMessageEnvelope}s into the transformation chains.
+ * <p>
+ * It accepts an instance of the user implemented factory {@link StreamGraphBuilder} as input parameter of the constructor.
+ * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiates a user-defined {@link StreamGraphImpl}
+ * from the {@link StreamGraphBuilder}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
+ * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
+ * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
+ * corresponds to either an input stream or an intermediate stream in {@link StreamGraphImpl}.
+ * <p>
+ * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input
+ * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
+ * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
+ * root node of the DAG, which this class saves.
+ * <p>
+ * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it
+ * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
+ * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
+ * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ */
+public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+
+  /**
+   * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
+   */
+  private final OperatorGraph operatorGraph = new OperatorGraph();
+
+  private final StreamGraphBuilder graphBuilder;
+
+  private ContextManager contextManager;
+
+  public StreamOperatorTask(StreamGraphBuilder graphBuilder) {
+    this.graphBuilder = graphBuilder;
+  }
+
+  @Override
+  public final void init(Config config, TaskContext context) throws Exception {
+    // create the StreamGraphImpl object and initialize the app-specific logic DAG within the task
+    StreamGraphImpl streams = new StreamGraphImpl();
+    this.graphBuilder.init(streams, config);
+    // get the context manager of the {@link StreamGraph} and initialize the task-specific context
+    this.contextManager = streams.getContextManager();
+
+    Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
+    context.getSystemStreamPartitions().forEach(ssp -> {
+        if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
+          // create mapping from the physical input {@link SystemStream} to the logical {@link MessageStream}
+          inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streams.getInputStream(ssp.getSystemStream()));
+        }
+      });
+    operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
+  }
+
+  @Override
+  public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+    this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
+        .onNext(new InputMessageEnvelope(ime), collector, coordinator);
+  }
+
+  @Override
+  public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+    // TODO: invoke timer based triggers
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.contextManager.finalizeTaskContext();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
new file mode 100644
index 0000000..85ebc6c
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Example code using {@link KeyValueStore} to implement an event-time window
+ */
+public class KeyValueStoreExample implements StreamGraphBuilder {
+
+  /**
+   * Used by the remote execution environment to launch the job in a remote program. The remote program should follow
+   * an invocation pattern similar to the standalone one:
+   *
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+   *     UserMainExample runnableApp = new UserMainExample();
+   *     runnableApp.run(remoteEnv, config);
+   *   }
+   *
+   */
+  @Override public void init(StreamGraph graph, Config config) {
+
+    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
+
+    pageViewEvents.
+        partitionBy(m -> m.getMessage().memberId).
+        flatMap(new MyStatsCounter()).
+        sendTo(pageViewPerMemberCounters);
+
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new KeyValueStoreExample(), config);
+  }
+
+  class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
+    private final int timeoutMs = 10 * 60 * 1000;
+
+    KeyValueStore<String, StatsWindowState> statsStore;
+
+    class StatsWindowState {
+      int lastCount = 0;
+      long timeAtLastOutput = 0;
+      int newCount = 0;
+    }
+
+    @Override
+    public Collection<StatsOutput> apply(PageViewEvent message) {
+      List<StatsOutput> outputStats = new ArrayList<>();
+      long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
+      String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
+      StatsWindowState curState = this.statsStore.get(wndKey);
+      curState.newCount++;
+      long curTimeMs = System.currentTimeMillis();
+      if (curState.newCount > 0 && curState.timeAtLastOutput + timeoutMs < curTimeMs) {
+        curState.timeAtLastOutput = curTimeMs;
+        curState.lastCount += curState.newCount;
+        curState.newCount = 0;
+        outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
+      }
+      // persist the updated window state whether or not output was generated above
+      this.statsStore.put(wndKey, curState);
+      return outputStats;
+    }
+
+    @Override
+    public void init(Config config, TaskContext context) {
+      this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
+    }
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getKey() {
+      return this.pageId;
+    }
+
+    @Override
+    public PageViewEvent getMessage() {
+      return this;
+    }
+  }
+
+  class StatsOutput implements MessageEnvelope<String, StatsOutput> {
+    private String memberId;
+    private long timestamp;
+    private Integer count;
+
+    StatsOutput(String key, long timestamp, Integer count) {
+      this.memberId = key;
+      this.timestamp = timestamp;
+      this.count = count;
+    }
+
+    @Override
+    public String getKey() {
+      return this.memberId;
+    }
+
+    @Override
+    public StatsOutput getMessage() {
+      return this;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
new file mode 100644
index 0000000..c6d2e6e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CommandLine;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+
+/**
+ * Example {@link StreamGraphBuilder} code to test the API methods
+ */
+public class NoContextStreamExample implements StreamGraphBuilder {
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "input1");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec input2 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "input2");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "output");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
+    return new JsonMessageEnvelope(
+        ((MessageType) ism.getMessage()).joinKey,
+        (MessageType) ism.getMessage(),
+        ism.getOffset(),
+        ism.getSystemStreamPartition());
+  }
+
+  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
+
+    @Override
+    public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1,
+        JsonMessageEnvelope m2) {
+      MessageType newJoinMsg = new MessageType();
+      newJoinMsg.joinKey = m1.getKey();
+      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
+      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+      return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
+    }
+
+    @Override
+    public String getFirstKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+
+    @Override
+    public String getSecondKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+  }
+
+  /**
+   * Used by the remote execution environment to launch the job in a remote program. The remote program should follow
+   * an invocation pattern similar to the standalone one:
+   *
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
+   *     remoteEnv.run(new NoContextStreamExample(), config);
+   *   }
+   *
+   */
+  @Override public void init(StreamGraph graph, Config config) {
+    MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(
+        input1, null, null);
+    MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(
+        input2, null, null);
+    OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output,
+        new StringSerde("UTF-8"), new JsonSerde<>());
+
+    inputSource1.map(this::getInputMessage).
+        join(inputSource2.map(this::getInputMessage), new MyJoinFunction()).
+        sendTo(outStream);
+
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new NoContextStreamExample(), config);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
new file mode 100644
index 0000000..0477066
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.util.Properties;
+
+
+/**
+ * Simple 2-way stream-to-stream join example
+ */
+public class OrderShipmentJoinExample implements StreamGraphBuilder {
+
+  /**
+   * Used by the remote execution environment to launch the job in a remote program. The remote program should follow
+   * an invocation pattern similar to the standalone one:
+   *
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+   *     UserMainExample runnableApp = new UserMainExample();
+   *     runnableApp.run(remoteEnv, config);
+   *   }
+   *
+   */
+  @Override public void init(StreamGraph graph, Config config) {
+
+    MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    orders.join(shipments, new MyJoinFunction()).sendTo(fulfilledOrders);
+
+  }
+
+  // standalone local program model
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new OrderShipmentJoinExample(), config);
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "Orders");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec input2 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "Shipment");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "FulfilledOrders");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  class OrderRecord implements MessageEnvelope<String, OrderRecord> {
+    String orderId;
+    long orderTimeMs;
+
+    OrderRecord(String orderId, long timeMs) {
+      this.orderId = orderId;
+      this.orderTimeMs = timeMs;
+    }
+
+    @Override
+    public String getKey() {
+      return this.orderId;
+    }
+
+    @Override
+    public OrderRecord getMessage() {
+      return this;
+    }
+  }
+
+  class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
+    String orderId;
+    long shipTimeMs;
+
+    ShipmentRecord(String orderId, long timeMs) {
+      this.orderId = orderId;
+      this.shipTimeMs = timeMs;
+    }
+
+    @Override
+    public String getKey() {
+      return this.orderId;
+    }
+
+    @Override
+    public ShipmentRecord getMessage() {
+      return this;
+    }
+  }
+
+  class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
+    String orderId;
+    long orderTimeMs;
+    long shipTimeMs;
+
+    FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+      this.orderId = orderId;
+      this.orderTimeMs = orderTimeMs;
+      this.shipTimeMs = shipTimeMs;
+    }
+
+
+    @Override
+    public String getKey() {
+      return this.orderId;
+    }
+
+    @Override
+    public FulFilledOrderRecord getMessage() {
+      return this;
+    }
+  }
+
+  FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
+    return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs);
+  }
+
+  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+
+    @Override
+    public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+      return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage);
+    }
+
+    @Override
+    public String getFirstKey(OrderRecord message) {
+      return message.getKey();
+    }
+
+    @Override
+    public String getSecondKey(ShipmentRecord message) {
+      return message.getKey();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
new file mode 100644
index 0000000..f7d8bda
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.Properties;
+
+
+/**
+ * Example code to implement window-based counter
+ */
+public class PageViewCounterExample implements StreamGraphBuilder {
+
+  @Override public void init(StreamGraph graph, Config config) {
+
+    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    pageViewEvents.
+        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
+            setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
+            setAccumulationMode(AccumulationMode.DISCARDING)).
+        map(MyStreamOutput::new).
+        sendTo(pageViewPerMemberCounters);
+
+  }
+
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new PageViewCounterExample(), config);
+  }
+
+  StreamSpec input1 = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  StreamSpec output = new StreamSpec() {
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override public Properties getProperties() {
+      return null;
+    }
+  };
+
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getKey() {
+      return this.pageId;
+    }
+
+    @Override
+    public PageViewEvent getMessage() {
+      return this;
+    }
+  }
+
+  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      this.timestamp = Long.valueOf(m.getKey().getPaneId());
+      this.count = m.getMessage();
+    }
+
+    @Override
+    public String getKey() {
+      return this.memberId;
+    }
+
+    @Override
+    public MyStreamOutput getMessage() {
+      return this;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
new file mode 100644
index 0000000..6994ac4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.*;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.*;
+
+
+/**
+ * Example {@link StreamGraphBuilder} that exercises the API methods together with the re-partition operator
+ */
+public class RepartitionExample implements StreamGraphBuilder {
+
+  /**
+   * Builds the graph: re-partitions page views by memberId, folds them per memberId over 5-minute tumbling windows, and
+   * sends the results to the output stream. A remote execution environment would launch this builder from a main such as:
+   *
+   *   public static void main(String args[]) throws Exception {
+   *     CommandLine cmdLine = new CommandLine();
+   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
+   *     remoteEnv.run(new RepartitionExample(), config);
+   *   }
+   *
+   */
+  @Override public void init(StreamGraph graph, Config config) {
+
+    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
+    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+
+    pageViewEvents.
+        partitionBy(m -> m.getMessage().memberId). // re-key by memberId before windowing
+        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
+            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)). // fold adds 1 per message
+        map(MyStreamOutput::new).
+        sendTo(pageViewPerMemberCounters);
+
+  }
+
+  // standalone local program model: load config from the command line and run this builder in the local environment
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
+    standaloneEnv.run(new RepartitionExample(), config);
+  }
+
+  StreamSpec input1 = new StreamSpec() { // input spec: "PageViewEvent" stream on the "kafka" system
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewEvent");
+    }
+
+    @Override public Properties getProperties() {
+      return null; // no stream properties are provided by this example
+    }
+  };
+
+  StreamSpec output = new StreamSpec() { // output spec: "PageViewPerMember5min" stream on the "kafka" system
+    @Override public SystemStream getSystemStream() {
+      return new SystemStream("kafka", "PageViewPerMember5min");
+    }
+
+    @Override public Properties getProperties() {
+      return null; // no stream properties are provided by this example
+    }
+  };
+
+  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> { // envelope that is also its own message
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String getKey() {
+      return this.pageId; // keyed by page id on input; init() re-partitions by memberId
+    }
+
+    @Override
+    public PageViewEvent getMessage() {
+      return this; // the event object doubles as the message payload
+    }
+  }
+
+  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> { // output record that is also its own message
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> m) { // copies key, pane id and message out of a window pane
+      this.memberId = m.getKey().getKey(); // key carried by the pane's key object
+      this.timestamp = Long.valueOf(m.getKey().getPaneId()); // pane id converted to a long and stored as the timestamp
+      this.count = m.getMessage(); // the pane's Integer message, unboxed
+    }
+
+    @Override
+    public String getKey() {
+      return this.memberId; // output is keyed by member id
+    }
+
+    @Override
+    public MyStreamOutput getMessage() {
+      return this; // the record doubles as the message payload
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
new file mode 100644
index 0000000..8ecd44f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.lang.reflect.Field;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit tests that initialize a {@link StreamOperatorTask} with each of the example graph builders in this package
+ */
+public class TestBasicStreamGraphs {
+
+  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { { // my-topic0..3, partition i each
+      for (int i = 0; i < 4; i++) {
+        this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
+      }
+    } };
+
+  @Test
+  public void testUserTask() throws Exception { // window example: every input stream must appear in the operator graph
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); // read the field reflectively
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); // reference is taken before init() runs
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream())); // lookup is by SystemStream, not by partition
+      });
+  }
+
+  @Test
+  public void testSplitTask() throws Exception { // broadcast example: same check as testUserTask
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); // read the field reflectively
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); // reference is taken before init() runs
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream())); // lookup is by SystemStream, not by partition
+      });
+  }
+
+  @Test
+  public void testJoinTask() throws Exception { // join example: same check as testUserTask
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph"); // read the field reflectively
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask); // reference is taken before init() runs
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream())); // lookup is by SystemStream, not by partition
+      });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
new file mode 100644
index 0000000..d22324b
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example graph builder that feeds each input stream into three separately filtered and windowed branches
+ *
+ */
+public class TestBroadcastExample extends TestExampleBase {
+
+  TestBroadcastExample(Set<SystemStreamPartition> inputs) {
+    super(inputs);
+  }
+
+  class MessageType {
+    String field1;
+    String field2;
+    String field3;
+    String field4;
+    String parKey; // compared against "key1"/"key2"/"key3" by the filters below
+    private long timestamp;
+
+    public long getTimestamp() {
+      return this.timestamp;
+    }
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; // adds 1 per message
+    inputs.keySet().forEach(entry -> {
+        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+          @Override public SystemStream getSystemStream() {
+            return entry;
+          }
+
+          @Override public Properties getProperties() {
+            return null; // no stream properties are provided by this example
+          }
+        }, null, null).map(this::getInputMessage); // both serde arguments are null here
+
+        // branch 1: parKey == "key1", 100 ms tumbling window with a late trigger (30000 messages or 10 ms since first)
+        inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+        // branch 2: parKey == "key2", same window and trigger settings
+        inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+        // branch 3: parKey == "key3", same window and trigger settings
+        inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
+
+      });
+  }
+
+  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
+    return (JsonMessageEnvelope) m1.getMessage(); // NOTE(review): assumes the payload already is a JsonMessageEnvelope - confirm
+  }
+
+  boolean myFilter1(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key1");
+  }
+
+  boolean myFilter2(JsonMessageEnvelope m1) {
+    // Do user defined processing here
+    return m1.getMessage().parKey.equals("key2");
+  }
+
+  boolean myFilter3(JsonMessageEnvelope m1) {
+    return m1.getMessage().parKey.equals("key3"); // same shape as myFilter1/myFilter2, for "key3"
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
new file mode 100644
index 0000000..c4df9d4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for test examples: groups the supplied partitions by their {@link SystemStream} into {@code inputs}
+ *
+ */
+public abstract class TestExampleBase implements StreamGraphBuilder {
+
+  protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
+
+  TestExampleBase(Set<SystemStreamPartition> inputs) {
+    this.inputs = new HashMap<>();
+    for (SystemStreamPartition input : inputs) {
+      // computeIfAbsent allocates the per-stream set only the first time a stream is seen, in a single lookup
+      this.inputs.computeIfAbsent(input.getSystemStream(), stream -> new HashSet<>()).add(input);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
new file mode 100644
index 0000000..fe6e7e7
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example graph builder that chains all input streams together with key-based stream-stream joins
+ *
+ */
+public class TestJoinExample  extends TestExampleBase {
+
+  TestJoinExample(Set<SystemStreamPartition> inputs) {
+    super(inputs);
+  }
+
+  class MessageType {
+    String joinKey;
+    List<String> joinFields = new ArrayList<>();
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  MessageStream<JsonMessageEnvelope> joinOutput = null; // running join result; assigned inside init()
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    for (SystemStream input : inputs.keySet()) {
+      MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
+          new StreamSpec() {
+            @Override public SystemStream getSystemStream() {
+              return input;
+            }
+
+            @Override public Properties getProperties() {
+              return null; // no stream properties are provided by this example
+            }
+          }, null, null).map(this::getInputMessage); // both serde arguments are null; envelopes are re-wrapped
+      if (joinOutput == null) { // the first stream seeds the chain
+        joinOutput = newSource;
+      } else { // every later stream is joined onto the running result
+        joinOutput = joinOutput.join(newSource, new MyJoinFunction());
+      }
+    }
+
+    joinOutput.sendTo(graph.createOutStream(new StreamSpec() { // NOTE(review): joinOutput is still null if inputs is empty - confirm
+      @Override public SystemStream getSystemStream() {
+        return null; // NOTE(review): the output spec names no SystemStream - presumably fine for this test; confirm
+      }
+
+      @Override public Properties getProperties() {
+        return null; // no stream properties are provided by this example
+      }
+    }, new StringSerde("UTF-8"), new JsonSerde<>()));
+
+  }
+
+  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) { // re-keys the envelope by the payload's joinKey
+    return new JsonMessageEnvelope(
+        ((MessageType) ism.getMessage()).joinKey,
+        (MessageType) ism.getMessage(),
+        ism.getOffset(),
+        ism.getSystemStreamPartition());
+  }
+
+  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
+    JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
+      MessageType newJoinMsg = new MessageType();
+      newJoinMsg.joinKey = m1.getKey();
+      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields); // fields of m1 first, then those of m2
+      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
+      return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null); // joined envelope has no offset or partition
+    }
+
+    @Override
+    public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
+      return this.myJoinResult(message, otherMessage);
+    }
+
+    @Override
+    public String getFirstKey(JsonMessageEnvelope message) {
+      return message.getKey(); // both sides of the join use the envelope key
+    }
+
+    @Override
+    public String getSecondKey(JsonMessageEnvelope message) {
+      return message.getKey();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
new file mode 100644
index 0000000..e08ca20
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example graph builder that applies a 200 ms tumbling window to every input stream
+ *
+ */
+public class TestWindowExample extends TestExampleBase {
+  class MessageType {
+    String field1;
+    String field2;
+  }
+
+  TestWindowExample(Set<SystemStreamPartition> inputs) {
+    super(inputs);
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1; // NOTE: despite the name, adds 1 per message
+    inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+      @Override public SystemStream getSystemStream() {
+        return source;
+      }
+
+      @Override public Properties getProperties() {
+        return null; // no stream properties are provided by this example
+      }
+    }, null, null). // both serde arguments are null here
+        map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
+            m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
+
+  }
+
+  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
+    return m.getKey().toString(); // string form of the envelope key
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
new file mode 100644
index 0000000..160a47a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestMessageStreamImpl {
+
+  private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+
+  @Test
+  public void testMap() { // map() must register exactly one StreamOperatorSpec that applies the user function
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m)  ->
+        new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
+    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size()); // JUnit argument order is (expected, actual)
+    OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
+    assertTrue(mapOp instanceof StreamOperatorSpec);
+    assertEquals(outputStream, mapOp.getNextStream());
+    // assert that the transformation function is what we defined above
+    TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
+    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(xTestMsg.getKey()).thenReturn("test-msg-key");
+    when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
+    when(mockInnerTestMessage.getValue()).thenReturn("123456789");
+
+    Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
+    assertEquals(1, cOutputMsg.size());
+    TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
+    assertEquals(xTestMsg.getKey(), outputMessage.getKey());
+    assertEquals(Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1), outputMessage.getMessage());
+  }
+
+  @Test
+  public void testFlatMap() { // flatMap() must register exactly one StreamOperatorSpec holding the same function object
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+    Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { {
+        this.add(mock(TestOutputMessageEnvelope.class));
+        this.add(mock(TestOutputMessageEnvelope.class));
+        this.add(mock(TestOutputMessageEnvelope.class));
+      } };
+    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts;
+    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size()); // JUnit argument order is (expected, actual)
+    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
+    assertTrue(flatMapOp instanceof StreamOperatorSpec);
+    assertEquals(outputStream, flatMapOp.getNextStream());
+    // assert that the transformation function is what we defined above
+    assertEquals(xFlatMap, ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn());
+  }
+
+  @Test
+  public void testFilter() { // filter() must register one StreamOperatorSpec that drops or passes messages per the predicate
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+    FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
+    MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size()); // JUnit argument order is (expected, actual)
+    OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
+    assertTrue(filterOp instanceof StreamOperatorSpec);
+    assertEquals(outputStream, filterOp.getNextStream());
+    // assert that the transformation function is what we defined above
+    FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
+    when(mockInnerTestMessage.getEventTime()).thenReturn(11111L); // below the 123456L threshold: must be filtered out
+    Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
+    assertTrue(output.isEmpty());
+    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
+    when(mockInnerTestMessage.getEventTime()).thenReturn(999999L); // above the threshold: must pass through
+    output = txfmFn.apply(mockMsg);
+    assertEquals(1, output.size());
+    assertEquals(mockMsg, output.iterator().next());
+  }
+
+  @Test
+  public void testSink() { // sink() must register one SinkOperatorSpec holding the same function and no next stream
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+    SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
+      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
+      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    };
+    inputStream.sink(xSink);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size()); // JUnit argument order is (expected, actual)
+    OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
+    assertTrue(sinkOp instanceof SinkOperatorSpec);
+    assertEquals(xSink, ((SinkOperatorSpec) sinkOp).getSinkFn());
+    assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
+  }
+
+  /**
+   * Verifies that {@code join()} registers one {@link PartialJoinOperatorSpec} on each of the two joined
+   * streams, that both specs feed the same join output stream, and that the transform function of each
+   * spec yields the joiner's result for a pair of messages.
+   */
+  @Test
+  public void testJoin() {
+    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
+    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
+    // joiner keyed on the envelope key; its output carries the first message's key and the summed value lengths
+    JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
+      new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
+        @Override
+        public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
+          return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+        }
+
+        @Override
+        public String getFirstKey(TestMessageEnvelope message) {
+          return message.getKey();
+        }
+
+        @Override
+        public String getSecondKey(TestMessageEnvelope message) {
+          return message.getKey();
+        }
+      };
+
+    MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
+
+    // each source must carry exactly one partial-join spec that feeds the shared join output;
+    // assertEquals arguments follow JUnit's (expected, actual) order so failure messages are accurate
+    Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
+    assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
+    assertEquals(joinOutput, ((PartialJoinOperatorSpec) joinOp1).getNextStream());
+    subs = source2.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
+    assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
+    assertEquals(joinOutput, ((PartialJoinOperatorSpec) joinOp2).getNextStream());
+
+    TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
+    TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
+    // first stream's partial join: expected output has joinMsg1's key and value 24
+    TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
+    assertEquals("test-join-1", xOut.getKey());
+    assertEquals(Integer.valueOf(24), xOut.getMessage());
+    // second stream's partial join is called with the arguments swapped, yet the same output is required,
+    // i.e. the joiner must still receive the messages in (source1, source2) order
+    xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getTransformFn().apply(joinMsg2, joinMsg1);
+    assertEquals("test-join-1", xOut.getKey());
+    assertEquals(Integer.valueOf(24), xOut.getMessage());
+  }
+
+  /**
+   * Verifies that {@code merge()} registers a merge operator on the receiving stream and on every
+   * stream merged into it, with each operator feeding the same merged output stream.
+   */
+  @Test
+  public void testMerge() {
+    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
+    // populate a plain list rather than using double-brace initialization, which would create an
+    // anonymous ArrayList subclass holding a reference to this test instance
+    Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<>();
+    others.add(new MessageStreamImpl<>(mockGraph));
+    others.add(new MessageStreamImpl<>(mockGraph));
+
+    MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
+    // the stream merge() was called on and every merged-in stream must all point at mergeOutput
+    validateMergeOperator(merge1, mergeOutput);
+    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  }
+
+  /**
+   * Asserts that {@code mergeSource} has exactly one registered {@link StreamOperatorSpec} whose next
+   * stream is {@code mergeOutput} and whose transform function emits the input message unchanged.
+   *
+   * @param mergeSource a stream that took part in the merge; it is cast to {@link MessageStreamImpl}
+   * @param mergeOutput the stream returned by {@code merge()}
+   */
+  private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
+    MessageStreamImpl<TestMessageEnvelope> sourceImpl = (MessageStreamImpl<TestMessageEnvelope>) mergeSource;
+    Collection<OperatorSpec> subs = sourceImpl.getRegisteredOperatorSpecs();
+    // assertEquals arguments follow JUnit's (expected, actual) order so failure messages are accurate
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
+    assertTrue(mergeOp instanceof StreamOperatorSpec);
+    // cast once and reuse, instead of mixing a raw cast with a separate parameterized cast
+    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> streamOp =
+        (StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp;
+    assertEquals(mergeOutput, streamOp.getNextStream());
+    // the merge transform must pass the message through as the single output element
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    Collection<TestMessageEnvelope> outputs = streamOp.getTransformFn().apply(mockMsg);
+    assertEquals(1, outputs.size());
+    assertEquals(mockMsg, outputs.iterator().next());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
new file mode 100644
index 0000000..c4e9f51
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+
+/**
+ * Test helper that creates {@link MessageStreamImpl} instances for a given {@link StreamGraphImpl}.
+ */
+public class TestMessageStreamImplUtil {
+  /**
+   * Creates a new {@link MessageStreamImpl} from the supplied graph.
+   *
+   * @param graph the graph handed to the {@link MessageStreamImpl} constructor
+   * @param <M> the message type of the created stream
+   * @return the newly constructed stream
+   */
+  public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
+    MessageStreamImpl<M> stream = new MessageStreamImpl<>(graph);
+    return stream;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
new file mode 100644
index 0000000..9a425d1
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Example input {@link MessageEnvelope} that carries a Json message and uses a string as the key.
+ * Besides key and message it also records the {@link Offset} and the {@link SystemStreamPartition}
+ * supplied at construction time.
+ *
+ * @param <T> the type of the message held by this envelope
+ */
+public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
+
+  private final String messageKey;
+  private final T message;
+  private final Offset messageOffset;
+  private final SystemStreamPartition systemStreamPartition;
+
+  /**
+   * Creates an envelope holding the given values as-is.
+   *
+   * @param key the key returned by {@link #getKey()}
+   * @param data the message returned by {@link #getMessage()}
+   * @param offset the offset returned by {@link #getOffset()}
+   * @param partition the partition returned by {@link #getSystemStreamPartition()}
+   */
+  public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
+    messageKey = key;
+    message = data;
+    messageOffset = offset;
+    systemStreamPartition = partition;
+  }
+
+  /** @return the message supplied at construction time */
+  @Override
+  public T getMessage() {
+    return message;
+  }
+
+  /** @return the key supplied at construction time */
+  @Override
+  public String getKey() {
+    return messageKey;
+  }
+
+  /** @return the offset supplied at construction time */
+  public Offset getOffset() {
+    return messageOffset;
+  }
+
+  /** @return the system stream partition supplied at construction time */
+  public SystemStreamPartition getSystemStreamPartition() {
+    return systemStreamPartition;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/2c7309cf/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
new file mode 100644
index 0000000..361972e
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.hamcrest.core.IsEqual;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+
+/**
+ * Unit test for {@link OperatorImpl}: checks that results are propagated to a registered next operator
+ * and that {@code onNext()} of the operator itself receives the arguments it was called with.
+ */
+public class TestOperatorImpl {
+
+  // arguments recorded by the most recent onNext() call on the operator under test
+  TestMessageEnvelope curInputMsg;
+  MessageCollector curCollector;
+  TaskCoordinator curCoordinator;
+
+  @Test
+  public void testSubscribers() {
+    // start from a clean slate so the final assertions only see what this test recorded
+    this.curCoordinator = null;
+    this.curCollector = null;
+    this.curInputMsg = null;
+
+    MessageCollector collectorMock = mock(MessageCollector.class);
+    TaskCoordinator coordinatorMock = mock(TaskCoordinator.class);
+    TestOutputMessageEnvelope resultMsg = mock(TestOutputMessageEnvelope.class);
+    OperatorImpl nextOperator = mock(OperatorImpl.class);
+
+    // operator under test: onNext() just records its arguments in the fields of the enclosing test
+    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> operatorUnderTest =
+        new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() {
+          @Override
+          public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
+            TestOperatorImpl.this.curInputMsg = message;
+            TestOperatorImpl.this.curCollector = collector;
+            TestOperatorImpl.this.curCoordinator = coordinator;
+          }
+        };
+
+    // registerNextOperator() must add the next operator, and propagateResult() must call its onNext()
+    // exactly once with the propagated result, collector and coordinator
+    operatorUnderTest.registerNextOperator(nextOperator);
+    operatorUnderTest.propagateResult(resultMsg, collectorMock, coordinatorMock);
+    verify(nextOperator, times(1)).onNext(
+        argThat(new IsEqual<>(resultMsg)),
+        argThat(new IsEqual<>(collectorMock)),
+        argThat(new IsEqual<>(coordinatorMock))
+    );
+
+    // calling onNext() directly must hand the very same arguments to the implementation
+    TestMessageEnvelope inputMsg = mock(TestMessageEnvelope.class);
+    operatorUnderTest.onNext(inputMsg, collectorMock, coordinatorMock);
+    assertEquals(inputMsg, this.curInputMsg);
+    assertEquals(collectorMock, this.curCollector);
+    assertEquals(coordinatorMock, this.curCoordinator);
+  }
+}