You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/02/16 18:40:47 UTC

[04/14] samza git commit: SAMZA-1073: top-level fluent API `

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
new file mode 100644
index 0000000..5f659ba
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/TestFluentStreamTasks.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.example;
+
+import java.lang.reflect.Field;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Unit test for {@link StreamOperatorTask}
+ */
+public class TestFluentStreamTasks {
+
+  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
+      for (int i = 0; i < 4; i++) {
+        this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
+      }
+    } };
+
+  @Test
+  public void testUserTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    WindowGraph userTask = new WindowGraph(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream()));
+      });
+  }
+
+  @Test
+  public void testSplitTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    BroadcastGraph splitTask = new BroadcastGraph(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream()));
+      });
+  }
+
+  @Test
+  public void testJoinTask() throws Exception {
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
+    JoinGraph joinTask = new JoinGraph(this.inputPartitions);
+    StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask);
+    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
+    pipelineMapFld.setAccessible(true);
+    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
+
+    adaptorTask.init(mockConfig, mockContext);
+    this.inputPartitions.forEach(partition -> {
+        assertNotNull(opGraph.get(partition.getSystemStream()));
+      });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java b/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
new file mode 100644
index 0000000..a365411
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/example/WindowGraph.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphFactory;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.StreamSpec;
+import org.apache.samza.operators.data.InputMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.time.Duration;
+import java.util.function.BiFunction;
+import java.util.Properties;
+import java.util.Set;
+
+
+/**
+ * Example implementation of a simple user-defined tasks w/ window operators
+ *
+ */
+public class WindowGraph implements StreamGraphFactory {
+  class MessageType {
+    String field1;
+    String field2;
+  }
+
+  private final Set<SystemStreamPartition> inputs;
+
+  WindowGraph(Set<SystemStreamPartition> inputs) {
+    this.inputs = inputs;
+  }
+
+  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+
+    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+      super(key, data, offset, partition);
+    }
+  }
+
+  @Override
+  public StreamGraph create(Config config) {
+    StreamGraphImpl graph = new StreamGraphImpl();
+    BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
+    inputs.forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(new StreamSpec() {
+      @Override public SystemStream getSystemStream() {
+        return source.getSystemStream();
+      }
+
+      @Override public Properties getProperties() {
+        return null;
+      }
+    }, null, null).
+        map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
+            m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
+
+    return graph;
+  }
+
+  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
+    return m.getKey().toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
deleted file mode 100644
index 663d98c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class BroadcastTask implements StreamOperatorTask {
-  class MessageType {
-    String field1;
-    String field2;
-    String field3;
-    String field4;
-    String parKey;
-    private long timestamp;
-
-    public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override
-  public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
-    BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
-    messageStreams.values().forEach(entry -> {
-        MessageStream<JsonMessageEnvelope> inputStream = entry.map(this::getInputMessage);
-
-        inputStream.filter(this::myFilter1)
-          .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-        inputStream.filter(this::myFilter1)
-          .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-        inputStream.filter(this::myFilter1)
-          .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-      });
-  }
-
-  JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope m1) {
-    return (JsonMessageEnvelope) m1.getMessage();
-  }
-
-  boolean myFilter1(JsonMessageEnvelope m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key1");
-  }
-
-  boolean myFilter2(JsonMessageEnvelope m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key2");
-  }
-
-  boolean myFilter3(JsonMessageEnvelope m1) {
-    return m1.getMessage().parKey.equals("key3");
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java b/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
deleted file mode 100644
index 1b10609..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/JoinTask.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * Example implementation of unique key-based stream-stream join tasks
- *
- */
-public class JoinTask implements StreamOperatorTask {
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  MessageStream<JsonMessageEnvelope> joinOutput = null;
-
-  @Override
-  public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
-    messageStreams.values().forEach(messageStream -> {
-        MessageStream<JsonMessageEnvelope> newSource = messageStream.map(this::getInputMessage);
-        if (joinOutput == null) {
-          joinOutput = newSource;
-        } else {
-          joinOutput = joinOutput.join(newSource, (m1, m2) -> this.myJoinResult(m1, m2));
-        }
-      });
-  }
-
-  private JsonMessageEnvelope getInputMessage(IncomingSystemMessageEnvelope ism) {
-    return new JsonMessageEnvelope(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getSystemStreamPartition());
-  }
-
-  JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
-    MessageType newJoinMsg = new MessageType();
-    newJoinMsg.joinKey = m1.getKey();
-    newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-    newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-    return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
deleted file mode 100644
index 61bb32a..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamAdaptorTask.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestFluentStreamAdaptorTask {
-  Field userTaskField = null;
-  Field operatorChainsField = null;
-
-  @Before
-  public void prep() throws NoSuchFieldException {
-    userTaskField = StreamOperatorAdaptorTask.class.getDeclaredField("userTask");
-    operatorChainsField = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
-    userTaskField.setAccessible(true);
-    operatorChainsField.setAccessible(true);
-  }
-
-  @Test
-  public void testConstructor() throws IllegalAccessException {
-    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
-    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
-    StreamOperatorTask taskMemberVar = (StreamOperatorTask) userTaskField.get(adaptorTask);
-    Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask);
-    assertEquals(taskMemberVar, userTask);
-    assertTrue(chainsMap.isEmpty());
-  }
-
-  @Test
-  public void testInit() throws Exception {
-    StreamOperatorTask userTask = mock(StreamOperatorTask.class);
-    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(userTask);
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    Set<SystemStreamPartition> testInputs = new HashSet() { {
-        this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(0)));
-        this.add(new SystemStreamPartition("test-sys", "test-strm", new Partition(1)));
-      } };
-    when(mockContext.getSystemStreamPartitions()).thenReturn(testInputs);
-    adaptorTask.init(mockConfig, mockContext);
-    verify(userTask, times(1)).transform(Mockito.anyMap());
-    Map<SystemStreamPartition, OperatorImpl> chainsMap = (Map<SystemStreamPartition, OperatorImpl>) operatorChainsField.get(adaptorTask);
-    assertTrue(chainsMap.size() == 2);
-    assertTrue(chainsMap.containsKey(testInputs.toArray()[0]));
-    assertTrue(chainsMap.containsKey(testInputs.toArray()[1]));
-  }
-
-  // TODO: window and process methods to be added after implementation of ChainedOperators.create()
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java b/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
deleted file mode 100644
index d804bf8..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestFluentStreamTasks.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-
-import org.apache.samza.operators.impl.OperatorImpl;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestFluentStreamTasks {
-
-  private final WindowTask userTask = new WindowTask();
-
-  private final BroadcastTask splitTask = new BroadcastTask();
-
-  private final JoinTask joinTask = new JoinTask();
-
-  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
-      for (int i = 0; i < 4; i++) {
-        this.add(new SystemStreamPartition("my-system", "my-topic1", new Partition(i)));
-      }
-    } };
-
-  @Test
-  public void testUserTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.userTask);
-    Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
-    pipelineMapFld.setAccessible(true);
-    Map<SystemStreamPartition, OperatorImpl> pipelineMap =
-        (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    assertEquals(pipelineMap.size(), 4);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(pipelineMap.get(partition));
-      });
-  }
-
-  @Test
-  public void testSplitTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.splitTask);
-    Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
-    pipelineMapFld.setAccessible(true);
-    Map<SystemStreamPartition, OperatorImpl> pipelineMap =
-        (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    assertEquals(pipelineMap.size(), 4);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(pipelineMap.get(partition));
-      });
-  }
-
-  @Test
-  public void testJoinTask() throws Exception {
-    Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    StreamOperatorAdaptorTask adaptorTask = new StreamOperatorAdaptorTask(this.joinTask);
-    Field pipelineMapFld = StreamOperatorAdaptorTask.class.getDeclaredField("operatorChains");
-    pipelineMapFld.setAccessible(true);
-    Map<SystemStreamPartition, OperatorImpl> pipelineMap =
-        (Map<SystemStreamPartition, OperatorImpl>) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(mockConfig, mockContext);
-    assertEquals(pipelineMap.size(), 4);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(pipelineMap.get(partition));
-      });
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 5991e2f..d5607d8 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -21,6 +21,7 @@ package org.apache.samza.operators;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.KeyValueJoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -29,6 +30,7 @@ import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
@@ -46,17 +48,19 @@ import static org.mockito.Mockito.when;
 
 public class TestMessageStreamImpl {
 
+  private StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+
   @Test
   public void testMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
-    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap =
-        m -> new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap = (TestMessageEnvelope m)  ->
+        new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
     MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
     Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
     assertEquals(subs.size(), 1);
     OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
     assertTrue(mapOp instanceof StreamOperatorSpec);
-    assertEquals(mapOp.getOutputStream(), outputStream);
+    assertEquals(mapOp.getNextStream(), outputStream);
     // assert that the transformation function is what we defined above
     TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
     TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
@@ -73,33 +77,33 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testFlatMap() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
     Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { {
         this.add(mock(TestOutputMessageEnvelope.class));
         this.add(mock(TestOutputMessageEnvelope.class));
         this.add(mock(TestOutputMessageEnvelope.class));
       } };
-    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = m -> flatOuts;
+    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = (TestMessageEnvelope message) -> flatOuts;
     MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
     Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
     assertEquals(subs.size(), 1);
     OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
     assertTrue(flatMapOp instanceof StreamOperatorSpec);
-    assertEquals(flatMapOp.getOutputStream(), outputStream);
+    assertEquals(flatMapOp.getNextStream(), outputStream);
     // assert that the transformation function is what we defined above
     assertEquals(((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn(), xFlatMap);
   }
 
   @Test
   public void testFilter() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
-    FilterFunction<TestMessageEnvelope> xFilter = m -> m.getMessage().getEventTime() > 123456L;
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+    FilterFunction<TestMessageEnvelope> xFilter = (TestMessageEnvelope m) -> m.getMessage().getEventTime() > 123456L;
     MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
     Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
     assertEquals(subs.size(), 1);
     OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
     assertTrue(filterOp instanceof StreamOperatorSpec);
-    assertEquals(filterOp.getOutputStream(), outputStream);
+    assertEquals(filterOp.getNextStream(), outputStream);
     // assert that the transformation function is what we defined above
     FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
     TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
@@ -117,8 +121,8 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testSink() {
-    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
-    SinkFunction<TestMessageEnvelope> xSink = (m, mc, tc) -> {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph);
+    SinkFunction<TestMessageEnvelope> xSink = (TestMessageEnvelope m, MessageCollector mc, TaskCoordinator tc) -> {
       mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
       tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
     };
@@ -128,26 +132,28 @@ public class TestMessageStreamImpl {
     OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
     assertTrue(sinkOp instanceof SinkOperatorSpec);
     assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
-    assertNull(((SinkOperatorSpec) sinkOp).getOutputStream());
+    assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
   }
 
   @Test
   public void testJoin() {
-    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>();
-    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>();
-    JoinFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
-        (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>(mockGraph);
+    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph);
+    JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
+        (KeyValueJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>)
+            (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+
     MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
     Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
     assertEquals(subs.size(), 1);
     OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
     assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
-    assertEquals(((PartialJoinOperatorSpec) joinOp1).getOutputStream(), joinOutput);
+    assertEquals(((PartialJoinOperatorSpec) joinOp1).getNextStream(), joinOutput);
     subs = source2.getRegisteredOperatorSpecs();
     assertEquals(subs.size(), 1);
     OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
     assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
-    assertEquals(((PartialJoinOperatorSpec) joinOp2).getOutputStream(), joinOutput);
+    assertEquals(((PartialJoinOperatorSpec) joinOp2).getNextStream(), joinOutput);
     TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
     TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
     TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
@@ -160,10 +166,10 @@ public class TestMessageStreamImpl {
 
   @Test
   public void testMerge() {
-    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>();
+    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>(mockGraph);
     Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { {
-        this.add(new MessageStreamImpl<>());
-        this.add(new MessageStreamImpl<>());
+        this.add(new MessageStreamImpl<>(mockGraph));
+        this.add(new MessageStreamImpl<>(mockGraph));
       } };
     MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
     validateMergeOperator(merge1, mergeOutput);
@@ -176,7 +182,7 @@ public class TestMessageStreamImpl {
     assertEquals(subs.size(), 1);
     OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
     assertTrue(mergeOp instanceof StreamOperatorSpec);
-    assertEquals(((StreamOperatorSpec) mergeOp).getOutputStream(), mergeOutput);
+    assertEquals(((StreamOperatorSpec) mergeOp).getNextStream(), mergeOutput);
     TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
     Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg);
     assertEquals(outputs.size(), 1);

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
new file mode 100644
index 0000000..c4e9f51
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+
+public class TestMessageStreamImplUtil {
+  public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
+    return new MessageStreamImpl<M>(graph);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
deleted file mode 100644
index e176063..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-
-/**
- * Example implementation of a simple user-defined tasks w/ window operators
- *
- */
-public class WindowTask implements StreamOperatorTask {
-  class MessageType {
-    String field1;
-    String field2;
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
-    BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
-    messageStreams.values().forEach(source ->
-        source.map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), m1.getSystemStreamPartition()))
-            .window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))
-    );
-  }
-
-  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
-    return m.getKey().toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index bae98e3..ec63d41 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -18,17 +18,26 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestMessageStreamImplUtil;
 import org.apache.samza.operators.TestOutputMessageEnvelope;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.KeyValueJoinFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.task.TaskContext;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,7 +47,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
@@ -49,25 +57,46 @@ import static org.mockito.Mockito.when;
 
 public class TestOperatorImpls {
   Field nextOperatorsField = null;
+  Method createOpMethod = null;
+  Method createOpsMethod = null;
 
   @Before
-  public void prep() throws NoSuchFieldException {
+  public void prep() throws NoSuchFieldException, NoSuchMethodException {
     nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
     nextOperatorsField.setAccessible(true);
+
+    createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+        OperatorSpec.class, Config.class, TaskContext.class);
+    createOpMethod.setAccessible(true);
+
+    createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+    createOpsMethod.setAccessible(true);
   }
-  
+
   @Test
-  public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException {
+  public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
     // get window operator
     WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
-    OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = OperatorImpls.createOperatorImpl(mockWnd);
+    WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
+    when(mockWnd.getWindow()).thenReturn(windowInternal);
+    MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+
+    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
+        createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
     assertTrue(opImpl instanceof WindowOperatorImpl);
+    Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
+    wndInternalField.setAccessible(true);
+    WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
+    assertEquals(wndInternal, windowInternal);
 
     // get simple operator
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
     when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
-    opImpl = OperatorImpls.createOperatorImpl(mockSimpleOp);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof StreamOperatorImpl);
     Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
     txfmFnField.setAccessible(true);
@@ -77,7 +106,7 @@ public class TestOperatorImpls {
     SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
     SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
     when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    opImpl = OperatorImpls.createOperatorImpl(sinkOp);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof SinkOperatorImpl);
     Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
     sinkFnField.setAccessible(true);
@@ -86,28 +115,33 @@ public class TestOperatorImpls {
     // get join operator
     PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
     TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
-    BiFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = (m1, m2) -> mockOutput;
+    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
     when(joinOp.getTransformFn()).thenReturn(joinFn);
-    opImpl = OperatorImpls.createOperatorImpl(joinOp);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof PartialJoinOperatorImpl);
   }
 
   @Test
-  public void testEmptyChain() {
+  public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
     // test creation of empty chain
-    MessageStreamImpl<TestMessageEnvelope> testStream = new MessageStreamImpl<>();
+    MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
     TaskContext mockContext = mock(TaskContext.class);
-    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testStream, mockContext);
+    Config mockConfig = mock(Config.class);
+    OperatorGraph opGraph = new OperatorGraph();
+    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
     assertTrue(operatorChain != null);
   }
 
   @Test
-  public void testLinearChain() throws IllegalAccessException {
+  public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
     // test creation of linear chain
-    MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
     TaskContext mockContext = mock(TaskContext.class);
-    testInput.map(m -> m).window(Windows.tumblingWindow(Duration.ofMillis(1000)));
-    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+    Config mockConfig = mock(Config.class);
+    testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
+    OperatorGraph opGraph = new OperatorGraph();
+    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
     Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
     assertEquals(subsSet.size(), 1);
     OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
@@ -119,13 +153,16 @@ public class TestOperatorImpls {
   }
 
   @Test
-  public void testBroadcastChain() throws IllegalAccessException {
+  public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
     // test creation of broadcast chain
-    MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
     TaskContext mockContext = mock(TaskContext.class);
+    Config mockConfig = mock(Config.class);
     testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
     testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
-    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+    OperatorGraph opGraph = new OperatorGraph();
+    RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
     Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
     assertEquals(subsSet.size(), 2);
     Iterator<OperatorImpl> iter = subsSet.iterator();
@@ -146,18 +183,23 @@ public class TestOperatorImpls {
   }
 
   @Test
-  public void testJoinChain() throws IllegalAccessException {
+  public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
     // test creation of join chain
-    MessageStreamImpl<TestMessageEnvelope> input1 = new MessageStreamImpl<>();
-    MessageStreamImpl<TestMessageEnvelope> input2 = new MessageStreamImpl<>();
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
+    MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
     TaskContext mockContext = mock(TaskContext.class);
+    Config mockConfig = mock(Config.class);
     input1
-        .join(input2, (m1, m2) ->
-            new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()))
+        .join(input2,
+            (KeyValueJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>) (m1, m2) ->
+                new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length())
+            )
         .map(m -> m);
+    OperatorGraph opGraph = new OperatorGraph();
     // now, we create chained operators from each input sources
-    RootOperatorImpl chain1 = OperatorImpls.createOperatorImpls(input1, mockContext);
-    RootOperatorImpl chain2 = OperatorImpls.createOperatorImpls(input2, mockContext);
+    RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
+    RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
     // check that those two chains will merge at map operator
     // first branch of the join
     Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index ba5b6f8..ce9fdd2 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,17 +18,16 @@
  */
 package org.apache.samza.operators.impl;
 
+import org.apache.samza.config.Config;
 import org.apache.samza.operators.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 public class TestSinkOperatorImpl {
@@ -38,7 +37,9 @@ public class TestSinkOperatorImpl {
     SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
     SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
     when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp);
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext);
     TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class);
     MessageCollector mockCollector = mock(MessageCollector.class);
     TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 5a3840c..010a210 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -18,22 +18,20 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.TestMessageEnvelope;
 import org.apache.samza.operators.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 
 public class TestStreamOperatorImpl {
@@ -43,8 +41,10 @@ public class TestStreamOperatorImpl {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
     when(mockOp.getTransformFn()).thenReturn(txfmFn);
-
-    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp));
+    MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = mock(TaskContext.class);
+    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
     TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
     TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
     Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {

http://git-wip-us.apache.org/repos/asf/samza/blob/b3dd886d/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ffe9df1..31257a4 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -18,14 +18,18 @@
  */
 package org.apache.samza.operators.spec;
 
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestMessageStreamImplUtil;
 import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.WindowKey;
 import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -42,19 +46,23 @@ public class TestOperatorSpecs {
   @Test
   public void testGetStreamOperator() {
     FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
-        this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
-      } };
-    StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn);
+          this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+        } };
+    MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
     assertEquals(strmOp.getTransformFn(), transformFn);
-    assertTrue(strmOp.getOutputStream() instanceof MessageStreamImpl);
+    assertEquals(strmOp.getNextStream(), mockOutput);
   }
 
   @Test
   public void testGetSinkOperator() {
-    SinkFunction<TestMessageEnvelope> sinkFn = (m, c, t) -> { };
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn);
+    SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
+          TaskCoordinator taskCoordinator) -> { };
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
     assertEquals(sinkOp.getSinkFn(), sinkFn);
-    assertTrue(sinkOp.getOutputStream() == null);
+    assertTrue(sinkOp.getNextStream() == null);
   }
 
   @Test
@@ -65,8 +73,9 @@ public class TestOperatorSpecs {
     //instantiate a window using reflection
     WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);
 
-    WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, WindowKey<String>, Integer,
-        WindowPane<WindowKey<String>, Integer>>createWindowOperatorSpec(window);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
+    WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
     assertEquals(spec.getWindow(), window);
     assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
     assertEquals(spec.getWindow().getFoldFunction(), aggregator);
@@ -74,13 +83,30 @@ public class TestOperatorSpecs {
 
   @Test
   public void testGetPartialJoinOperator() {
-    BiFunction<MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
-        (m1, m2) -> new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
-    MessageStreamImpl<TestMessageEnvelope> joinOutput = new MessageStreamImpl<>();
+    PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
+      new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() {
+        @Override
+        public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) {
+          return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
+        }
+
+        @Override
+        public Object getKey(MessageEnvelope<Object, ?> message) {
+          return message.getKey();
+        }
+
+        @Override
+        public Object getOtherKey(MessageEnvelope<Object, ?> message) {
+          return message.getKey();
+        }
+      };
+
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
     PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
-        OperatorSpecs.createPartialJoinOperatorSpec(merger, joinOutput);
+        OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput);
 
-    assertEquals(partialJoin.getOutputStream(), joinOutput);
+    assertEquals(partialJoin.getNextStream(), joinOutput);
     MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class);
     MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class);
     assertEquals(partialJoin.getTransformFn(), merger);
@@ -88,13 +114,14 @@ public class TestOperatorSpecs {
 
   @Test
   public void testGetMergeOperator() {
-    MessageStreamImpl<TestMessageEnvelope> output = new MessageStreamImpl<>();
-    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(output);
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
     Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
         this.add(t);
       } };
     TestMessageEnvelope t = mock(TestMessageEnvelope.class);
     assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
-    assertEquals(mergeOp.getOutputStream(), output);
+    assertEquals(mergeOp.getNextStream(), output);
   }
 }