You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/05 23:42:30 UTC

[2/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
deleted file mode 100644
index 1c30a21..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import java.time.Duration;
-import java.util.Set;
-import java.util.function.Supplier;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class TestBroadcastExample extends TestExampleBase {
-
-  TestBroadcastExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class MessageType {
-    String field1;
-    String field2;
-    String field3;
-    String field4;
-    String parKey;
-    private long timestamp;
-
-    public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    FoldLeftFunction<JsonMessageEnvelope, Integer> sumAggregator = (m, c) -> c + 1;
-    Supplier<Integer> initialValue = () -> 0;
-
-    inputs.keySet().forEach(entry -> {
-        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
-                new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
-
-        inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-        inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-        inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-      });
-  }
-
-  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
-    return (JsonMessageEnvelope) m1.getMessage();
-  }
-
-  boolean myFilter1(JsonMessageEnvelope m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key1");
-  }
-
-  boolean myFilter2(JsonMessageEnvelope m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key2");
-  }
-
-  boolean myFilter3(JsonMessageEnvelope m1) {
-    return m1.getMessage().parKey.equals("key3");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
deleted file mode 100644
index dd661a0..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Base class for test examples
- *
- */
-public abstract class TestExampleBase implements StreamApplication {
-
-  protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
-
-  TestExampleBase(Set<SystemStreamPartition> inputs) {
-    this.inputs = new HashMap<>();
-    for (SystemStreamPartition input : inputs) {
-      this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>());
-      this.inputs.get(input.getSystemStream()).add(input);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
deleted file mode 100644
index 6c9f8c2..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * Example implementation of unique key-based stream-stream join tasks
- *
- */
-public class TestJoinExample  extends TestExampleBase {
-
-  TestJoinExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  MessageStream<JsonMessageEnvelope> joinOutput = null;
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-
-    for (SystemStream input : inputs.keySet()) {
-      StreamSpec inputStreamSpec = new StreamSpec(input.getSystem() + "-" + input.getStream(), input.getStream(), input.getSystem());
-      MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
-          inputStreamSpec, null, null).map(this::getInputMessage);
-      if (joinOutput == null) {
-        joinOutput = newSource;
-      } else {
-        joinOutput = joinOutput.join(newSource, new MyJoinFunction(), Duration.ofMinutes(1));
-      }
-    }
-
-    joinOutput.sendTo(graph.createOutStream(
-            new StreamSpec("joinOutput", "JoinOutputEvent", "kafka"),
-            new StringSerde("UTF-8"), new JsonSerde<>()));
-
-  }
-
-  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
-    return new JsonMessageEnvelope(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getSystemStreamPartition());
-  }
-
-  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
-    JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
-      MessageType newJoinMsg = new MessageType();
-      newJoinMsg.joinKey = m1.getKey();
-      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-      return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
-    }
-
-    @Override
-    public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
-      return this.myJoinResult(message, otherMessage);
-    }
-
-    @Override
-    public String getFirstKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
deleted file mode 100644
index c88df7c..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Set;
-import java.util.function.Supplier;
-
-
-/**
- * Example implementation of a simple user-defined tasks w/ window operators
- *
- */
-public class TestWindowExample extends TestExampleBase {
-  class MessageType {
-    String field1;
-    String field2;
-  }
-
-  TestWindowExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    FoldLeftFunction<JsonMessageEnvelope, Integer> maxAggregator = (m, c) -> c + 1;
-    Supplier<Integer> initialValue = () -> 0;
-    inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
-            new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null).
-        map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
-            m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), initialValue, maxAggregator)));
-
-  }
-
-  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
-    return m.getKey().toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
new file mode 100644
index 0000000..159dba2
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.function.Supplier;
+
+
+/**
+ * Example implementation of a simple user-defined task w/ a window operator.
+ *
+ */
+public class WindowExample implements StreamApplication {
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    Supplier<Integer> initialValue = () -> 0;
+    FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
+    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
+    OutputStream<String, Integer, WindowPane<Void, Integer>> outputStream = graph
+        .getOutputStream("outputStream", m -> m.getKey().getPaneId(), m -> m.getMessage());
+
+    // create a tumbling window that outputs the number of messages collected every 10 minutes.
+    // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
+    // for 1 minute. NOTE(review): the trigger below is registered via setLateTrigger, not as an early trigger; confirm.
+    inputStream
+        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))))
+        .sendTo(outputStream);
+  }
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new WindowExample());
+  }
+
+  class PageViewEvent {
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index e524ba1..e661798 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -19,56 +19,52 @@
 
 package org.apache.samza.execution;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.runtime.AbstractApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestExecutionPlanner {
 
-  private Config config;
-
   private static final String DEFAULT_SYSTEM = "test-system";
   private static final int DEFAULT_PARTITIONS = 10;
 
+  private Map<String, SystemAdmin> systemAdmins;
+  private StreamManager streamManager;
+  private ApplicationRunner runner;
+  private Config config;
+
   private StreamSpec input1;
   private StreamSpec input2;
   private StreamSpec input3;
   private StreamSpec output1;
   private StreamSpec output2;
 
-  private Map<String, SystemAdmin> systemAdmins;
-  private StreamManager streamManager;
-
-  private ApplicationRunner runner;
-
   private JoinFunction createJoin() {
     return new JoinFunction() {
       @Override
@@ -88,14 +84,6 @@ public class TestExecutionPlanner {
     };
   }
 
-  private SinkFunction createSink() {
-    return new SinkFunction() {
-      @Override
-      public void apply(Object message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
-      }
-    };
-  }
-
   private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
 
     return new SystemAdmin() {
@@ -139,37 +127,43 @@ public class TestExecutionPlanner {
     };
   }
 
-  private StreamGraph createSimpleGraph() {
+  private StreamGraphImpl createSimpleGraph() {
     /**
      * a simple graph of partitionBy and map
      *
      * input1 -> partitionBy -> map -> output1
      *
      */
-    StreamGraph streamGraph = new StreamGraphImpl(runner, config);
-    streamGraph.createInStream(input1, null, null).partitionBy(m -> "yes!!!").map(m -> m).sendTo(streamGraph.createOutStream(output1, null, null));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null);
+    streamGraph.getInputStream("input1", null)
+        .partitionBy(m -> "yes!!!").map(m -> m)
+        .sendTo(output1);
     return streamGraph;
   }
 
-  private StreamGraph createStreamGraphWithJoin() {
+  private StreamGraphImpl createStreamGraphWithJoin() {
 
-    /** the graph looks like the following
+    /**
+     * The graph looks like the following. The number of partitions is in parentheses; quotes indicate the expected value.
      *
-     *                        input1 -> map -> join -> output1
-     *                                           |
-     *          input2 -> partitionBy -> filter -|
-     *                                           |
-     * input3 -> filter -> partitionBy -> map -> join -> output2
+     *                               input1 (64) -> map -> join -> output1 (8)
+     *                                                       |
+     *          input2 (16) -> partitionBy ("64") -> filter -|
+     *                                                       |
+     * input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16)
      *
      */
 
-    StreamGraph streamGraph = new StreamGraphImpl(runner, config);
-    MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m);
-    MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    MessageStream m1 = streamGraph.getInputStream("input1", null).map(m -> m);
+    MessageStream m2 = streamGraph.getInputStream("input2", null).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.getInputStream("input3", null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null);
+    OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", null, null);
 
-    m1.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output1, null, null));
-    m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output2, null, null));
+    m1.join(m2, createJoin(), Duration.ofHours(2)).sendTo(output1);
+    m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(output2);
 
     return streamGraph;
   }
@@ -205,27 +199,26 @@ public class TestExecutionPlanner {
     systemAdmins.put("system2", systemAdmin2);
     streamManager = new StreamManager(systemAdmins);
 
-    runner = new AbstractApplicationRunner(config) {
-      @Override
-      public void run(StreamApplication streamApp) {
-      }
-
-      @Override
-      public void kill(StreamApplication streamApp) {
-
-      }
-
-      @Override
-      public ApplicationStatus status(StreamApplication streamApp) {
-        return null;
-      }
-    };
+    runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("input1")).thenReturn(input1);
+    when(runner.getStreamSpec("input2")).thenReturn(input2);
+    when(runner.getStreamSpec("input3")).thenReturn(input3);
+    when(runner.getStreamSpec("output1")).thenReturn(output1);
+    when(runner.getStreamSpec("output2")).thenReturn(output2);
+
+    // intermediate streams used in tests
+    when(runner.getStreamSpec("test-app-1-partition_by-0"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-1"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-4"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system"));
   }
 
   @Test
   public void testCreateProcessorGraph() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraph streamGraph = createStreamGraphWithJoin();
+    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
 
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
     assertTrue(jobGraph.getSources().size() == 3);
@@ -236,7 +229,7 @@ public class TestExecutionPlanner {
   @Test
   public void testFetchExistingStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraph streamGraph = createStreamGraphWithJoin();
+    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -254,7 +247,7 @@ public class TestExecutionPlanner {
   @Test
   public void testCalculateJoinInputPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraph streamGraph = createStreamGraphWithJoin();
+    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -273,7 +266,7 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraph streamGraph = createSimpleGraph();
+    StreamGraphImpl streamGraph = createSimpleGraph();
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
     planner.calculatePartitions(streamGraph, jobGraph);
 
@@ -286,7 +279,7 @@ public class TestExecutionPlanner {
   @Test
   public void testCalculateIntStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraph streamGraph = createSimpleGraph();
+    StreamGraphImpl streamGraph = createSimpleGraph();
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
     planner.calculatePartitions(streamGraph, jobGraph);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 4e6c750..acc8588 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -48,7 +47,6 @@ import static org.mockito.Mockito.when;
 
 public class TestJoinOperator {
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
-  private final ApplicationRunner runner = mock(ApplicationRunner.class);
   private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 
   @Test
@@ -226,6 +224,10 @@ public class TestJoinOperator {
   }
 
   private StreamOperatorTask createStreamOperatorTask() throws Exception {
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
+    when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2"));
+
     TaskContext taskContext = mock(TaskContext.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
@@ -239,13 +241,12 @@ public class TestJoinOperator {
   }
 
   private class TestStreamApplication implements StreamApplication {
-    StreamSpec inStreamSpec = new StreamSpec("instream", "instream", "insystem");
-    StreamSpec inStreamSpec2 = new StreamSpec("instream2", "instream2", "insystem2");
-
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(inStreamSpec, null, null);
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream2 = graph.createInStream(inStreamSpec2, null, null);
+      MessageStream<FirstStreamIME> inStream =
+          graph.getInputStream("instream", FirstStreamIME::new);
+      MessageStream<SecondStreamIME> inStream2 =
+          graph.getInputStream("instream2", SecondStreamIME::new);
 
       SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
       inStream
@@ -256,22 +257,20 @@ public class TestJoinOperator {
     }
   }
 
-  private class TestJoinFunction
-      implements JoinFunction<Integer, MessageEnvelope<Integer, Integer>, MessageEnvelope<Integer, Integer>, Integer> {
+  private class TestJoinFunction implements JoinFunction<Integer, FirstStreamIME, SecondStreamIME, Integer> {
     @Override
-    public Integer apply(MessageEnvelope<Integer, Integer> message,
-        MessageEnvelope<Integer, Integer> otherMessage) {
-      return message.getMessage() + otherMessage.getMessage();
+    public Integer apply(FirstStreamIME message, SecondStreamIME otherMessage) {
+      return (Integer) message.getMessage() + (Integer) otherMessage.getMessage();
     }
 
     @Override
-    public Integer getFirstKey(MessageEnvelope<Integer, Integer> message) {
-      return message.getKey();
+    public Integer getFirstKey(FirstStreamIME message) {
+      return (Integer) message.getKey();
     }
 
     @Override
-    public Integer getSecondKey(MessageEnvelope<Integer, Integer> message) {
-      return message.getKey();
+    public Integer getSecondKey(SecondStreamIME message) {
+      return (Integer) message.getKey();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 8a2dd95..e815b81 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -18,25 +18,19 @@
  */
 package org.apache.samza.operators;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -45,6 +39,15 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -138,7 +141,6 @@ public class TestMessageStreamImpl {
     OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
     assertTrue(sinkOp instanceof SinkOperatorSpec);
     assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
-    assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
   }
 
   @Test
@@ -220,8 +222,8 @@ public class TestMessageStreamImpl {
     MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(streamGraph);
     Function<TestMessageEnvelope, String> keyExtractorFunc = m -> "222";
     inputStream.partitionBy(keyExtractorFunc);
-    assertTrue(streamGraph.getInStreams().size() == 1);
-    assertTrue(streamGraph.getOutStreams().size() == 1);
+    assertTrue(streamGraph.getInputStreams().size() == 1);
+    assertTrue(streamGraph.getOutputStreams().size() == 1);
 
     Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
     assertEquals(subs.size(), 1);
@@ -229,11 +231,7 @@ public class TestMessageStreamImpl {
     assertTrue(partitionByOp instanceof SinkOperatorSpec);
     assertNull(partitionByOp.getNextStream());
 
-    ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000), new MessageCollector() {
-      @Override
-      public void send(OutgoingMessageEnvelope envelope) {
-        assertTrue(envelope.getPartitionKey().equals("222"));
-      }
-    }, null);
+    ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000),
+        envelope -> assertTrue(envelope.getPartitionKey().equals("222")), null);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
new file mode 100644
index 0000000..6603137
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.testUtils.TestClock;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestWindowOperator {
+  private final MessageCollector messageCollector = mock(MessageCollector.class);
+  private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
+  private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>();
+  private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+  private Config config;
+  private TaskContext taskContext;
+  private ApplicationRunner runner;
+
+  @Before
+  public void setup() throws Exception {
+    windowPanes.clear();
+
+    config = mock(Config.class);
+    taskContext = mock(TaskContext.class);
+    runner = mock(ApplicationRunner.class);
+    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+    when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka"));
+  }
+
+  @Test
+  public void testTumblingWindowsDiscardingMode() throws Exception {
+
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 5);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3));
+    Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1);
+  }
+
+  @Test
+  public void testTumblingWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 7);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4);
+
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4);
+  }
+
+  @Test
+  public void testSessionWindowsDiscardingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 1);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 4);
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
+    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
+
+  }
+
+  @Test
+  public void testSessionWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4);
+  }
+
+  @Test
+  public void testCancellationOfOnceTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1), Triggers.count(2));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 1);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
+
+    task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
+    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1);
+
+  }
+
+  @Test
+  public void testCancellationOfAnyTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+
+    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    //assert that the default trigger fired
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY);
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
+
+    //advance timer by > 500 millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(900));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 4);
+    Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
+  }
+
+  @Test
+  public void testCancelationOfRepeatingNestedTriggers() throws Exception {
+
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    //process one more message, then advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofMillis(500));
+    //run the window and assert that exactly one more pane fired, i.e. the count trigger cancelled the earlier timeSinceFirstMessage trigger
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 2);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 3);
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+    //assert that the default trigger fired
+    Assert.assertEquals(windowPanes.size(), 4);
+  }
+
+  private class KeyedTumblingWindowStreamApplication implements StreamApplication {
+
+    private final AccumulationMode mode;
+    private final Duration duration;
+    private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
+
+    KeyedTumblingWindowStreamApplication(AccumulationMode mode,
+        Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) {
+      this.mode = mode;
+      this.duration = timeDuration;
+      this.earlyTrigger = earlyTrigger;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream",
+          (k, m) -> new MessageEnvelope(k, m));
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+      inStream
+        .map(m -> m)
+        .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
+          .setAccumulationMode(mode))
+        .map(m -> {
+            windowPanes.add(m);
+            return m;
+          });
+    }
+  }
+
+  private class KeyedSessionWindowStreamApplication implements StreamApplication {
+
+    private final AccumulationMode mode;
+    private final Duration duration;
+
+    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+      this.mode = mode;
+      this.duration = duration;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream",
+          (k, m) -> new MessageEnvelope(k, m));
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+
+      inStream
+          .map(m -> m)
+          .window(Windows.keyedSessionWindow(keyFn, duration)
+              .setAccumulationMode(mode))
+          .map(m -> {
+              windowPanes.add(m);
+              return m;
+            });
+    }
+  }
+
+  private class IntegerMessageEnvelope extends IncomingMessageEnvelope {
+    IntegerMessageEnvelope(int key, int msg) {
+      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg);
+    }
+  }
+
+  private class MessageEnvelope<K, V> {
+    private final K key;
+    private final V value;
+
+    MessageEnvelope(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    public V getValue() {
+      return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
deleted file mode 100644
index 9a425d1..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example input {@link MessageEnvelope} w/ Json message and string as the key.
- */
-
-public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
-
-  private final String key;
-  private final T data;
-  private final Offset offset;
-  private final SystemStreamPartition partition;
-
-  public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
-    this.key = key;
-    this.data = data;
-    this.offset = offset;
-    this.partition = partition;
-  }
-
-  @Override
-  public T getMessage() {
-    return this.data;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key;
-  }
-
-  public Offset getOffset() {
-    return this.offset;
-  }
-
-  public SystemStreamPartition getSystemStreamPartition() {
-    return this.partition;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
new file mode 100644
index 0000000..2524c28
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+
+public class TestMessageEnvelope {
+
+  private final String key;
+  private final MessageType value;
+
+  public TestMessageEnvelope(String key, String value, long eventTime) {
+    this.key = key;
+    this.value = new MessageType(value, eventTime);
+  }
+
+  public MessageType getMessage() {
+    return this.value;
+  }
+
+  public String getKey() {
+    return this.key;
+  }
+
+  public class MessageType {
+    private final String value;
+    private final long eventTime;
+
+    public MessageType(String value, long eventTime) {
+      this.value = value;
+      this.eventTime = eventTime;
+    }
+
+    public long getEventTime() {
+      return eventTime;
+    }
+
+    public String getValue() {
+      return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
new file mode 100644
index 0000000..f9537a3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+
+public class TestOutputMessageEnvelope {
+  private final String key;
+  private final Integer value;
+
+  public TestOutputMessageEnvelope(String key, Integer value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  public Integer getMessage() {
+    return this.value;
+  }
+
+  public String getKey() {
+    return this.key;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 5722dbd..f978c3c 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.hamcrest.core.IsEqual;

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index 31f6f4a..267cdfc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -18,24 +18,21 @@
  */
 package org.apache.samza.operators.impl;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
 import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.internal.WindowType;
@@ -44,6 +41,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -66,11 +65,11 @@ public class TestOperatorImpls {
     nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
     nextOperatorsField.setAccessible(true);
 
-    createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+    createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
         OperatorSpec.class, Config.class, TaskContext.class);
     createOpMethod.setAccessible(true);
 
-    createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+    createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
     createOpsMethod.setAccessible(true);
   }
 
@@ -84,8 +83,8 @@ public class TestOperatorImpls {
     Config mockConfig = mock(Config.class);
     TaskContext mockContext = mock(TaskContext.class);
 
-    OperatorGraph opGraph = new OperatorGraph();
-    OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
+    OperatorImplGraph opGraph = new OperatorImplGraph();
+    OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
         createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
     assertTrue(opImpl instanceof WindowOperatorImpl);
     Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
@@ -97,7 +96,7 @@ public class TestOperatorImpls {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
     when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof StreamOperatorImpl);
     Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
     txfmFnField.setAccessible(true);
@@ -107,7 +106,7 @@ public class TestOperatorImpls {
     SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
     SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
     when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof SinkOperatorImpl);
     Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
     sinkFnField.setAccessible(true);
@@ -116,7 +115,7 @@ public class TestOperatorImpls {
     // get join operator
     PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
     PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof PartialJoinOperatorImpl);
   }
 
@@ -126,7 +125,7 @@ public class TestOperatorImpls {
     MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
     TaskContext mockContext = mock(TaskContext.class);
     Config mockConfig = mock(Config.class);
-    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImplGraph opGraph = new OperatorImplGraph();
     RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
     assertTrue(operatorChain != null);
   }
@@ -139,7 +138,7 @@ public class TestOperatorImpls {
     TaskContext mockContext = mock(TaskContext.class);
     Config mockConfig = mock(Config.class);
     testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
-    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImplGraph opGraph = new OperatorImplGraph();
     RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
     Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
     assertEquals(subsSet.size(), 1);
@@ -160,7 +159,7 @@ public class TestOperatorImpls {
     Config mockConfig = mock(Config.class);
     testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
     testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
-    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImplGraph opGraph = new OperatorImplGraph();
     RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
     Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
     assertEquals(subsSet.size(), 2);
@@ -208,7 +207,7 @@ public class TestOperatorImpls {
               }
             }, Duration.ofMinutes(1))
         .map(m -> m);
-    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImplGraph opGraph = new OperatorImplGraph();
     // now, we create chained operators from each input sources
     RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
     RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index ce9fdd2..abd7740 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 0a873fd..9dd161a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.task.MessageCollector;

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ec1d74c..37e3d1a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -20,16 +20,16 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
 import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.stream.OutputStreamInternalImpl;
 import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.internal.WindowType;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
@@ -41,58 +41,79 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 
 public class TestOperatorSpecs {
   @Test
-  public void testGetStreamOperator() {
-    FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
-          this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+  public void testCreateStreamOperator() {
+    FlatMapFunction<?, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
+          this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
         } };
     MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
-    assertEquals(strmOp.getTransformFn(), transformFn);
-    assertEquals(strmOp.getNextStream(), mockOutput);
+    StreamOperatorSpec<?, TestMessageEnvelope> streamOp =
+        OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1);
+    assertEquals(streamOp.getTransformFn(), transformFn);
+    assertEquals(streamOp.getNextStream(), mockOutput);
   }
 
   @Test
-  public void testGetSinkOperator() {
+  public void testCreateSinkOperator() {
     SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
           TaskCoordinator taskCoordinator) -> { };
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1);
     assertEquals(sinkOp.getSinkFn(), sinkFn);
-    assertTrue(sinkOp.getNextStream() == null);
+    assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SINK);
+    assertEquals(sinkOp.getNextStream(), null);
+  }
+
+  /** A send-to spec must expose a non-null sink function, the SEND_TO op code, and no next stream. */
+  @Test
+  public void testCreateSendToOperator() {
+    OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSendToOperatorSpec(mockOutput, 1);
+    assertNotNull(sinkOp.getSinkFn());
+    assertEquals(OperatorSpec.OpCode.SEND_TO, sinkOp.getOpCode());
+    assertEquals(null, sinkOp.getNextStream());
+  }
+
+  @Test
+  public void testCreatePartitionByOperator() {  // expects: sink fn set, PARTITION_BY op code, no next stream
+    OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createPartitionByOperatorSpec(mockOutput, 1);
+    assertNotNull(sinkOp.getSinkFn());
+    assertEquals(OperatorSpec.OpCode.PARTITION_BY, sinkOp.getOpCode());  // JUnit order: expected, then actual
+    assertEquals(null, sinkOp.getNextStream());
+  }
 
   @Test
-  public void testGetWindowOperator() throws Exception {
+  public void testCreateWindowOperator() throws Exception {
     Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
     FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
     Supplier<Integer> initialValue = () -> 0;
     //instantiate a window using reflection
     WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
 
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
-    WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
+    WindowOperatorSpec spec =
+        OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockWndOut, 1);
     assertEquals(spec.getWindow(), window);
     assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
     assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
   }
 
   @Test
-  public void testGetPartialJoinOperator() {
-    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn = mock(PartialJoinFunction.class);
-    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn = mock(PartialJoinFunction.class);
+  public void testCreatePartialJoinOperator() {
+    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn
+        = mock(PartialJoinFunction.class);
+    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn
+        = mock(PartialJoinFunction.class);
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     MessageStreamImpl<TestOutputMessageEnvelope> joinOutput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
 
-    PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec =
-        OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, mockGraph, joinOutput);
+    PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec
+        = OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, joinOutput, 1);
 
     assertEquals(partialJoinSpec.getNextStream(), joinOutput);
     assertEquals(partialJoinSpec.getThisPartialJoinFn(), thisPartialJoinFn);
@@ -100,13 +121,15 @@ public class TestOperatorSpecs {
   }
 
   @Test
-  public void testGetMergeOperator() {
+  public void testCreateMergeOperator() {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
-    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
-    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
-        this.add(t);
-      } };
+    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp =
+        OperatorSpecs.createMergeOperatorSpec(output, 1);
+    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn =
+        t -> new ArrayList<TestMessageEnvelope>() { {
+            this.add(t);
+          } };
     TestMessageEnvelope t = mock(TestMessageEnvelope.class);
     assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
     assertEquals(mergeOp.getNextStream(), output);

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
deleted file mode 100644
index 674a8f1..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.triggers;
-
-import org.apache.samza.util.Clock;
-
-import java.time.Duration;
-
-/**
- * An implementation of {@link Clock} that allows to advance the time by an arbitrary duration.
- * Used for testing.
- */
-public class TestClock implements Clock {
-
-  long currentTime = 1;
-
-  public void advanceTime(Duration duration) {
-    currentTime += duration.toMillis();
-  }
-
-  public void advanceTime(long millis) {
-    currentTime += millis;
-  }
-
-  @Override
-  public long currentTimeMillis() {
-    return currentTime;
-  }
-}