You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/05/25 16:36:45 UTC

[02/10] samza git commit: SAMZA-1659: Serializable OperatorSpec

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
new file mode 100644
index 0000000..9ca4f35
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/BroadcastExample.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example application that routes one keyed input stream to three output streams, selected by message key.
+ */
+public class BroadcastExample implements StreamApplication {
+
+  // local execution mode: load the config from the command line, run the app and call waitForFinish()
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+
+    StreamApplication app = new BroadcastExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    KVSerde<String, PageViewEvent> pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
+    MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("pageViewEventStream", pgeMsgSerde);
+    // one filter per output stream; constant-first equals() also tolerates messages with a null key
+    inputStream.filter(m -> "key1".equals(m.key)).sendTo(graph.getOutputStream("outStream1", pgeMsgSerde));
+    inputStream.filter(m -> "key2".equals(m.key)).sendTo(graph.getOutputStream("outStream2", pgeMsgSerde));
+    inputStream.filter(m -> "key3".equals(m.key)).sendTo(graph.getOutputStream("outStream3", pgeMsgSerde));
+  }
+
+  static class PageViewEvent { // static: no hidden reference to the enclosing application instance
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
new file mode 100644
index 0000000..9edaabe
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.CommandLine;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Example code using a {@link KeyValueStore} to count page views per member in 5-minute event-time windows.
+ */
+public class KeyValueStoreExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    KeyValueStoreExample app = new KeyValueStoreExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, StatsOutput>> pageViewEventPerMember =
+        graph.getOutputStream("pageViewEventPerMember",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
+    // re-key by memberId, run every event through the store-backed counter, and emit the stats keyed by memberId
+    pageViewEvents
+        .partitionBy(pve -> pve.memberId, pve -> pve,
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
+        .map(KV::getValue)
+        .flatMap(new MyStatsCounter())
+        .map(stats -> KV.of(stats.memberId, stats))
+        .sendTo(pageViewEventPerMember);
+  }
+
+  static class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
+    private static final long TIMEOUT_MS = TimeUnit.MINUTES.toMillis(10); // min. wall-clock gap between outputs
+    // assigned in init(Config, TaskContext) from the store named "my-stats-wnd-store"
+    KeyValueStore<String, StatsWindowState> statsStore;
+
+    static class StatsWindowState { // static: the stored state must not reference the enclosing function
+      int lastCount = 0;
+      long timeAtLastOutput = 0;
+      int newCount = 0;
+    }
+
+    @Override
+    public Collection<StatsOutput> apply(PageViewEvent message) {
+      List<StatsOutput> outputStats = new ArrayList<>();
+      long wndTimestamp = Math.floorDiv(TimeUnit.MILLISECONDS.toMinutes(message.timestamp), 5L) * 5L; // 5-min bucket
+      String wndKey = String.format("%s-%d", message.memberId, wndTimestamp);
+      StatsWindowState curState = this.statsStore.get(wndKey);
+      if (curState == null) {
+        curState = new StatsWindowState();
+      }
+      curState.newCount++;
+      long curTimeMs = System.currentTimeMillis();
+      if (curState.newCount > 0 && curState.timeAtLastOutput + TIMEOUT_MS < curTimeMs) {
+        curState.timeAtLastOutput = curTimeMs;
+        curState.lastCount += curState.newCount;
+        curState.newCount = 0;
+        outputStats.add(new StatsOutput(message.memberId, wndTimestamp, curState.lastCount));
+      }
+      // always persist the updated state, whether or not an output was generated
+      this.statsStore.put(wndKey, curState);
+      return outputStats;
+    }
+
+    @Override
+    public void init(Config config, TaskContext context) {
+      this.statsStore = (KeyValueStore<String, StatsWindowState>) context.getStore("my-stats-wnd-store");
+    }
+  }
+
+  static class PageViewEvent { // static: no hidden reference to the enclosing application instance
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+  }
+
+  static class StatsOutput {
+    private String memberId;
+    private long timestamp;
+    private Integer count;
+
+    StatsOutput(String key, long timestamp, Integer count) {
+      this.memberId = key;
+      this.timestamp = timestamp;
+      this.count = count;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/MergeExample.java b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
new file mode 100644
index 0000000..ff983a4
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/MergeExample.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+/** Example application that merges three page-view input streams into a single output stream. */
+public class MergeExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    MergeExample app = new MergeExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    // the three inputs and the merged output all share one String-key / JSON-value serde
+    KVSerde<String, PageViewEvent>
+        pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageViewEvent.class));
+
+    MessageStream.mergeAll(ImmutableList.of(graph.getInputStream("viewStream1", pgeMsgSerde),
+        graph.getInputStream("viewStream2", pgeMsgSerde), graph.getInputStream("viewStream3", pgeMsgSerde)))
+        .sendTo(graph.getOutputStream("mergedStream", pgeMsgSerde));
+  }
+
+  static class PageViewEvent { // static: no hidden reference to the enclosing application instance
+    String pageId;
+    long viewTimestamp;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
new file mode 100644
index 0000000..1c0bc25
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Simple 2-way stream-to-stream join example: pairs each order with its shipment by order id.
+ */
+public class OrderShipmentJoinExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    OrderShipmentJoinExample app = new OrderShipmentJoinExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<OrderRecord> orders =
+        graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class));
+    MessageStream<ShipmentRecord> shipments =
+        graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
+    OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders =
+        graph.getOutputStream("fulfilledOrders",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
+
+    // join on orderId (Duration.ofMinutes(1) is the join's time bound), then key each result by its orderId
+    orders
+        .join(shipments, new MyJoinFunction(),
+            new StringSerde(), new JsonSerdeV2<>(OrderRecord.class), new JsonSerdeV2<>(ShipmentRecord.class),
+            Duration.ofMinutes(1), "join")
+        .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
+        .sendTo(fulfilledOrders);
+  }
+
+  /** Keys both sides by orderId and combines a matching order and shipment into a FulfilledOrderRecord. */
+  static class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
+    @Override
+    public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+      return new FulfilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
+    }
+
+    @Override
+    public String getFirstKey(OrderRecord message) {
+      return message.orderId;
+    }
+
+    @Override
+    public String getSecondKey(ShipmentRecord message) {
+      return message.orderId;
+    }
+  }
+
+  static class OrderRecord { // static, like FulfilledOrderRecord: no hidden reference to the application
+    String orderId;
+    long orderTimeMs;
+
+    OrderRecord(String orderId, long timeMs) {
+      this.orderId = orderId;
+      this.orderTimeMs = timeMs;
+    }
+  }
+
+  static class ShipmentRecord { // static, like FulfilledOrderRecord: no hidden reference to the application
+    String orderId;
+    long shipTimeMs;
+
+    ShipmentRecord(String orderId, long timeMs) {
+      this.orderId = orderId;
+      this.shipTimeMs = timeMs;
+    }
+  }
+
+  static class FulfilledOrderRecord {
+    String orderId;
+    long orderTimeMs;
+    long shipTimeMs;
+
+    FulfilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+      this.orderId = orderId;
+      this.orderTimeMs = orderTimeMs;
+      this.shipTimeMs = shipTimeMs;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
new file mode 100644
index 0000000..2581506
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.time.Duration;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example code to implement a window-based counter of page views per member.
+ */
+public class PageViewCounterExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    PageViewCounterExample app = new PageViewCounterExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
+        graph.getOutputStream("pageViewEventPerMemberStream",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
+
+    // the count starts at 0 for every window and grows by 1 per message
+    SupplierFunction<Integer> initialValue = () -> 0;
+    FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
+    // 10-second tumbling windows keyed by memberId, with a repeating count(5) early trigger in DISCARDING mode
+    pageViewEvents
+        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn, null, null)
+            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+            .setAccumulationMode(AccumulationMode.DISCARDING), "tumblingWindow")
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
+        .sendTo(pageViewEventPerMemberStream);
+  }
+
+  static class PageViewEvent { // static: no hidden reference to the enclosing application instance
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+  }
+
+  static class PageViewCount {
+    String memberId;
+    long timestamp;
+    int count;
+
+    PageViewCount(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      this.timestamp = Long.parseLong(m.getKey().getPaneId()); // parseLong: no intermediate boxed Long
+      this.count = m.getMessage();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
new file mode 100644
index 0000000..7f28346
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/RepartitionExample.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Example {@link StreamApplication} code to test the API methods with re-partition operator
+ */
+public class RepartitionExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    RepartitionExample app = new RepartitionExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+    OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember =
+        graph.getOutputStream("pageViewEventPerMember",
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
+
+    // re-key the stream by memberId via partitionBy, then count the messages of each key in 5-minute tumbling windows
+    pageViewEvents
+        .partitionBy(pve -> pve.memberId, pve -> pve,
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)), "partitionBy")
+        .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1, null, null),
+            "window")
+        .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
+        .sendTo(pageViewEventPerMember);
+  }
+
+  static class PageViewEvent { // static: no hidden reference to the enclosing application instance
+    String pageId;
+    String memberId;
+    long timestamp;
+
+    PageViewEvent(String pageId, String memberId, long timestamp) {
+      this.pageId = pageId;
+      this.memberId = memberId;
+      this.timestamp = timestamp;
+    }
+  }
+
+  /** Count for one key of one window pane; the pane id is parsed as the timestamp. */
+  static class MyStreamOutput {
+    String memberId;
+    long timestamp;
+    int count;
+
+    MyStreamOutput(WindowPane<String, Integer> m) {
+      this.memberId = m.getKey().getKey();
+      this.timestamp = Long.parseLong(m.getKey().getPaneId()); // parseLong: no intermediate boxed Long
+      this.count = m.getMessage();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/WindowExample.java b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
new file mode 100644
index 0000000..4950695
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/example/WindowExample.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.SupplierFunction;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+
+
+/**
+ * Example implementation of a simple user-defined task w/ a window operator: counts the messages of
+ * "inputStream" in 10-minute tumbling windows and sends the counts to "outputStream".
+ */
+public class WindowExample implements StreamApplication {
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    WindowExample app = new WindowExample();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    // message counter: starts at 0 and adds 1 per message (a null accumulator is treated as "no messages yet")
+    SupplierFunction<Integer> initialValue = () -> 0;
+    FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
+    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<PageViewEvent>());
+    OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde());
+
+    // create a tumbling window that outputs the number of message collected every 10 minutes.
+    // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
+    // for 1 minute (hence an early trigger, not a late one).
+    inputStream
+        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter, new IntegerSerde())
+            .setEarlyTrigger(Triggers.any(Triggers.count(30000),
+                Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))), "window")
+        .map(WindowPane::getMessage)
+        .sendTo(outputStream);
+
+  }
+
+  static class PageViewEvent { // static: no hidden reference to the enclosing application instance
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
index de0d962..a1ac299 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
@@ -20,6 +20,8 @@
 package org.apache.samza.test.framework;
 
 import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.ObjectInputStream;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.functions.SinkFunction;
@@ -155,6 +157,18 @@ public class StreamAssert<M> {
       }
     }
 
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject(); // Java deserialization hook: first restore the default-serialized fields
+      timer = new Timer(); // fresh instance per deserialized copy (presumably a transient field; declared outside this hunk)
+      actual = Collections.synchronizedList(new ArrayList<>()); // starts out empty again
+      timerTask = new TimerTask() { // task whose run() delegates to check()
+        @Override
+        public void run() {
+          check();
+        }
+      };
+    }
+
     private void check() {
       final CountDownLatch latch = LATCHES.get(id);
       try {

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index 29c509d..3301af8 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -40,11 +40,10 @@ import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
 import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
 import org.apache.samza.test.util.ArraySystemFactory;
 import org.apache.samza.test.util.Base64Serializer;
-import org.junit.Test;
 
+import org.junit.Test;
 import static org.junit.Assert.assertEquals;
 
-
 /**
  * This test uses an array as a bounded input source, and does a partitionBy() and sink() after reading the input.
  * It verifies the pipeline will stop and the number of output messages should equal to the input.
@@ -53,6 +52,8 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
 
   private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};
 
+  private static List<PageView> received = new ArrayList<>();
+
   @Test
   public void testPipeline() throws  Exception {
     Random random = new Random();
@@ -66,6 +67,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
 
     int partitionCount = 4;
     Map<String, String> configs = new HashMap<>();
+    configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
     configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
     configs.put("streams.PageView.samza.system", "test");
     configs.put("streams.PageView.source", Base64Serializer.serialize(pageviews));
@@ -89,7 +91,6 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
     configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
 
     final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
-    List<PageView> received = new ArrayList<>();
     final StreamApplication app = (streamGraph, cfg) -> {
       streamGraph.<KV<String, PageView>>getInputStream("PageView")
         .map(Values.create())
@@ -98,6 +99,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
             received.add(m.getValue());
           });
     };
+
     runner.run(app);
     runner.waitForFinish();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index dda3d24..d4dc4ed 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -118,6 +118,7 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
   @Test
   public void testWatermark() throws Exception {
     Map<String, String> configs = new HashMap<>();
+    configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
     configs.put("systems.test.samza.factory", TestSystemFactory.class.getName());
     configs.put("streams.PageView.samza.system", "test");
     configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT));
@@ -140,7 +141,6 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
     configs.put("serializers.registry.string.class", StringSerdeFactory.class.getName());
     configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
 
-    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
     List<PageView> received = new ArrayList<>();
     final StreamApplication app = (streamGraph, cfg) -> {
       streamGraph.<KV<String, PageView>>getInputStream("PageView")
@@ -150,11 +150,14 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
               received.add(m.getValue());
             });
     };
+
+    LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
     runner.run(app);
+    // processors are only available when the app is running
     Map<String, StreamOperatorTask> tasks = getTaskOperationGraphs(runner);
 
     runner.waitForFinish();
-
+    // wait for the completion to ensure that all tasks are actually initialized and the OperatorImplGraph is initialized
     StreamOperatorTask task0 = tasks.get("Partition 0");
     OperatorImplGraph graph = TestStreamOperatorTask.getOperatorImplGraph(task0);
     OperatorImpl pb = getOperator(graph, OperatorSpec.OpCode.PARTITION_BY);

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
index 120f902..2171d07 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionJoinWindowApp.java
@@ -19,17 +19,22 @@
 
 package org.apache.samza.test.operator;
 
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.test.operator.data.AdClick;
@@ -37,6 +42,8 @@ import org.apache.samza.test.operator.data.PageView;
 import org.apache.samza.test.operator.data.UserPageAdClick;
 
 import java.time.Duration;
+import org.apache.samza.util.CommandLine;
+
 
 /**
  * A {@link StreamApplication} that demonstrates a partitionBy, stream-stream join and a windowed count.
@@ -47,6 +54,19 @@ public class RepartitionJoinWindowApp implements StreamApplication {
   public static final String INPUT_TOPIC_NAME_2_PROP = "inputTopicName2";
   public static final String OUTPUT_TOPIC_NAME_PROP = "outputTopicName";
 
+  private final List<StreamSpec> intermediateStreams = new ArrayList<>();
+
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+
+    RepartitionJoinWindowApp application = new RepartitionJoinWindowApp();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(application);
+    runner.waitForFinish();
+  }
+
   @Override
   public void init(StreamGraph graph, Config config) {
     String inputTopicName1 = config.get(INPUT_TOPIC_NAME_1_PROP);
@@ -56,25 +76,27 @@ public class RepartitionJoinWindowApp implements StreamApplication {
     MessageStream<PageView> pageViews = graph.getInputStream(inputTopicName1, new JsonSerdeV2<>(PageView.class));
     MessageStream<AdClick> adClicks = graph.getInputStream(inputTopicName2, new JsonSerdeV2<>(AdClick.class));
 
-    MessageStream<PageView> pageViewsRepartitionedByViewId = pageViews
+    MessageStream<KV<String, PageView>> pageViewsRepartitionedByViewId = pageViews
         .partitionBy(PageView::getViewId, pv -> pv,
-            new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)), "pageViewsByViewId")
-        .map(KV::getValue);
+            new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)), "pageViewsByViewId");
+
+    MessageStream<PageView> pageViewsRepartitionedByViewIdValueONly = pageViewsRepartitionedByViewId.map(KV::getValue);
 
-    MessageStream<AdClick> adClicksRepartitionedByViewId = adClicks
+    MessageStream<KV<String, AdClick>> adClicksRepartitionedByViewId = adClicks
         .partitionBy(AdClick::getViewId, ac -> ac,
-            new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class)), "adClicksByViewId")
-        .map(KV::getValue);
+            new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(AdClick.class)), "adClicksByViewId");
+    MessageStream<AdClick> adClicksRepartitionedByViewIdValueOnly = adClicksRepartitionedByViewId.map(KV::getValue);
 
-    MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewId
-        .join(adClicksRepartitionedByViewId, new UserPageViewAdClicksJoiner(),
+    MessageStream<UserPageAdClick> userPageAdClicks = pageViewsRepartitionedByViewIdValueONly
+        .join(adClicksRepartitionedByViewIdValueOnly, new UserPageViewAdClicksJoiner(),
             new StringSerde(), new JsonSerdeV2<>(PageView.class), new JsonSerdeV2<>(AdClick.class),
             Duration.ofMinutes(1), "pageViewAdClickJoin");
 
-    userPageAdClicks
+    MessageStream<KV<String, UserPageAdClick>> userPageAdClicksByUserId = userPageAdClicks
         .partitionBy(UserPageAdClick::getUserId, upac -> upac,
-            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId")
-        .map(KV::getValue)
+            KVSerde.of(new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userPageAdClicksByUserId");
+
+    userPageAdClicksByUserId.map(KV::getValue)
         .window(Windows.keyedSessionWindow(UserPageAdClick::getUserId, Duration.ofSeconds(3),
             new StringSerde(), new JsonSerdeV2<>(UserPageAdClick.class)), "userAdClickWindow")
         .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
@@ -82,6 +104,16 @@ public class RepartitionJoinWindowApp implements StreamApplication {
             taskCoordinator.commit(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
             messageCollector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", outputTopic), null, message.getKey(), message.getValue()));
           });
+
+
+    intermediateStreams.add(((IntermediateMessageStreamImpl) pageViewsRepartitionedByViewId).getStreamSpec());
+    intermediateStreams.add(((IntermediateMessageStreamImpl) adClicksRepartitionedByViewId).getStreamSpec());
+    intermediateStreams.add(((IntermediateMessageStreamImpl) userPageAdClicksByUserId).getStreamSpec());
+
+  }
+
+  public List<StreamSpec> getIntermediateStreams() {
+    return intermediateStreams;
   }
 
   private static class UserPageViewAdClicksJoiner implements JoinFunction<String, PageView, AdClick, UserPageAdClick> {

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
new file mode 100644
index 0000000..e233793
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.operator;
+
+import java.time.Duration;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link StreamApplication} that demonstrates a repartition followed by a windowed count.
+ */
+public class RepartitionWindowApp implements StreamApplication {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class);
+
+  static final String INPUT_TOPIC = "page-views";
+  static final String OUTPUT_TOPIC = "Result";
+
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    RepartitionWindowApp reparApp = new RepartitionWindowApp();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(reparApp);
+    runner.waitForFinish();
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+
+    KVSerde<String, PageView>
+        pgeMsgSerde = KVSerde.of(new StringSerde("UTF-8"), new JsonSerdeV2<>(PageView.class));
+
+    graph.getInputStream(INPUT_TOPIC, pgeMsgSerde)
+        .map(KV::getValue)
+        .partitionBy(PageView::getUserId, m -> m, pgeMsgSerde, "p1")
+        .window(Windows.keyedSessionWindow(m -> m.getKey(), Duration.ofSeconds(3), () -> 0, (m, c) -> c + 1, new StringSerde("UTF-8"), new IntegerSerde()), "w1")
+        .map(wp -> KV.of(wp.getKey().getKey().toString(), String.valueOf(wp.getMessage())))
+        .sendTo(graph.getOutputStream(OUTPUT_TOPIC));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 997127e..3224d24 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.operator;
 
+import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
@@ -26,13 +27,15 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.test.operator.data.PageView;
-
-import java.time.Duration;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link StreamApplication} that demonstrates a filter followed by a session window.
@@ -40,10 +43,23 @@ import java.time.Duration;
 public class SessionWindowApp implements StreamApplication {
   private static final String INPUT_TOPIC = "page-views";
   private static final String OUTPUT_TOPIC = "page-view-counts";
+
+  private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class);
   private static final String FILTER_KEY = "badKey";
 
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    SessionWindowApp app = new SessionWindowApp();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
   @Override
   public void init(StreamGraph graph, Config config) {
+
     MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
     OutputStream<KV<String, Integer>> outputStream =
         graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde()));
@@ -54,5 +70,6 @@ public class SessionWindowApp implements StreamApplication {
             new StringSerde(), new JsonSerdeV2<>(PageView.class)), "sessionWindow")
         .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index 77cd19a..5424888 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -122,7 +122,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
 
     // Verify that messages in the intermediate stream will be deleted in 10 seconds
     long startTimeMs = System.currentTimeMillis();
-    for (StreamSpec spec: runner.getExecutionPlan(app).getIntermediateStreams()) {
+    for (StreamSpec spec: app.getIntermediateStreams()) {
       long remainingMessageNum = -1;
 
       while (remainingMessageNum != 0 && System.currentTimeMillis() - startTimeMs < 10000) {

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
new file mode 100644
index 0000000..fbc315f
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.operator;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.test.operator.data.PageView;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.samza.test.operator.RepartitionWindowApp.*;
+
+/**
+ * Test driver for {@link RepartitionWindowApp}.
+ */
+public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness {
+
+  static final String APP_NAME = "PageViewCounterApp";
+
+  @Test
+  public void testRepartitionedSessionWindowCounter() throws Exception {
+    // create topics
+    createTopic(INPUT_TOPIC, 3);
+    createTopic(OUTPUT_TOPIC, 1);
+
+    // produce messages to different partitions.
+    ObjectMapper mapper = new ObjectMapper();
+    PageView pv = new PageView("india", "5.com", "userId1");
+    produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv));
+    pv = new PageView("china", "4.com", "userId2");
+    produceMessage(INPUT_TOPIC, 1, "userId2", mapper.writeValueAsString(pv));
+    pv = new PageView("india", "1.com", "userId1");
+    produceMessage(INPUT_TOPIC, 2, "userId1", mapper.writeValueAsString(pv));
+    pv = new PageView("india", "2.com", "userId1");
+    produceMessage(INPUT_TOPIC, 0, "userId1", mapper.writeValueAsString(pv));
+    pv = new PageView("india", "3.com", "userId1");
+    produceMessage(INPUT_TOPIC, 1, "userId1", mapper.writeValueAsString(pv));
+
+    Map<String, String> configs = new HashMap<>();
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
+    configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
+    configs.put(String.format("streams.%s.samza.msg.serde", INPUT_TOPIC), "string");
+    configs.put(String.format("streams.%s.samza.key.serde", INPUT_TOPIC), "string");
+
+    // run the application
+    runApplication(new RepartitionWindowApp(), APP_NAME, new MapConfig(configs));
+
+    // consume and validate result
+    List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(OUTPUT_TOPIC), 2);
+    Assert.assertEquals(messages.size(), 2);
+
+    for (ConsumerRecord<String, String> message : messages) {
+      String key = message.key();
+      String value = message.value();
+      // Assert that there are 4 messages for userId1 and 1 message for userId2.
+      Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
+      if ("userId1".equals(key)) {
+        Assert.assertEquals(value, "4");
+      } else {
+        Assert.assertEquals(value, "1");
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index 5d2a17c..40a3f91 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.test.operator;
 
+import java.time.Duration;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
@@ -26,13 +27,16 @@ import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.test.operator.data.PageView;
+import org.apache.samza.util.CommandLine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.time.Duration;
 
 /**
  * A {@link StreamApplication} that demonstrates a filter followed by a tumbling window.
@@ -40,10 +44,23 @@ import java.time.Duration;
 public class TumblingWindowApp implements StreamApplication {
   private static final String INPUT_TOPIC = "page-views";
   private static final String OUTPUT_TOPIC = "page-view-counts";
+
+  private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowApp.class);
   private static final String FILTER_KEY = "badKey";
 
+  public static void main(String[] args) {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    TumblingWindowApp app = new TumblingWindowApp();
+    LocalApplicationRunner runner = new LocalApplicationRunner(config);
+
+    runner.run(app);
+    runner.waitForFinish();
+  }
+
   @Override
   public void init(StreamGraph graph, Config config) {
+
     MessageStream<PageView> pageViews =
         graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
     OutputStream<KV<String, Integer>> outputStream =
@@ -55,5 +72,6 @@ public class TumblingWindowApp implements StreamApplication {
             new StringSerde(), new JsonSerdeV2<>(PageView.class)), "tumblingWindow")
         .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
         .sendTo(outputStream);
+
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
index b114b43..e950366 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/data/PageView.java
@@ -19,9 +19,10 @@
 package org.apache.samza.test.operator.data;
 
 
+import java.io.Serializable;
 import org.codehaus.jackson.annotate.JsonProperty;
 
-public class PageView {
+public class PageView implements Serializable {
   private String viewId;
   private String pageId;
   private String userId;

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java b/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java
new file mode 100644
index 0000000..9072bd2
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/SharedContextFactories.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Shared context factories used in unit tests. This is a temporary solution to enable sharing of test latches across different
+ * scopes of context (i.e. in the container or the task). This is not intended for production usage.
+ */
+public class SharedContextFactories {
+
+  public static class SharedContextFactory {
+    static Map<String, SharedContextFactory> sharedContextFactories = new HashMap<>();
+
+    Map<String, Object> sharedObjects = new HashMap<>();
+
+    public Object getSharedObject(String resourceName) {
+      return this.sharedObjects.get(resourceName);
+    }
+
+    public void addSharedObject(String resourceName, Object sharedObj) {
+      this.sharedObjects.put(resourceName, sharedObj);
+    }
+
+    public static SharedContextFactory getInstance(String taskName) {
+      if (sharedContextFactories.get(taskName) == null) {
+        sharedContextFactories.putIfAbsent(taskName, new SharedContextFactory());
+      }
+      return sharedContextFactories.get(taskName);
+    }
+
+    public static void clearAll() {
+      sharedContextFactories.clear();
+    }
+  }
+
+  public static class ProcessorSharedContextFactory extends SharedContextFactory {
+    static Map<String, ProcessorSharedContextFactory> processorSharedFactories = new HashMap<>();
+
+    private final String processorId;
+
+    SharedContextFactory getTaskSharedContextFactory(String taskName) {
+      String globalTaskName = String.format("%s-%s", this.processorId, taskName);
+      return SharedContextFactory.getInstance(globalTaskName);
+    }
+
+    public static ProcessorSharedContextFactory getInstance(String processorId) {
+      if (processorSharedFactories.get(processorId) == null) {
+        processorSharedFactories.putIfAbsent(processorId, new ProcessorSharedContextFactory(processorId));
+      }
+      return processorSharedFactories.get(processorId);
+    }
+
+    ProcessorSharedContextFactory(String processorId) {
+      this.processorId = processorId;
+    }
+
+    public static void clearAll() {
+      processorSharedFactories.clear();
+    }
+  }
+
+  public static class GlobalSharedContextFactory extends SharedContextFactory {
+    static Map<String, GlobalSharedContextFactory> globalSharedContextFactories = new HashMap<>();
+
+    private final String appName;
+
+    GlobalSharedContextFactory(String appName) {
+      this.appName = appName;
+    }
+
+    ProcessorSharedContextFactory getProcessorSharedContextFactory(String processorName) {
+      String globalProcessorName = String.format("%s-%s", this.appName, processorName);
+      return ProcessorSharedContextFactory.getInstance(globalProcessorName);
+    }
+
+    public static GlobalSharedContextFactory getInstance(String appName) {
+      if (globalSharedContextFactories.get(appName) == null) {
+        globalSharedContextFactories.putIfAbsent(appName, new GlobalSharedContextFactory(appName));
+      }
+      return globalSharedContextFactories.get(appName);
+    }
+
+    public static void clearAll() {
+      globalSharedContextFactories.clear();
+    }
+  }
+
+  public static GlobalSharedContextFactory getGlobalSharedContextFactory(String appName) {
+    return GlobalSharedContextFactory.getInstance(appName);
+  }
+
+  public static void clearAll() {
+    GlobalSharedContextFactory.clearAll();
+    ProcessorSharedContextFactory.clearAll();
+    SharedContextFactory.clearAll();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
new file mode 100644
index 0000000..db12351
--- /dev/null
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamApplication.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.processor;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
+
+
+/**
+ * Test class to create an {@link StreamApplication} instance
+ */
+public class TestStreamApplication implements StreamApplication, Serializable {
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final String appName;
+  private final String processorName;
+
+  private TestStreamApplication(String inputTopic, String outputTopic, String appName, String processorName) {
+    this.inputTopic = inputTopic;
+    this.outputTopic = outputTopic;
+    this.appName = appName;
+    this.processorName = processorName;
+  }
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>());
+    OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde());
+    inputStream.map(new MapFunction<String, String>() {
+      transient CountDownLatch latch1;
+      transient CountDownLatch latch2;
+      transient StreamApplicationCallback callback;
+
+      @Override
+      public String apply(String message) {
+        TestKafkaEvent incomingMessage = TestKafkaEvent.fromString(message);
+        if (callback != null) {
+          callback.onMessage(incomingMessage);
+        }
+        if (latch1 != null) {
+          latch1.countDown();
+        }
+        if (latch2 != null) {
+          latch2.countDown();
+        }
+        return incomingMessage.toString();
+      }
+
+      private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        SharedContextFactories.SharedContextFactory contextFactory =
+            SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName);
+        this.latch1 = (CountDownLatch) contextFactory.getSharedObject("processedMsgLatch");
+        this.latch2 = (CountDownLatch) contextFactory.getSharedObject("kafkaMsgsConsumedLatch");
+        this.callback = (StreamApplicationCallback) contextFactory.getSharedObject("callback");
+      }
+    }).sendTo(outputStream);
+  }
+
+  public interface StreamApplicationCallback {
+    void onMessage(TestKafkaEvent m);
+  }
+
+  public static class TestKafkaEvent implements Serializable {
+
+    // Actual content of the event.
+    private String eventData;
+
+    // Contains Integer value, which is greater than previous message id.
+    private String eventId;
+
+    TestKafkaEvent(String eventId, String eventData) {
+      this.eventData = eventData;
+      this.eventId = eventId;
+    }
+
+    String getEventId() {
+      return eventId;
+    }
+
+    String getEventData() {
+      return eventData;
+    }
+
+    @Override
+    public String toString() {
+      return eventId + "|" + eventData;
+    }
+
+    static TestKafkaEvent fromString(String message) {
+      String[] messageComponents = message.split("|");
+      return new TestKafkaEvent(messageComponents[0], messageComponents[1]);
+    }
+  }
+
+  public static StreamApplication getInstance(
+      String inputTopic,
+      String outputTopic,
+      CountDownLatch processedMessageLatch,
+      StreamApplicationCallback callback,
+      CountDownLatch kafkaEventsConsumedLatch,
+      Config config) {
+    String appName = String.format("%s-%s", config.get(ApplicationConfig.APP_NAME), config.get(ApplicationConfig.APP_ID));
+    String processorName = config.get(JobConfig.PROCESSOR_ID());
+    registerLatches(processedMessageLatch, kafkaEventsConsumedLatch, callback, appName, processorName);
+
+    StreamApplication app = new TestStreamApplication(inputTopic, outputTopic, appName, processorName);
+    return app;
+  }
+
+  private static void registerLatches(CountDownLatch processedMessageLatch, CountDownLatch kafkaEventsConsumedLatch,
+      StreamApplicationCallback callback, String appName, String processorName) {
+    SharedContextFactories.SharedContextFactory contextFactory = SharedContextFactories.getGlobalSharedContextFactory(appName).getProcessorSharedContextFactory(processorName);
+    contextFactory.addSharedObject("processedMsgLatch", processedMessageLatch);
+    contextFactory.addSharedObject("kafkaMsgsConsumedLatch", kafkaEventsConsumedLatch);
+    contextFactory.addSharedObject("callback", callback);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 9d2cd92..0b0a271 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,6 +23,14 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.admin.AdminUtils;
 import kafka.utils.TestUtils;
 import org.I0Itec.zkclient.ZkClient;
@@ -42,33 +50,23 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
+import org.apache.samza.test.processor.TestStreamApplication.StreamApplicationCallback;
+import org.apache.samza.test.processor.TestStreamApplication.TestKafkaEvent;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.apache.samza.zk.ZkKeyBuilder;
 import org.apache.samza.zk.ZkUtils;
-import org.junit.*;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -102,9 +100,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private ApplicationConfig applicationConfig1;
   private ApplicationConfig applicationConfig2;
   private ApplicationConfig applicationConfig3;
-  private LocalApplicationRunner applicationRunner1;
-  private LocalApplicationRunner applicationRunner2;
-  private LocalApplicationRunner applicationRunner3;
   private String testStreamAppName;
   private String testStreamAppId;
 
@@ -141,11 +136,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
     zkUtils.connect();
 
-    // Create local application runners.
-    applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
-    applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
-    applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
-
     for (String kafkaTopic : ImmutableList.of(inputKafkaTopic, outputKafkaTopic)) {
       LOGGER.info("Creating kafka topic: {}.", kafkaTopic);
       TestUtils.createTopic(zkUtils(), kafkaTopic, 5, 1, servers(), new Properties());
@@ -251,9 +241,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Set up stream app 2.
     CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
-    LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
-    StreamApplication streamApp2 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
-        processedMessagesLatch, null, null);
+    Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig);
+    LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
+        processedMessagesLatch, null, null, testAppConfig2);
 
     // Callback handler for streamApp1.
     StreamApplicationCallback streamApplicationCallback = message -> {
@@ -272,9 +263,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
 
     // Set up stream app 1.
-    LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
-    StreamApplication streamApp1 = new TestStreamApplication(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
-        null, streamApplicationCallback, kafkaEventsConsumedLatch);
+    Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig);
+    LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputSinglePartitionKafkaTopic, outputSinglePartitionKafkaTopic,
+        null, streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1);
     localApplicationRunner1.run(streamApp1);
 
     kafkaEventsConsumedLatch.await();
@@ -288,6 +280,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
     // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value.
     assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
+
+    // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+    // localApplicationRunner1.kill(streamApp1);
+    // localApplicationRunner2.kill(streamApp2);
+
+    // localApplicationRunner1.waitForFinish();
+    // localApplicationRunner2.waitForFinish();
   }
 
   /**
@@ -326,8 +325,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Set up streamApp2.
     CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
-    LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
-    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null);
+    Config testAppConfig2 = new MapConfig(applicationConfig2, testConfig);
+    LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(testAppConfig2);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null, testAppConfig2);
 
     // Callback handler for streamApp1.
     StreamApplicationCallback streamApplicationCallback = message -> {
@@ -348,9 +348,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2 + 1);
 
     // Set up stream app 1.
-    LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
-    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null,
-        streamApplicationCallback, kafkaEventsConsumedLatch);
+    Config testAppConfig1 = new MapConfig(applicationConfig1, testConfig);
+    LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(testAppConfig1);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null,
+        streamApplicationCallback, kafkaEventsConsumedLatch, testAppConfig1);
     localApplicationRunner1.run(streamApp1);
 
     kafkaEventsConsumedLatch.await();
@@ -381,8 +382,16 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     assertEquals(taskModel1.getSystemStreamPartitions(), taskModel2.getSystemStreamPartitions());
     assertTrue(!taskModel1.getTaskName().getTaskName().equals(taskModel2.getTaskName().getTaskName()));
 
-    // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
     processedMessagesLatch.await();
+
+    assertEquals(ApplicationStatus.Running, localApplicationRunner2.status(streamApp2));
+
+    // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+    // localApplicationRunner1.kill(streamApp1);
+    // localApplicationRunner2.kill(streamApp2);
+
+    // localApplicationRunner1.waitForFinish();
+    // localApplicationRunner2.waitForFinish();
   }
 
   @Test
@@ -396,8 +405,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
-    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+    StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3);
+
+    // Create LocalApplicationRunners
+    LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
+    LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
 
     // Run stream applications.
     applicationRunner1.run(streamApp1);
@@ -428,7 +442,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     kafkaEventsConsumedLatch.await();
     publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch);
+    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
     applicationRunner3.run(streamApp3);
     processedMessagesLatch3.await();
 
@@ -441,6 +455,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     jobModel = zkUtils.getJobModel(jobModelVersion);
     assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
     assertEquals(2, jobModel.getContainers().size());
+
+    // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+    // applicationRunner2.kill(streamApp2);
+    // applicationRunner3.kill(streamApp3);
+
+    // applicationRunner2.waitForFinish();
+    // applicationRunner3.waitForFinish();
   }
 
   @Test
@@ -453,8 +474,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
-    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+
+    // Create LocalApplicationRunners
+    LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
+    LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
 
     // Run stream applications.
     applicationRunner1.run(streamApp1);
@@ -464,15 +489,24 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     processedMessagesLatch1.await();
     processedMessagesLatch2.await();
 
-    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(new MapConfig(applicationConfig2));
+    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig2);
 
     // Create a stream app with same processor id as SP2 and run it. It should fail.
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
     kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
-    StreamApplication streamApp3 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch);
+    StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, applicationConfig2);
     // Fail when the duplicate processor joins.
     expectedException.expect(SamzaException.class);
-    applicationRunner3.run(streamApp3);
+    try {
+      applicationRunner3.run(streamApp3);
+    } finally {
+      // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+      // applicationRunner1.kill(streamApp1);
+      // applicationRunner2.kill(streamApp2);
+
+      // applicationRunner1.waitForFinish();
+      // applicationRunner2.waitForFinish();
+    }
   }
 
   @Test
@@ -496,13 +530,16 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
     LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
 
+    List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
+    StreamApplicationCallback streamApplicationCallback = m -> messagesProcessed.add(m);
+
     // Create StreamApplication from configuration.
     CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
     CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
     CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
 
-    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
-    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch);
+    StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1);
+    StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
 
     // Run stream application.
     applicationRunner1.run(streamApp1);
@@ -521,7 +558,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
     processedMessagesLatch1 = new CountDownLatch(1);
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
-    streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch);
+    streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1);
     applicationRunner4.run(streamApp1);
 
     processedMessagesLatch1.await();
@@ -532,85 +569,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion));
     assertEquals(jobModel.getContainers(), newJobModel.getContainers());
-  }
-
-  public interface StreamApplicationCallback {
-    void onMessageReceived(TestKafkaEvent message);
-  }
-
-  private static class TestKafkaEvent implements Serializable {
-
-    // Actual content of the event.
-    private String eventData;
-
-    // Contains Integer value, which is greater than previous message id.
-    private String eventId;
 
-    TestKafkaEvent(String eventId, String eventData) {
-      this.eventData = eventData;
-      this.eventId = eventId;
-    }
-
-    String getEventId() {
-      return eventId;
-    }
-
-    String getEventData() {
-      return eventData;
-    }
-
-    @Override
-    public String toString() {
-      return eventId + "|" + eventData;
-    }
+    // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
+    // applicationRunner2.kill(streamApp2);
+    // applicationRunner4.kill(streamApp1);
 
-    static TestKafkaEvent fromString(String message) {
-      String[] messageComponents = message.split("\\|");
-      return new TestKafkaEvent(messageComponents[0], messageComponents[1]);
-    }
+    // applicationRunner2.waitForFinish();
+    // applicationRunner4.waitForFinish();
   }
 
-  /**
-   * Publishes all input events to output topic(has no processing logic)
-   * and triggers {@link StreamApplicationCallback} with each received event.
-   **/
-  private static class TestStreamApplication implements StreamApplication {
-
-    private final String inputTopic;
-    private final String outputTopic;
-    private final CountDownLatch processedMessagesLatch;
-    private final StreamApplicationCallback streamApplicationCallback;
-    private final CountDownLatch kafkaEventsConsumedLatch;
-
-    TestStreamApplication(String inputTopic, String outputTopic,
-        CountDownLatch processedMessagesLatch,
-        StreamApplicationCallback streamApplicationCallback, CountDownLatch kafkaEventsConsumedLatch) {
-      this.inputTopic = inputTopic;
-      this.outputTopic = outputTopic;
-      this.processedMessagesLatch = processedMessagesLatch;
-      this.streamApplicationCallback = streamApplicationCallback;
-      this.kafkaEventsConsumedLatch = kafkaEventsConsumedLatch;
-    }
-
-    @Override
-    public void init(StreamGraph graph, Config config) {
-      MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>());
-      OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde());
-      inputStream
-          .map(msg -> {
-              TestKafkaEvent incomingMessage = TestKafkaEvent.fromString((String) msg);
-              if (streamApplicationCallback != null) {
-                streamApplicationCallback.onMessageReceived(incomingMessage);
-              }
-              if (processedMessagesLatch != null) {
-                processedMessagesLatch.countDown();
-              }
-              if (kafkaEventsConsumedLatch != null) {
-                kafkaEventsConsumedLatch.countDown();
-              }
-              return incomingMessage.toString();
-            })
-          .sendTo(outputStream);
-    }
-  }
 }