You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/04/05 23:42:29 UTC

[1/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

Repository: samza
Updated Branches:
  refs/heads/master 65af13df1 -> 4bf8ab6eb


http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
deleted file mode 100644
index 0d720dd..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.triggers;
-
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import junit.framework.Assert;
-import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.windows.AccumulationMode;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestWindowOperator {
-  private final MessageCollector messageCollector = mock(MessageCollector.class);
-  private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
-  private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>();
-  private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
-  private Config config;
-  private TaskContext taskContext;
-  private ApplicationRunner runner;
-
-  @Before
-  public void setup() throws Exception {
-    windowPanes.clear();
-
-    config = mock(Config.class);
-    taskContext = mock(TaskContext.class);
-    runner = mock(ApplicationRunner.class);
-    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
-        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
-
-  }
-
-  @Test
-  public void testTumblingWindowsDiscardingMode() throws Exception {
-
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
-    task.init(config, taskContext);
-
-    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
-    testClock.advanceTime(Duration.ofSeconds(1));
-
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 5);
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(4).getMessage()).size(), 1);
-  }
-
-  @Test
-  public void testTumblingWindowsAccumulatingMode() throws Exception {
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
-    task.init(config, taskContext);
-
-    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 7);
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2);
-
-    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 4);
-
-    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 4);
-  }
-
-  @Test
-  public void testSessionWindowsDiscardingMode() throws Exception {
-    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
-    task.init(config, taskContext);
-
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 1);
-    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 3);
-    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
-    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
-    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 2);
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 2);
-
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 4);
-    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
-    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size(), 2);
-
-  }
-
-  @Test
-  public void testSessionWindowsAccumulatingMode() throws Exception {
-    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
-    task.init(config, taskContext);
-
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    testClock.advanceTime(Duration.ofSeconds(1));
-
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 2);
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size(), 2);
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 4);
-  }
-
-  @Test
-  public void testCancelationOfOnceTrigger() throws Exception {
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.count(2));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
-    task.init(config, taskContext);
-
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 1);
-    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
-    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
-
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 1);
-
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 2);
-    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
-    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
-    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
-
-    task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator);
-    testClock.advanceTime(Duration.ofSeconds(1));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 3);
-    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
-    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
-    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size(), 1);
-
-  }
-
-  @Test
-  public void testCancelationOfAnyTrigger() throws Exception {
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
-        Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
-    task.init(config, taskContext);
-
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
-    //assert that the count trigger fired
-    Assert.assertEquals(windowPanes.size(), 1);
-
-    //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
-    testClock.advanceTime(Duration.ofMillis(500));
-
-    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
-    Assert.assertEquals(windowPanes.size(), 1);
-
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
-
-    //advance timer by 500 more millis to enable the default trigger
-    testClock.advanceTime(Duration.ofMillis(500));
-    task.window(messageCollector, taskCoordinator);
-
-    //assert that the default trigger fired
-    Assert.assertEquals(windowPanes.size(), 2);
-    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
-    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
-    Assert.assertEquals(((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size(), 5);
-
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
-
-    //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
-    testClock.advanceTime(Duration.ofMillis(500));
-    task.window(messageCollector, taskCoordinator);
-
-    Assert.assertEquals(windowPanes.size(), 3);
-    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY);
-    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
-
-    //advance timer by > 500 millis to enable the default trigger
-    testClock.advanceTime(Duration.ofMillis(900));
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 4);
-    Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
-    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
-    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
-  }
-
-  @Test
-  public void testCancelationOfRepeatingNestedTriggers() throws Exception {
-
-    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
-        Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
-    TestClock testClock = new TestClock();
-    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
-    task.init(config, taskContext);
-
-    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
-
-    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
-    //assert that the count trigger fired
-    Assert.assertEquals(windowPanes.size(), 1);
-
-    //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
-    testClock.advanceTime(Duration.ofMillis(500));
-    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
-    task.window(messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 2);
-
-    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
-    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
-    Assert.assertEquals(windowPanes.size(), 3);
-
-    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
-    //advance timer by 500 more millis to enable the default trigger
-    testClock.advanceTime(Duration.ofMillis(500));
-    task.window(messageCollector, taskCoordinator);
-    //assert that the default trigger fired
-    Assert.assertEquals(windowPanes.size(), 4);
-  }
-
-  private class KeyedTumblingWindowStreamApplication implements StreamApplication {
-
-    private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka");
-    private final AccumulationMode mode;
-    private final Duration duration;
-    private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
-
-    KeyedTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) {
-      this.mode = mode;
-      this.duration = timeDuration;
-      this.earlyTrigger = earlyTrigger;
-    }
-
-    @Override
-    public void init(StreamGraph graph, Config config) {
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null);
-      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
-      inStream
-        .map(m -> m)
-        .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
-          .setAccumulationMode(mode))
-        .map(m -> {
-            windowPanes.add(m);
-            return m;
-          });
-    }
-  }
-
-  private class KeyedSessionWindowStreamApplication implements StreamApplication {
-
-    private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka");
-    private final AccumulationMode mode;
-    private final Duration duration;
-
-    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
-      this.mode = mode;
-      this.duration = duration;
-    }
-
-    @Override
-    public void init(StreamGraph graph, Config config) {
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null);
-      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
-
-      inStream
-          .map(m -> m)
-          .window(Windows.keyedSessionWindow(keyFn, duration)
-              .setAccumulationMode(mode))
-          .map(m -> {
-              windowPanes.add(m);
-              return m;
-            });
-    }
-  }
-
-  private class IntegerMessageEnvelope extends IncomingMessageEnvelope {
-    IntegerMessageEnvelope(int key, int msg) {
-      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java b/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java
new file mode 100644
index 0000000..710ebda
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.testUtils;
+
+import org.apache.samza.util.Clock;
+
+import java.time.Duration;
+
+/**
+ * An implementation of {@link Clock} that allows to advance the time by an arbitrary duration.
+ * Used for testing.
+ */
+public class TestClock implements Clock {
+
+  long currentTime = 1;
+
+  public void advanceTime(Duration duration) {
+    currentTime += duration.toMillis();
+  }
+
+  public void advanceTime(long millis) {
+    currentTime += millis;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+}


[4/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

Posted by ja...@apache.org.
SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

Removed the MessageEnvelope and OutputStream interfaces from public operator APIs.
Moved the creation of SinkFunction for output streams to SinkOperatorSpec.
Moved StreamSpec from a public API to an internal class.

Additionally,
1. Removed references to StreamGraph in OperatorSpecs. It was being used to getNextOpId(). MessageStreamsImpl now gets the ID and gives it to OperatorSpecs itself.
2. Updated and cleaned up the StreamGraphBuilder examples.
3. Renamed SinkOperatorSpec to OutputOperatorSpec since its used by sink, sendTo and partitionBy.

nickpan47 and xinyuiscool, please take a look.

Author: Prateek Maheshwari <pm...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>, Yi Pan <ni...@gmail.com>

Closes #92 from prateekm/message-envelope-removal


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4bf8ab6e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4bf8ab6e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4bf8ab6e

Branch: refs/heads/master
Commit: 4bf8ab6ebdf95cdf78f07b81a3b450a7f3fd9d45
Parents: 65af13d
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Apr 5 16:42:15 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Wed Apr 5 16:42:15 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/operators/MessageStream.java   |  35 +-
 .../apache/samza/operators/OutputStream.java    |  18 +-
 .../org/apache/samza/operators/StreamGraph.java |  80 ++--
 .../operators/data/InputMessageEnvelope.java    |  63 ---
 .../apache/samza/operators/data/LongOffset.java |  80 ----
 .../samza/operators/data/MessageEnvelope.java   |  54 ---
 .../org/apache/samza/operators/data/Offset.java |  31 --
 .../samza/operators/functions/SinkFunction.java |   2 +-
 .../apache/samza/runtime/ApplicationRunner.java |   3 +-
 .../samza/operators/TestMessageEnvelope.java    |  61 ---
 .../operators/TestOutputMessageEnvelope.java    |  43 --
 .../data/TestIncomingSystemMessage.java         |  54 ---
 .../samza/operators/data/TestLongOffset.java    |  79 ----
 .../samza/execution/ExecutionPlanner.java       |  16 +-
 .../samza/operators/MessageStreamImpl.java      |  63 +--
 .../apache/samza/operators/StreamGraphImpl.java | 262 +++---------
 .../samza/operators/impl/OperatorGraph.java     | 185 ---------
 .../samza/operators/impl/OperatorImplGraph.java | 188 +++++++++
 .../samza/operators/impl/SinkOperatorImpl.java  |   2 +
 .../samza/operators/spec/OperatorSpec.java      |  19 +-
 .../samza/operators/spec/OperatorSpecs.java     | 113 ++---
 .../operators/spec/PartialJoinOperatorSpec.java |  13 +-
 .../samza/operators/spec/SinkOperatorSpec.java  |  95 +++--
 .../operators/spec/StreamOperatorSpec.java      |  30 +-
 .../operators/spec/WindowOperatorSpec.java      |  15 +-
 .../operators/stream/InputStreamInternal.java   |  39 ++
 .../stream/InputStreamInternalImpl.java         |  45 ++
 .../stream/IntermediateStreamInternalImpl.java  |  61 +++
 .../operators/stream/OutputStreamInternal.java  |  43 ++
 .../stream/OutputStreamInternalImpl.java        |  52 +++
 .../samza/runtime/RemoteApplicationRunner.java  |   3 +-
 .../apache/samza/task/StreamOperatorTask.java   | 145 +++----
 .../apache/samza/example/BroadcastExample.java  |  69 ++++
 .../samza/example/KeyValueStoreExample.java     |  80 +---
 .../samza/example/NoContextStreamExample.java   | 128 ------
 .../samza/example/OrderShipmentJoinExample.java | 112 ++---
 .../samza/example/PageViewCounterExample.java   |  62 +--
 .../samza/example/RepartitionExample.java       |  85 +---
 .../samza/example/TestBasicStreamGraphs.java    | 103 -----
 .../samza/example/TestBroadcastExample.java     | 107 -----
 .../apache/samza/example/TestExampleBase.java   |  46 ---
 .../apache/samza/example/TestJoinExample.java   | 116 ------
 .../apache/samza/example/TestWindowExample.java |  74 ----
 .../org/apache/samza/example/WindowExample.java |  78 ++++
 .../samza/execution/TestExecutionPlanner.java   | 125 +++---
 .../samza/operators/TestJoinOperator.java       |  31 +-
 .../samza/operators/TestMessageStreamImpl.java  |  34 +-
 .../samza/operators/TestWindowOperator.java     | 413 +++++++++++++++++++
 .../data/JsonIncomingSystemMessageEnvelope.java |  60 ---
 .../operators/data/TestMessageEnvelope.java     |  57 +++
 .../data/TestOutputMessageEnvelope.java         |  39 ++
 .../samza/operators/impl/TestOperatorImpl.java  |   4 +-
 .../samza/operators/impl/TestOperatorImpls.java |  35 +-
 .../operators/impl/TestSinkOperatorImpl.java    |   2 +-
 .../operators/impl/TestStreamOperatorImpl.java  |   4 +-
 .../samza/operators/spec/TestOperatorSpecs.java |  81 ++--
 .../samza/operators/triggers/TestClock.java     |  45 --
 .../operators/triggers/TestWindowOperator.java  | 389 -----------------
 .../org/apache/samza/testUtils/TestClock.java   |  45 ++
 59 files changed, 1762 insertions(+), 2654 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index 16c5976..345bff0 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -75,22 +75,24 @@ public interface MessageStream<M> {
   MessageStream<M> filter(FilterFunction<M> filterFn);
 
   /**
-   * Allows sending messages in this {@link MessageStream} to an output using the provided {@link SinkFunction}.
+   * Allows sending messages in this {@link MessageStream} to an output system using the provided {@link SinkFunction}.
    *
-   * NOTE: the output may not be a {@link org.apache.samza.system.SystemStream}. It can be an external database, etc.
+   * NOTE: If the output is for a {@link org.apache.samza.system.SystemStream}, use
+   * {@link #sendTo(OutputStream)} instead. This transform should only be used to output to
+   * non-stream systems (e.g., an external database).
    *
-   * @param sinkFn  the function to send messages in this stream to output
+   * @param sinkFn the function to send messages in this stream to an external system
    */
   void sink(SinkFunction<M> sinkFn);
 
   /**
    * Allows sending messages in this {@link MessageStream} to an output {@link MessageStream}.
    *
-   * NOTE: the {@code stream} has to be a {@link MessageStream}.
-   *
-   * @param stream  the output {@link MessageStream}
+   * @param outputStream the output stream to send messages to
+   * @param <K> the type of key in the outgoing message
+   * @param <V> the type of message in the outgoing message
    */
-  void sendTo(OutputStream<M> stream);
+  <K, V> void sendTo(OutputStream<K, V, M> outputStream);
 
   /**
    * Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
@@ -128,19 +130,20 @@ public interface MessageStream<M> {
    * <p>
    * The merging streams must have the same messages of type {@code M}.
    *
-   * @param otherStreams  other {@link MessageStream}s to be merged with this {@link MessageStream}
-   * @return  the merged {@link MessageStream}
+   * @param otherStreams other {@link MessageStream}s to be merged with this {@link MessageStream}
+   * @return the merged {@link MessageStream}
    */
   MessageStream<M> merge(Collection<MessageStream<M>> otherStreams);
 
   /**
-   * Send the input message to an output {@link org.apache.samza.system.SystemStream} and consume it as input {@link MessageStream} again.
-   *
-   * Note: this is an transform function only used in logic DAG. In a physical DAG, this is either translated to a NOOP function, or a {@code MessageStream#sendThrough} function.
+   * Sends the messages of type {@code M}in this {@link MessageStream} to a repartitioned output stream and consumes
+   * them as an input {@link MessageStream} again. Uses keys returned by the {@code keyExtractor} as the partition key.
    *
-   * @param parKeyExtractor  a {@link Function} that extract the partition key from a message in this {@link MessageStream}
-   * @param <K>  the type of partition key
-   * @return  a {@link MessageStream} object after the re-partition
+   * @param keyExtractor the {@link Function} to extract the output message key and partition key from
+   *                     the input message
+   * @param <K> the type of output message key and partition key
+   * @return the repartitioned {@link MessageStream}
    */
-  <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor);
+  <K> MessageStream<M> partitionBy(Function<M, K> keyExtractor);
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
index 179f0e7..7335d56 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
@@ -19,23 +19,15 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.functions.SinkFunction;
-
 
 /**
- * The interface class defining the specific {@link SinkFunction} for a system {@link OutputStream}.
+ * An output stream to send messages to.
  *
- * @param <M>  The type of message to be send to this output stream
+ * @param <K> the type of key in the outgoing message
+ * @param <V> the type of message in the outgoing message
+ * @param <M> the type of message in this {@link OutputStream}
  */
 @InterfaceStability.Unstable
-public interface OutputStream<M> {
+public interface OutputStream<K, V, M> {
 
-  /**
-   * Returns the specific {@link SinkFunction} for this output stream. The {@link OutputStream} is created
-   * via {@link StreamGraph#createOutStream(StreamSpec, Serde, Serde)} or {@link StreamGraph#createIntStream(StreamSpec, Serde, Serde)}.
-   * Hence, the proper types of serdes for key and value are instantiated and are used in the {@link SinkFunction} returned.
-   *
-   * @return  The pre-defined {@link SinkFunction} to apply proper serdes before sending the message to the output stream.
-   */
-  SinkFunction<M> getSinkFunction();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index 30c4576..ff1c580 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -19,76 +19,52 @@
 package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 
-import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 
 /**
- * Job-level programming interface to create an operator DAG and run in various different runtime environments.
+ * Provides APIs for accessing�{@link MessageStream}s to be used to create the DAG of transforms.
  */
 @InterfaceStability.Unstable
 public interface StreamGraph {
-  /**
-   * Method to add an input {@link MessageStream} from the system
-   *
-   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the input {@link MessageStream}
-   * @param keySerde  the serde used to serialize/deserialize the message key from the input {@link MessageStream}
-   * @param msgSerde  the serde used to serialize/deserialize the message body from the input {@link MessageStream}
-   * @param <K>  the type of key in the input message
-   * @param <V>  the type of message in the input message
-   * @param <M>  the type of {@link MessageEnvelope} in the input {@link MessageStream}
-   * @return   the input {@link MessageStream} object
-   */
-  <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
-
-  /**
-   * Method to add an output {@link MessageStream} from the system
-   *
-   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the output {@link MessageStream}
-   * @param keySerde  the serde used to serialize/deserialize the message key from the output {@link MessageStream}
-   * @param msgSerde  the serde used to serialize/deserialize the message body from the output {@link MessageStream}
-   * @param <K>  the type of key in the output message
-   * @param <V>  the type of message in the output message
-   * @param <M>  the type of {@link MessageEnvelope} in the output {@link MessageStream}
-   * @return   the output {@link MessageStream} object
-   */
-  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
-
-  /**
-   * Method to add an intermediate {@link MessageStream} from the system
-   *
-   * @param streamSpec  the {@link StreamSpec} describing the physical characteristics of the intermediate {@link MessageStream}
-   * @param keySerde  the serde used to serialize/deserialize the message key from the intermediate {@link MessageStream}
-   * @param msgSerde  the serde used to serialize/deserialize the message body from the intermediate {@link MessageStream}
-   * @param <K>  the type of key in the intermediate message
-   * @param <V>  the type of message in the intermediate message
-   * @param <M>  the type of {@link MessageEnvelope} in the intermediate {@link MessageStream}
-   * @return   the intermediate {@link MessageStream} object
-   */
-  <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde);
 
   /**
-   * Method to get the input {@link MessageStream}s
+   * Gets the input {@link MessageStream} corresponding to the logical {@code streamId}.
    *
+   * @param streamId the unique logical ID for the stream
+   * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
+   *                   in the input {@link MessageStream}
+   * @param <K> the type of key in the incoming message
+   * @param <V> the type of message in the incoming message
+   * @param <M> the type of message in the input {@link MessageStream}
    * @return the input {@link MessageStream}
    */
-  Map<StreamSpec, MessageStream> getInStreams();
+  <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<K, V, M> msgBuilder);
 
   /**
-   * Method to get the {@link OutputStream}s
+   * Gets the {@link OutputStream} corresponding to the logical {@code streamId}.
    *
-   * @return  the map of all {@link OutputStream}s
+   * @param streamId the unique logical ID for the stream
+   * @param keyExtractor the {@link Function} to extract the outgoing key from the output message
+   * @param msgExtractor the {@link Function} to extract the outgoing message from the output message
+   * @param <K> the type of key in the outgoing message
+   * @param <V> the type of message in the outgoing message
+   * @param <M> the type of message in the {@link OutputStream}
+   * @return the output {@link MessageStream}
    */
-  Map<StreamSpec, OutputStream> getOutStreams();
+  <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor);
 
   /**
-   * Method to set the {@link ContextManager} for this {@link StreamGraph}
+   * Sets the {@link ContextManager} for this {@link StreamGraph}.
+   *
+   * The provided {@code contextManager} will be initialized before the transformation functions
+   * and can be used to setup shared context between them.
    *
-   * @param manager  the {@link ContextManager} object
-   * @return  this {@link StreamGraph} object
+   * @param contextManager the {@link ContextManager} to use for the {@link StreamGraph}
+   * @return the {@link StreamGraph} with the {@code contextManager} as its {@link ContextManager}
    */
-  StreamGraph withContextManager(ContextManager manager);
+  StreamGraph withContextManager(ContextManager contextManager);
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
deleted file mode 100644
index 306145b..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/InputMessageEnvelope.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * A {@link MessageEnvelope} that provides additional information about its input {@link SystemStreamPartition}
- * and its {@link Offset} within the {@link SystemStreamPartition}.
- * <p>
- * Note: the {@link Offset} is only unique and comparable within its {@link SystemStreamPartition}.
- */
-public class InputMessageEnvelope implements MessageEnvelope<Object, Object> {
-
-  private final IncomingMessageEnvelope ime;
-
-  /**
-   * Creates an {@code InputMessageEnvelope} from the {@link IncomingMessageEnvelope}.
-   *
-   * @param ime  the {@link IncomingMessageEnvelope} from the input system.
-   */
-  public InputMessageEnvelope(IncomingMessageEnvelope ime) {
-    this.ime = ime;
-  }
-
-  @Override
-  public Object getKey() {
-    return this.ime.getKey();
-  }
-
-  @Override
-  public Object getMessage() {
-    return this.ime.getMessage();
-  }
-
-  public Offset getOffset() {
-    // TODO: need to add offset factory to generate different types of offset. This is just a placeholder,
-    // assuming incoming message envelope carries long value as offset (i.e. Kafka case)
-    return new LongOffset(this.ime.getOffset());
-  }
-
-  public SystemStreamPartition getSystemStreamPartition() {
-    return this.ime.getSystemStreamPartition();
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java b/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
deleted file mode 100644
index 0b6c0fa..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.data;
-
-/**
- * An implementation of {@link org.apache.samza.operators.data.Offset}, w/ {@code long} value as the offset
- */
-public class LongOffset implements Offset {
-
-  /**
-   * The offset value in {@code long}
-   */
-  private final Long offset;
-
-  private LongOffset(long offset) {
-    this.offset = offset;
-  }
-
-  public LongOffset(String offset) {
-    this.offset = Long.valueOf(offset);
-  }
-
-  @Override
-  public int compareTo(Offset o) {
-    if (!(o instanceof LongOffset)) {
-      throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName());
-    }
-    LongOffset other = (LongOffset) o;
-    return this.offset.compareTo(other.offset);
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (!(other instanceof LongOffset)) {
-      return false;
-    }
-    LongOffset o = (LongOffset) other;
-    return this.offset.equals(o.offset);
-  }
-
-  @Override
-  public int hashCode() {
-    return offset.hashCode();
-  }
-
-  /**
-   * Helper method to get the minimum offset
-   *
-   * @return The minimum offset
-   */
-  public static LongOffset getMinOffset() {
-    return new LongOffset(Long.MIN_VALUE);
-  }
-
-  /**
-   * Helper method to get the maximum offset
-   *
-   * @return The maximum offset
-   */
-  public static LongOffset getMaxOffset() {
-    return new LongOffset(Long.MAX_VALUE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
deleted file mode 100644
index 703a44c..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/MessageEnvelope.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.data;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * An entry in the input/output {@link org.apache.samza.operators.MessageStream}s
- */
-@InterfaceStability.Unstable
-public interface MessageEnvelope<K, M> {
-
-  /**
-   * Get the key for this {@link MessageEnvelope}.
-   *
-   * @return  the key for this {@link MessageEnvelope}
-   */
-  K getKey();
-
-  /**
-   * Get the message in this {@link MessageEnvelope}.
-   *
-   * @return  the message in this {@link MessageEnvelope}
-   */
-  M getMessage();
-
-  /**
-   * Whether this {@link MessageEnvelope} indicates deletion of a previous message with this key.
-   *
-   * @return  true if the current {@link MessageEnvelope} indicates deletion of a previous message with this key
-   */
-  default boolean isDelete() {
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java b/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
deleted file mode 100644
index 5ac1ad7..0000000
--- a/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.data;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * A generic interface extending {@link java.lang.Comparable} to be used as {@code Offset} in a stream
- */
-@InterfaceStability.Unstable
-public interface Offset extends Comparable<Offset> {
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
index 08e090a..1d140ee 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/SinkFunction.java
@@ -37,7 +37,7 @@ public interface SinkFunction<M>  extends InitableFunction {
    * or shut the container down.
    *
    * @param message  the input message to be sent to an output {@link org.apache.samza.system.SystemStream}
-   * @param messageCollector  the {@link MessageCollector} to send the {@link org.apache.samza.operators.data.MessageEnvelope}
+   * @param messageCollector  the {@link MessageCollector} to send the message
    * @param taskCoordinator  the {@link TaskCoordinator} to request commits or shutdown
    */
   void apply(M message, MessageCollector messageCollector, TaskCoordinator taskCoordinator);

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index e4e24b4..b761d86 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -18,7 +18,6 @@
  */
 package org.apache.samza.runtime;
 
-import java.lang.reflect.Constructor;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
@@ -26,6 +25,8 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.system.StreamSpec;
 
+import java.lang.reflect.Constructor;
+
 
 /**
  * A physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
deleted file mode 100644
index dfa69ac..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-
-
-public class TestMessageEnvelope implements MessageEnvelope<String, TestMessageEnvelope.MessageType> {
-
-  private final String key;
-  private final MessageType value;
-
-  public TestMessageEnvelope(String key, String value, long eventTime) {
-    this.key = key;
-    this.value = new MessageType(value, eventTime);
-  }
-
-  @Override
-  public MessageType getMessage() {
-    return this.value;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key;
-  }
-
-  public class MessageType {
-    private final String value;
-    private final long eventTime;
-
-    public MessageType(String value, long eventTime) {
-      this.value = value;
-      this.eventTime = eventTime;
-    }
-
-    public long getEventTime() {
-      return eventTime;
-    }
-
-    public String getValue() {
-      return value;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
deleted file mode 100644
index 284b30b..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.MessageEnvelope;
-
-
-public class TestOutputMessageEnvelope implements MessageEnvelope<String, Integer> {
-  private final String key;
-  private final Integer value;
-
-  public TestOutputMessageEnvelope(String key, Integer value) {
-    this.key = key;
-    this.value = value;
-  }
-
-  @Override
-  public Integer getMessage() {
-    return this.value;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
deleted file mode 100644
index e3a1290..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestIncomingSystemMessage {
-
-  @Test
-  public void testConstructor() {
-    IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
-    InputMessageEnvelope ism = new InputMessageEnvelope(ime);
-
-    Object mockKey = mock(Object.class);
-    Object mockValue = mock(Object.class);
-    LongOffset testOffset = new LongOffset("12345");
-    SystemStreamPartition mockSsp = mock(SystemStreamPartition.class);
-
-    when(ime.getKey()).thenReturn(mockKey);
-    when(ime.getMessage()).thenReturn(mockValue);
-    when(ime.getSystemStreamPartition()).thenReturn(mockSsp);
-    when(ime.getOffset()).thenReturn("12345");
-
-    assertEquals(ism.getKey(), mockKey);
-    assertEquals(ism.getMessage(), mockValue);
-    assertEquals(ism.getSystemStreamPartition(), mockSsp);
-    assertEquals(ism.getOffset(), testOffset);
-    assertFalse(ism.isDelete());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
deleted file mode 100644
index 7838896..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-
-public class TestLongOffset {
-
-  @Test
-  public void testConstructor() throws Exception {
-    LongOffset o1 = new LongOffset("12345");
-    Field offsetField = LongOffset.class.getDeclaredField("offset");
-    offsetField.setAccessible(true);
-    Long x = (Long) offsetField.get(o1);
-    assertEquals(x.longValue(), 12345L);
-
-    o1 = new LongOffset("012345");
-    x = (Long) offsetField.get(o1);
-    assertEquals(x.longValue(), 12345L);
-
-    try {
-      o1 = new LongOffset("xyz");
-      fail("Constructor of LongOffset should have failed w/ mal-formatted numbers");
-    } catch (NumberFormatException nfe) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testComparator() {
-    LongOffset o1 = new LongOffset("11111");
-    Offset other = mock(Offset.class);
-    try {
-      o1.compareTo(other);
-      fail("compareTo() should have have failed when comparing to an object of a different class");
-    } catch (IllegalArgumentException iae) {
-      // expected
-    }
-
-    LongOffset o2 = new LongOffset("-10000");
-    assertEquals(o1.compareTo(o2), 1);
-    LongOffset o3 = new LongOffset("22222");
-    assertEquals(o1.compareTo(o3), -1);
-    LongOffset o4 = new LongOffset("11111");
-    assertEquals(o1.compareTo(o4), 0);
-  }
-
-  @Test
-  public void testEquals() {
-    LongOffset o1 = new LongOffset("12345");
-    Offset other = mock(Offset.class);
-    assertFalse(o1.equals(other));
-
-    LongOffset o2 = new LongOffset("0012345");
-    assertTrue(o1.equals(o2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 47deecd..be807e9 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -33,7 +33,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.system.StreamSpec;
@@ -57,7 +57,7 @@ public class ExecutionPlanner {
     this.streamManager = streamManager;
   }
 
-  public JobGraph plan(StreamGraph streamGraph) throws Exception {
+  public JobGraph plan(StreamGraphImpl streamGraph) throws Exception {
     // create physical job graph based on stream graph
     JobGraph jobGraph = createJobGraph(streamGraph);
 
@@ -72,10 +72,10 @@ public class ExecutionPlanner {
   /**
    * Create the physical graph from StreamGraph
    */
-  /* package private */ JobGraph createJobGraph(StreamGraph streamGraph) {
+  /* package private */ JobGraph createJobGraph(StreamGraphImpl streamGraph) {
     JobGraph jobGraph = new JobGraph(streamGraph, config);
-    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInStreams().keySet());
-    Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutStreams().keySet());
+    Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputStreams().keySet());
+    Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
     intStreams.retainAll(sinkStreams);
     sourceStreams.removeAll(intStreams);
@@ -103,7 +103,7 @@ public class ExecutionPlanner {
   /**
    * Figure out the number of partitions of all streams
    */
-  /* package private */ void calculatePartitions(StreamGraph streamGraph, JobGraph jobGraph) {
+  /* package private */ void calculatePartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
     // fetch the external streams partition info
     updateExistingPartitions(jobGraph, streamManager);
 
@@ -152,7 +152,7 @@ public class ExecutionPlanner {
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(StreamGraph streamGraph, JobGraph jobGraph) {
+  /* package private */ static void calculateJoinInputPartitions(StreamGraphImpl streamGraph, JobGraph jobGraph) {
     // mapping from a source stream to all join specs reachable from it
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
@@ -166,7 +166,7 @@ public class ExecutionPlanner {
     // The visited set keeps track of the join specs that have been already inserted in the queue before
     Set<OperatorSpec> visited = new HashSet<>();
 
-    streamGraph.getInStreams().entrySet().forEach(entry -> {
+    streamGraph.getInputStreams().entrySet().forEach(entry -> {
         StreamEdge streamEdge = jobGraph.getOrCreateEdge(entry.getKey());
         // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
         findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index 339df7a..dfe231e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -28,6 +28,8 @@ import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.stream.OutputStreamInternal;
 import org.apache.samza.operators.util.InternalInMemoryStore;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
@@ -51,7 +53,7 @@ import java.util.function.Function;
  */
 public class MessageStreamImpl<M> implements MessageStream<M> {
   /**
-   * The {@link StreamGraphImpl} object that contains this {@link MessageStreamImpl}
+   * The {@link StreamGraphImpl} that contains this {@link MessageStreamImpl}
    */
   private final StreamGraphImpl graph;
 
@@ -65,52 +67,59 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
    *
    * @param graph the {@link StreamGraphImpl} object that this stream belongs to
    */
-  MessageStreamImpl(StreamGraphImpl graph) {
+  public MessageStreamImpl(StreamGraphImpl graph) {
     this.graph = graph;
   }
 
   @Override
   public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
-    OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, this.graph, new MessageStreamImpl<>(this.graph));
+    OperatorSpec<TM> op = OperatorSpecs.createMapOperatorSpec(
+        mapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
     return op.getNextStream();
   }
 
   @Override
   public MessageStream<M> filter(FilterFunction<M> filterFn) {
-    OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, this.graph, new MessageStreamImpl<>(this.graph));
+    OperatorSpec<M> op = OperatorSpecs.createFilterOperatorSpec(
+        filterFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
     return op.getNextStream();
   }
 
   @Override
   public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
-    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, this.graph, new MessageStreamImpl<>(this.graph));
+    OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(
+        flatMapFn, new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(op);
     return op.getNextStream();
   }
 
   @Override
   public void sink(SinkFunction<M> sinkFn) {
-    this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph));
+    SinkOperatorSpec<M> op = OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph.getNextOpId());
+    this.registeredOperatorSpecs.add(op);
   }
 
   @Override
-  public void sendTo(OutputStream<M> stream) {
-    this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream));
+  public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
+    SinkOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
+        (OutputStreamInternal<K, V, M>) outputStream, this.graph.getNextOpId());
+    this.registeredOperatorSpecs.add(op);
   }
 
   @Override
   public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
     OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
-        this.graph, new MessageStreamImpl<>(this.graph));
+        new MessageStreamImpl<>(this.graph), this.graph.getNextOpId());
     this.registeredOperatorSpecs.add(wndOp);
     return wndOp.getNextStream();
   }
 
   @Override
-  public <K, JM, RM> MessageStream<RM> join(MessageStream<JM> otherStream, JoinFunction<K, M, JM, RM> joinFn, Duration ttl) {
-    MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph);
+  public <K, JM, RM> MessageStream<RM> join(
+      MessageStream<JM> otherStream, JoinFunction<K, M, JM, RM> joinFn, Duration ttl) {
+    MessageStreamImpl<RM> nextStream = new MessageStreamImpl<>(this.graph);
 
     PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn = new PartialJoinFunction<K, M, JM, RM>() {
       private KeyValueStore<K, PartialJoinMessage<M>> thisStreamState;
@@ -163,32 +172,36 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
       }
     };
 
-    this.registeredOperatorSpecs.add(OperatorSpecs.<K, M, JM, RM>createPartialJoinOperatorSpec(
-        thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), this.graph, outputStream));
+    this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(
+        thisPartialJoinFn, otherPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
 
-    ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs.<K, JM, M, RM>createPartialJoinOperatorSpec(
-        otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), this.graph, outputStream));
+    ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs
+        .add(OperatorSpecs.createPartialJoinOperatorSpec(
+            otherPartialJoinFn, thisPartialJoinFn, ttl.toMillis(), nextStream, this.graph.getNextOpId()));
 
-    return outputStream;
+    return nextStream;
   }
 
   @Override
   public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
-    MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph);
+    MessageStreamImpl<M> nextStream = new MessageStreamImpl<>(this.graph);
 
     otherStreams.add(this);
     otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
-        add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream)));
-    return outputStream;
+        add(OperatorSpecs.createMergeOperatorSpec(nextStream, this.graph.getNextOpId())));
+    return nextStream;
   }
 
   @Override
-  public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
-    int opId = graph.getNextOpId();
-    MessageStreamImpl<M> intStream = this.graph.generateIntStreamFromOpId(opId, parKeyExtractor);
-    OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
-    this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(), outputStream, opId));
-    return intStream;
+  public <K> MessageStream<M> partitionBy(Function<M, K> keyExtractor) {
+    int opId = this.graph.getNextOpId();
+    String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
+    MessageStreamImpl<M> intermediateStream =
+        this.graph.<K, M, M>getIntermediateStream(opName, keyExtractor, m -> m, (k, m) -> m);
+    SinkOperatorSpec<M> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
+        (OutputStreamInternal<K, M, M>) intermediateStream, opId);
+    this.registeredOperatorSpecs.add(partitionByOperatorSpec);
+    return intermediateStream;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 6f7377b..a49b68e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,248 +18,110 @@
  */
 package org.apache.samza.operators;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Function;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.stream.InputStreamInternal;
+import org.apache.samza.operators.stream.InputStreamInternalImpl;
+import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
+import org.apache.samza.operators.stream.OutputStreamInternal;
+import org.apache.samza.operators.stream.OutputStreamInternalImpl;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 
 /**
- * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
- * create system input/output/intermediate streams.
+ * A {@link StreamGraph} that provides APIs for accessing�{@link MessageStream}s to be used to
+ * create the DAG of transforms.
  */
 public class StreamGraphImpl implements StreamGraph {
 
   /**
-   * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link MessageEnvelope}
-   * in the input {@link MessageStream}s.
+   * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph.
+   * Should only be accessed by {@link MessageStreamImpl} via {@link #getNextOpId()}.
    */
   private int opId = 0;
 
-  // TODO: SAMZA-1101: the instantiation of physical streams and the physical sink functions should be delayed
-  // after physical deployment. The input/output/intermediate stream creation should also be delegated to {@link ExecutionEnvironment}
-  // s.t. we can allow different physical instantiation of stream under different execution environment w/o code change.
-  private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
-    final StreamSpec spec;
-    final Serde<K> keySerde;
-    final Serde<V> msgSerde;
-
-    InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-      super(graph);
-      this.spec = streamSpec;
-      this.keySerde = keySerde;
-      this.msgSerde = msgSerde;
-    }
-
-    StreamSpec getSpec() {
-      return this.spec;
-    }
-
-  }
-
-  private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> implements OutputStream<M> {
-    final StreamSpec spec;
-    final Serde<K> keySerde;
-    final Serde<V> msgSerde;
-
-    OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-      this.spec = streamSpec;
-      this.keySerde = keySerde;
-      this.msgSerde = msgSerde;
-    }
-
-    StreamSpec getSpec() {
-      return this.spec;
-    }
-
-    @Override
-    public SinkFunction<M> getSinkFunction() {
-      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
-        // TODO: need to find a way to directly pass in the serde class names
-        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
-        //    message.getKey(), message.getKey(), message.getMessage()));
-        mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
-      };
-    }
-  }
-
-  private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> {
-    final Function<M, PK> parKeyFn;
-
-    /**
-     * Default constructor
-     *
-     * @param graph the {@link StreamGraphImpl} object that this stream belongs to
-     */
-    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-      this(graph, streamSpec, keySerde, msgSerde, null);
-    }
-
-    IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde, Function<M, PK> parKeyFn) {
-      super(graph, streamSpec, keySerde, msgSerde);
-      this.parKeyFn = parKeyFn;
-    }
-
-    @Override
-    public SinkFunction<M> getSinkFunction() {
-      return (M message, MessageCollector mc, TaskCoordinator tc) -> {
-        // TODO: need to find a way to directly pass in the serde class names
-        // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
-        //    message.getKey(), message.getKey(), message.getMessage()));
-        if (this.parKeyFn == null) {
-          mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), message.getKey(), message.getMessage()));
-        } else {
-          // apply partition key function
-          mc.send(new OutgoingMessageEnvelope(new SystemStream(this.spec.getSystemName(), this.spec.getPhysicalName()), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
-        }
-      };
-    }
-  }
-
-  /**
-   * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
-   */
-  private final Map<String, MessageStream> inStreams = new HashMap<>();
-  private final Map<String, OutputStream> outStreams = new HashMap<>();
+  private final Map<StreamSpec, InputStreamInternal> inStreams = new HashMap<>();
+  private final Map<StreamSpec, OutputStreamInternal> outStreams = new HashMap<>();
   private final ApplicationRunner runner;
   private final Config config;
 
   private ContextManager contextManager = new ContextManager() { };
 
   public StreamGraphImpl(ApplicationRunner runner, Config config) {
+    // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphImpl once Systems
+    // can use streamId to send and receive messages.
     this.runner = runner;
     this.config = config;
   }
 
   @Override
-  public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.inStreams.containsKey(streamSpec.getId())) {
-      this.inStreams.putIfAbsent(streamSpec.getId(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
-    }
-    return this.inStreams.get(streamSpec.getId());
+  public <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<K, V, M> msgBuilder) {
+    return inStreams.computeIfAbsent(runner.getStreamSpec(streamId),
+        streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, msgBuilder));
   }
 
-  /**
-   * Helper method to be used by {@link MessageStreamImpl} class
-   *
-   * @param streamSpec  the {@link StreamSpec} object defining the {@link SystemStream} as the output
-   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link SystemStream}
-   * @return  the {@link MessageStreamImpl} object
-   */
   @Override
-  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.outStreams.containsKey(streamSpec.getId())) {
-      this.outStreams.putIfAbsent(streamSpec.getId(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
-    }
-    return this.outStreams.get(streamSpec.getId());
+  public <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+    return outStreams.computeIfAbsent(runner.getStreamSpec(streamId),
+        streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, keyExtractor, msgExtractor));
+  }
+
+  @Override
+  public StreamGraph withContextManager(ContextManager contextManager) {
+    this.contextManager = contextManager;
+    return this;
   }
 
   /**
-   * Helper method to be used by {@link MessageStreamImpl} class
+   * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
+   * An intermediate {@link MessageStream} is both an output and an input stream.
    *
-   * @param streamSpec  the {@link StreamSpec} object defining the {@link SystemStream} as an intermediate {@link SystemStream}
-   * @param <M>  the type of {@link MessageEnvelope}s in the output {@link SystemStream}
-   * @return  the {@link MessageStreamImpl} object
+   * @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the
+   *                   logical streamId.
+   * @param keyExtractor the {@link Function} to extract the outgoing key from the intermediate message
+   * @param msgExtractor the {@link Function} to extract the outgoing message from the intermediate message
+   * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
+   *                   in the intermediate {@link MessageStream}
+   * @param <K> the type of key in the intermediate message
+   * @param <V> the type of message in the intermediate message
+   * @param <M> the type of messages in the intermediate {@link MessageStream}
+   * @return  the intermediate {@link MessageStreamImpl}
    */
-  @Override
-  public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec,
-      Serde<K> keySerde, Serde<V> msgSerde) {
-    if (!this.inStreams.containsKey(streamSpec.getId())) {
-      this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
-    }
-    IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getId());
-    if (!this.outStreams.containsKey(streamSpec.getId())) {
-      this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
-    }
+  <K, V, M> MessageStreamImpl<M> getIntermediateStream(String streamName,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) {
+    String streamId = String.format("%s-%s-%s",
+        config.get(JobConfig.JOB_NAME()),
+        config.get(JobConfig.JOB_ID(), "1"),
+        streamName);
+    StreamSpec streamSpec = runner.getStreamSpec(streamId);
+    IntermediateStreamInternalImpl<K, V, M> intStream =
+        (IntermediateStreamInternalImpl<K, V, M>) inStreams
+            .computeIfAbsent(streamSpec,
+                k -> new IntermediateStreamInternalImpl<>(this, streamSpec, keyExtractor, msgExtractor, msgBuilder));
+    outStreams.putIfAbsent(streamSpec, intStream);
     return intStream;
   }
 
-  @Override public Map<StreamSpec, MessageStream> getInStreams() {
-    Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>();
-    this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) entry).getSpec(), entry));
-    return Collections.unmodifiableMap(inStreamMap);
-  }
-
-  @Override public Map<StreamSpec, OutputStream> getOutStreams() {
-    Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
-    this.outStreams.forEach((ss, entry) -> {
-        StreamSpec streamSpec = (entry instanceof IntermediateStreamImpl) ?
-          ((IntermediateStreamImpl) entry).getSpec() :
-          ((OutputStreamImpl) entry).getSpec();
-        outStreamMap.put(streamSpec, entry);
-      });
-    return Collections.unmodifiableMap(outStreamMap);
-  }
-
-  @Override
-  public StreamGraph withContextManager(ContextManager manager) {
-    this.contextManager = manager;
-    return this;
+  public Map<StreamSpec, InputStreamInternal> getInputStreams() {
+    return Collections.unmodifiableMap(inStreams);
   }
 
-  public int getNextOpId() {
-    return this.opId++;
+  public Map<StreamSpec, OutputStreamInternal> getOutputStreams() {
+    return Collections.unmodifiableMap(outStreams);
   }
 
   public ContextManager getContextManager() {
     return this.contextManager;
   }
 
-  /**
-   * Helper method to be get the input stream via {@link SystemStream}
-   *
-   * @param sstream  the {@link SystemStream}
-   * @return  a {@link MessageStreamImpl} object corresponding to the {@code systemStream}
-   */
-  public MessageStreamImpl getInputStream(SystemStream sstream) {
-    for (MessageStream entry: this.inStreams.values()) {
-      if (((InputStreamImpl) entry).getSpec().getSystemName().equals(sstream.getSystem()) &&
-          ((InputStreamImpl) entry).getSpec().getPhysicalName().equals(sstream.getStream())) {
-        return (MessageStreamImpl) entry;
-      }
-    }
-    return null;
-  }
-
-  <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) {
-    if (this.outStreams.containsValue(intStream)) {
-      return (OutputStream<M>) intStream;
-    }
-    return null;
-  }
-
-  /**
-   * Method to generate intermediate stream from an operator ID.
-   *
-   * @param opId  operator ID
-   * @param parKeyFn  the function to extract the partition key from the input message
-   * @param <PK>  the type of partition key
-   * @param <M>  the type of input message
-   * @return  the {@link OutputStream} object for the re-partitioned stream
-   */
-  <PK, M> MessageStreamImpl<M> generateIntStreamFromOpId(int opId, Function<M, PK> parKeyFn) {
-    String opNameWithId = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
-    String streamId = String.format("%s-%s-%s",
-        config.get(JobConfig.JOB_NAME()),
-        config.get(JobConfig.JOB_ID(), "1"),
-        opNameWithId);
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-
-    this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
-    IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
-    this.outStreams.putIfAbsent(streamSpec.getId(), intStream);
-    return intStream;
+  /* package private */ int getNextOpId() {
+    return this.opId++;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
deleted file mode 100644
index ca8e34b..0000000
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.util.Clock;
-import org.apache.samza.util.SystemClock;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a
- * {@link MessageStreamImpl}
- */
-public class OperatorGraph {
-
-  /**
-   * A {@link Map} from {@link OperatorSpec} to {@link OperatorImpl}. This map registers all {@link OperatorImpl} in the DAG
-   * of {@link OperatorImpl} in a {@link org.apache.samza.container.TaskInstance}. Each {@link OperatorImpl} is created
-   * according to a single instance of {@link OperatorSpec}.
-   */
-  private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>();
-
-  /**
-   * This {@link Map} describes the DAG of {@link OperatorImpl} that are chained together to process the input messages.
-   */
-  private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>();
-
-  private final Clock clock;
-
-  public OperatorGraph(Clock clock) {
-    this.clock = clock;
-  }
-
-  public OperatorGraph() {
-    this(SystemClock.instance());
-  }
-
-  /**
-   * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}.
-   * This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and
-   * instantiate the corresponding {@link OperatorImpl} chains that take the {@link org.apache.samza.operators.MessageStream} as input.
-   *
-   * @param inputStreams  the map of input {@link org.apache.samza.operators.MessageStream}s
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   */
-  public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config config, TaskContext context) {
-    inputStreams.forEach((ss, mstream) -> this.operatorGraph.put(ss, this.createOperatorImpls(mstream, config, context)));
-  }
-
-  /**
-   * Get the {@link RootOperatorImpl} corresponding to the provided {@code ss}.
-   *
-   * @param ss  input {@link SystemStream}
-   * @return  the {@link RootOperatorImpl} that starts processing the input message
-   */
-  public RootOperatorImpl get(SystemStream ss) {
-    return this.operatorGraph.get(ss);
-  }
-
-  /**
-   * Get all {@link RootOperatorImpl}s for the graph.
-   *
-   * @return  an immutable view of all {@link RootOperatorImpl}s for the graph
-   */
-  public Collection<RootOperatorImpl> getAll() {
-    return Collections.unmodifiableCollection(this.operatorGraph.values());
-  }
-
-  /**
-   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
-   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
-   *
-   * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
-   * @param <M>  the type of messagess in the {@code source} {@link MessageStreamImpl}
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   * @return  root node for the {@link OperatorImpl} DAG
-   */
-  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config,
-      TaskContext context) {
-    // since the source message stream might have multiple operator specs registered on it,
-    // create a new root node as a single point of entry for the DAG.
-    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
-    // create the pipeline/topology starting from the source
-    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
-        // pass in the source and context s.t. stateful stream operators can initialize their stores
-        OperatorImpl<M, ?> operatorImpl =
-            this.createAndRegisterOperatorImpl(registeredOperator, source, config, context);
-        rootOperator.registerNextOperator(operatorImpl);
-      });
-    return rootOperator;
-  }
-
-  /**
-   * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
-   * {@link OperatorImpl}s.
-   *
-   * @param operatorSpec  the operatorSpec registered with the {@code source}
-   * @param source  the source {@link MessageStreamImpl}
-   * @param <M>  type of input message
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   * @return  the operator implementation for the operatorSpec
-   */
-  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
-      MessageStreamImpl<M> source, Config config, TaskContext context) {
-    if (!operators.containsKey(operatorSpec)) {
-      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
-      if (operators.putIfAbsent(operatorSpec, operatorImpl) == null) {
-        // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
-        // so traverse and initialize and register the rest of the DAG.
-        // initialize the corresponding operator function
-        operatorSpec.init(config, context);
-        MessageStreamImpl nextStream = operatorSpec.getNextStream();
-        if (nextStream != null) {
-          Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
-          registeredSpecs.forEach(registeredSpec -> {
-              OperatorImpl subImpl = this.createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
-              operatorImpl.registerNextOperator(subImpl);
-            });
-        }
-        return operatorImpl;
-      }
-    }
-
-    // the implementation corresponding to operatorSpec has already been instantiated
-    // and registered, so we do not need to traverse the DAG further.
-    return operators.get(operatorSpec);
-  }
-
-  /**
-   * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
-   *
-   * @param source  the source {@link MessageStreamImpl}
-   * @param <M>  type of input message
-   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
-   * @param config  the {@link Config} required to instantiate operators
-   * @param context  the {@link TaskContext} required to instantiate operators
-   * @return  the {@link OperatorImpl} implementation instance
-   */
-  private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
-    if (operatorSpec instanceof StreamOperatorSpec) {
-      StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
-      return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
-    } else if (operatorSpec instanceof SinkOperatorSpec) {
-      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
-    } else if (operatorSpec instanceof WindowOperatorSpec) {
-      return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
-    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
-      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
-    }
-    throw new IllegalArgumentException(
-        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
new file mode 100644
index 0000000..709f2a0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for the input
+ * {@link MessageStreamImpl}s.
+ */
+public class OperatorImplGraph {
+
+  /**
+   * A mapping from {@link OperatorSpec}s to their {@link OperatorImpl}s in this graph. Used to avoid creating
+   * multiple {@link OperatorImpl}s for an {@link OperatorSpec}, e.g., when it's reached from different
+   * input {@link MessageStreamImpl}s.
+   */
+  private final Map<OperatorSpec, OperatorImpl> operatorImpls = new HashMap<>();
+
+  /**
+   * A mapping from input {@link SystemStream}s to their {@link OperatorImpl} sub-DAG in this graph.
+   */
+  private final Map<SystemStream, RootOperatorImpl> rootOperators = new HashMap<>();
+
+  private final Clock clock;
+
+  public OperatorImplGraph(Clock clock) {
+    this.clock = clock;
+  }
+
+  /* package private */ OperatorImplGraph() {
+    this(SystemClock.instance());
+  }
+
+  /**
+   * Initialize the DAG of {@link OperatorImpl}s for the input {@link MessageStreamImpl} in the provided
+   * {@link StreamGraphImpl}.
+   *
+   * @param streamGraph  the logical {@link StreamGraphImpl}
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   */
+  public void init(StreamGraphImpl streamGraph, Config config, TaskContext context) {
+    streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> {
+        SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+        this.rootOperators.put(systemStream, this.createOperatorImpls((MessageStreamImpl) inputStream, config, context));
+      });
+  }
+
+  /**
+   * Get the {@link RootOperatorImpl} corresponding to the provided input {@code systemStream}.
+   *
+   * @param systemStream  input {@link SystemStream}
+   * @return  the {@link RootOperatorImpl} that starts processing the input message
+   */
+  public RootOperatorImpl getRootOperator(SystemStream systemStream) {
+    return this.rootOperators.get(systemStream);
+  }
+
+  /**
+   * Get all {@link RootOperatorImpl}s for the graph.
+   *
+   * @return  an unmodifiable view of all {@link RootOperatorImpl}s for the graph
+   */
+  public Collection<RootOperatorImpl> getAllRootOperators() {
+    return Collections.unmodifiableCollection(this.rootOperators.values());
+  }
+
+  /**
+   * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
+   * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
+   *
+   * @param source  the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
+   * @param <M>  the type of messagess in the {@code source} {@link MessageStreamImpl}
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  root node for the {@link OperatorImpl} DAG
+   */
+  private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config, TaskContext context) {
+    // since the source message stream might have multiple operator specs registered on it,
+    // create a new root node as a single point of entry for the DAG.
+    RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
+    // create the pipeline/topology starting from the source
+    source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
+        // pass in the source and context s.t. stateful stream operators can initialize their stores
+        OperatorImpl<M, ?> operatorImpl =
+            createAndRegisterOperatorImpl(registeredOperator, source, config, context);
+        rootOperator.registerNextOperator(operatorImpl);
+      });
+    return rootOperator;
+  }
+
+  /**
+   * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
+   * {@link OperatorImpl}s.
+   *
+   * @param operatorSpec  the operatorSpec registered with the {@code source}
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the operator implementation for the operatorSpec
+   */
+  private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
+      MessageStreamImpl<M> source, Config config, TaskContext context) {
+    if (!operatorImpls.containsKey(operatorSpec)) {
+      OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
+      if (operatorImpls.putIfAbsent(operatorSpec, operatorImpl) == null) {
+        // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
+        // so traverse and initialize and register the rest of the DAG.
+        // initialize the corresponding operator function
+        operatorSpec.init(config, context);
+        MessageStreamImpl nextStream = operatorSpec.getNextStream();
+        if (nextStream != null) {
+          Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
+          registeredSpecs.forEach(registeredSpec -> {
+              OperatorImpl subImpl = createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
+              operatorImpl.registerNextOperator(subImpl);
+            });
+        }
+        return operatorImpl;
+      }
+    }
+
+    // the implementation corresponding to operatorSpec has already been instantiated
+    // and registered, so we do not need to traverse the DAG further.
+    return operatorImpls.get(operatorSpec);
+  }
+
+  /**
+   * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
+   *
+   * @param source  the source {@link MessageStreamImpl}
+   * @param <M>  type of input message
+   * @param operatorSpec  the immutable {@link OperatorSpec} definition.
+   * @param config  the {@link Config} required to instantiate operators
+   * @param context  the {@link TaskContext} required to instantiate operators
+   * @return  the {@link OperatorImpl} implementation instance
+   */
+  private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source,
+      OperatorSpec operatorSpec, Config config, TaskContext context) {
+    if (operatorSpec instanceof StreamOperatorSpec) {
+      StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
+      return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
+    } else if (operatorSpec instanceof SinkOperatorSpec) {
+      return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
+    } else if (operatorSpec instanceof WindowOperatorSpec) {
+      return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
+    } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
+      return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
+    }
+    throw new IllegalArgumentException(
+        String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
index 41d1778..f92fbfb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
@@ -40,5 +40,7 @@ class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
   @Override
   public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
     this.sinkFn.apply(message, collector, coordinator);
+    // there should be no further chained operators since this is a terminal operator.
+    // hence we don't call #propogateResult() here.
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index 5a125a2..18090e2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -25,8 +25,8 @@ import org.apache.samza.task.TaskContext;
 
 
 /**
- * A stateless serializable stream operator specification that holds all the information required
- * to transform the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
+ * A stream operator specification that holds all the information required to transform 
+ * the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
  *
  * @param <OM>  the type of output message from the operator
  */
@@ -45,16 +45,21 @@ public interface OperatorSpec<OM> {
     PARTITION_BY
   }
 
-
   /**
-   * Get the output stream containing transformed messages produced by this operator.
-   * @return  the output stream containing transformed messages produced by this operator.
+   * Get the next {@link MessageStreamImpl} that receives the transformed messages produced by this operator.
+   * @return  the next {@link MessageStreamImpl}
    */
   MessageStreamImpl<OM> getNextStream();
 
   /**
-   * Return the ID for this operator
-   * @return ID integer
+   * Get the {@link OpCode} for this operator.
+   * @return  the {@link OpCode} for this operator
+   */
+  OpCode getOpCode();
+
+  /**
+   * Get the unique ID of this operator in the {@link org.apache.samza.operators.StreamGraph}.
+   * @return  the unique operator ID
    */
   int getOpId();
 


[3/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index c00f470..e2c4b9a 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -19,21 +19,20 @@
 
 package org.apache.samza.operators.spec;
 
-import java.util.Collection;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.stream.OutputStreamInternal;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
 
 import java.util.ArrayList;
-import org.apache.samza.task.TaskContext;
+import java.util.Collection;
 
 
 /**
@@ -47,13 +46,14 @@ public class OperatorSpecs {
    * Creates a {@link StreamOperatorSpec} for {@link MapFunction}
    *
    * @param mapFn  the map function
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param output  the output {@link MessageStreamImpl} object
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @param <OM>  type of output message
    * @return  the {@link StreamOperatorSpec}
    */
-  public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
+  public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(
+      MapFunction<M, OM> mapFn, MessageStreamImpl<OM> nextStream, int opId) {
     return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
       @Override
       public Collection<OM> apply(M message) {
@@ -71,19 +71,20 @@ public class OperatorSpecs {
       public void init(Config config, TaskContext context) {
         mapFn.init(config, context);
       }
-    }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
+    }, nextStream, OperatorSpec.OpCode.MAP, opId);
   }
 
   /**
    * Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
    *
    * @param filterFn  the transformation function
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param output  the output {@link MessageStreamImpl} object
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @return  the {@link StreamOperatorSpec}
    */
-  public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
+  public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(
+      FilterFunction<M> filterFn, MessageStreamImpl<M> nextStream, int opId) {
     return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
       @Override
       public Collection<M> apply(M message) {
@@ -100,77 +101,81 @@ public class OperatorSpecs {
       public void init(Config config, TaskContext context) {
         filterFn.init(config, context);
       }
-    }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
+    }, nextStream, OperatorSpec.OpCode.FILTER, opId);
   }
 
   /**
    * Creates a {@link StreamOperatorSpec}.
    *
    * @param transformFn  the transformation function
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param output  the output {@link MessageStreamImpl} object
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <M>  type of input message
    * @param <OM>  type of output message
    * @return  the {@link StreamOperatorSpec}
    */
   public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
-      FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
-    return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
+      FlatMapFunction<M, OM> transformFn, MessageStreamImpl<OM> nextStream, int opId) {
+    return new StreamOperatorSpec<>(transformFn, nextStream, OperatorSpec.OpCode.FLAT_MAP, opId);
   }
 
   /**
-   * Creates a {@link SinkOperatorSpec}.
+   * Creates a {@link SinkOperatorSpec} for the sink operator.
    *
-   * @param sinkFn  the sink function
+   * @param sinkFn  the sink function provided by the user
+   * @param opId  the unique ID of the operator
    * @param <M>  type of input message
-   * @param graph  the {@link StreamGraphImpl} object
-   * @return  the {@link SinkOperatorSpec}
+   * @return  the {@link SinkOperatorSpec} for the sink operator
    */
-  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
-    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId());
+  public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, int opId) {
+    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, opId);
   }
 
   /**
-   * Creates a {@link SinkOperatorSpec}.
+   * Creates a {@link SinkOperatorSpec} for the sendTo operator.
    *
-   * @param sinkFn  the sink function
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param stream  the {@link OutputStream} where the message is sent to
-   * @param <M>  type of input message
-   * @return  the {@link SinkOperatorSpec}
+   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param opId  the unique ID of the operator
+   * @param <K> the type of key in the outgoing message
+   * @param <V> the type of message in the outgoing message
+   * @param <M> the type of message in the {@link OutputStreamInternal}
+   * @return  the {@link SinkOperatorSpec} for the sendTo operator
    */
-  public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
-    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream);
+  public static <K, V, M> SinkOperatorSpec<M> createSendToOperatorSpec(
+      OutputStreamInternal<K, V, M> outputStream, int opId) {
+    return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
   }
 
   /**
-   * Creates a {@link SinkOperatorSpec}.
+   * Creates a {@link SinkOperatorSpec} for the partitionBy operator.
    *
-   * @param sinkFn  the sink function
-   * @param stream  the {@link OutputStream} where the message is sent to
-   * @param opId operator ID
-   * @param <M>  type of input message
-   * @return  the {@link SinkOperatorSpec}
+   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param opId  the unique ID of the operator
+   * @param <K> the type of key in the outgoing message
+   * @param <V> the type of message in the outgoing message
+   * @param <M> the type of message in the {@link OutputStreamInternal}
+   * @return  the {@link SinkOperatorSpec} for the partitionBy operator
    */
-  public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, OutputStream<M> stream, int opId) {
-    return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, opId, stream);
+  public static <K, V, M> SinkOperatorSpec<M> createPartitionByOperatorSpec(
+      OutputStreamInternal<K, V, M> outputStream, int opId) {
+    return new SinkOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
   }
 
   /**
    * Creates a {@link WindowOperatorSpec}.
    *
-   * @param window the description of the window.
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param wndOutput  the window output {@link MessageStreamImpl} object
-   * @param <M> the type of input message
-   * @param <WK> the type of key in the {@link WindowPane}
-   * @param <WV> the type of value in the window
+   * @param window  the description of the window.
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
+   * @param <M>  the type of input message
+   * @param <WK>  the type of key in the {@link WindowPane}
+   * @param <WV>  the type of value in the window
    * @return  the {@link WindowOperatorSpec}
    */
 
   public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
-      WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
-    return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
+      WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
+    return new WindowOperatorSpec<>(window, nextStream, opId);
   }
 
   /**
@@ -179,8 +184,8 @@ public class OperatorSpecs {
    * @param thisPartialJoinFn  the partial join function for this message stream
    * @param otherPartialJoinFn  the partial join function for the other message stream
    * @param ttlMs  the ttl in ms for retaining messages in each stream
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param joinOutput  the output {@link MessageStreamImpl}
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <K>  the type of join key
    * @param <M>  the type of input message
    * @param <JM>  the type of message in the other join stream
@@ -189,25 +194,25 @@ public class OperatorSpecs {
    */
   public static <K, M, JM, RM> PartialJoinOperatorSpec<K, M, JM, RM> createPartialJoinOperatorSpec(
       PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn, PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn,
-      long ttlMs, StreamGraphImpl graph, MessageStreamImpl<RM> joinOutput) {
-    return new PartialJoinOperatorSpec<K, M, JM, RM>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, joinOutput, graph.getNextOpId());
+      long ttlMs, MessageStreamImpl<RM> nextStream, int opId) {
+    return new PartialJoinOperatorSpec<K, M, JM, RM>(thisPartialJoinFn, otherPartialJoinFn, ttlMs, nextStream, opId);
   }
 
   /**
    * Creates a {@link StreamOperatorSpec} with a merger function.
    *
-   * @param graph  the {@link StreamGraphImpl} object
-   * @param mergeOutput  the output {@link MessageStreamImpl} from the merger
+   * @param nextStream  the output {@link MessageStreamImpl} to send messages to
+   * @param opId  the unique ID of the operator
    * @param <M>  the type of input message
    * @return  the {@link StreamOperatorSpec} for the merge
    */
-  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
+  public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> nextStream, int opId) {
     return new StreamOperatorSpec<M, M>(message ->
         new ArrayList<M>() {
           {
             this.add(message);
           }
         },
-        mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
+        nextStream, OperatorSpec.OpCode.MERGE, opId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
index 669895f..b1dc529 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java
@@ -35,11 +35,10 @@ import org.apache.samza.task.TaskContext;
  */
 public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
 
-
   private final PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn;
   private final PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn;
   private final long ttlMs;
-  private final MessageStreamImpl<RM> joinOutput;
+  private final MessageStreamImpl<RM> nextStream;
   private final int opId;
 
   /**
@@ -50,22 +49,22 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
    * @param otherPartialJoinFn  partial join function that provides state for input messages of type {@code JM}
    *                            in the other stream
    * @param ttlMs  the ttl in ms for retaining messages in each stream
-   * @param joinOutput  the output {@link MessageStreamImpl} of the join results
+   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
    * @param opId  the unique ID for this operator
    */
   PartialJoinOperatorSpec(PartialJoinFunction<K, M, JM, RM> thisPartialJoinFn,
       PartialJoinFunction<K, JM, M, RM> otherPartialJoinFn, long ttlMs,
-      MessageStreamImpl<RM> joinOutput, int opId) {
+      MessageStreamImpl<RM> nextStream, int opId) {
     this.thisPartialJoinFn = thisPartialJoinFn;
     this.otherPartialJoinFn = otherPartialJoinFn;
     this.ttlMs = ttlMs;
-    this.joinOutput = joinOutput;
+    this.nextStream = nextStream;
     this.opId = opId;
   }
 
   @Override
   public MessageStreamImpl<RM> getNextStream() {
-    return this.joinOutput;
+    return this.nextStream;
   }
 
   public PartialJoinFunction<K, M, JM, RM> getThisPartialJoinFn() {
@@ -80,10 +79,12 @@ public class PartialJoinOperatorSpec<K, M, JM, RM> implements OperatorSpec<RM> {
     return ttlMs;
   }
 
+  @Override
   public OperatorSpec.OpCode getOpCode() {
     return OpCode.JOIN;
   }
 
+  @Override
   public int getOpId() {
     return this.opId;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index ba30d67..7de85f3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -20,69 +20,54 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.stream.OutputStreamInternal;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
 
 
 /**
- * The spec for a sink operator that accepts user-defined logic to output a {@link MessageStreamImpl} to an external
- * system. This is a terminal operator and does allows further operator chaining.
+ * The spec for an operator that outputs a {@link MessageStreamImpl} to an external system.
+ * This is a terminal operator and does not allow further operator chaining.
  *
  * @param <M>  the type of input message
  */
 public class SinkOperatorSpec<M> implements OperatorSpec {
 
-  /**
-   * {@link OpCode} for this {@link SinkOperatorSpec}
-   */
+  private final SinkFunction<M> sinkFn;
+  private OutputStreamInternal<?, ?, M> outputStream; // may be null
   private final OperatorSpec.OpCode opCode;
-
-  /**
-   * The unique ID for this operator.
-   */
   private final int opId;
 
   /**
-   * The user-defined sink function
-   */
-  private final SinkFunction<M> sinkFn;
-
-  /**
-   * Potential output stream defined by the {@link SinkFunction}
-   */
-  private final OutputStream<M> outStream;
-
-  /**
-   * Default constructor for a {@link SinkOperatorSpec} w/o an output stream. (e.g. output is sent to remote database)
+   * Constructs a {@link SinkOperatorSpec} with a user defined {@link SinkFunction}.
    *
    * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
    *                the output {@link org.apache.samza.task.MessageCollector} and the
    *                {@link org.apache.samza.task.TaskCoordinator}.
-   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
-   *                or {@link OpCode#PARTITION_BY}
-   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
+   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}.
+   *                It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}.
+   * @param opId  the unique ID of this {@link OperatorSpec} in the graph
    */
   SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId) {
-    this(sinkFn, opCode, opId, null);
+    this.sinkFn = sinkFn;
+    this.opCode = opCode;
+    this.opId = opId;
   }
 
   /**
-   * Default constructor for a {@link SinkOperatorSpec} that sends the output to an {@link OutputStream}
-   *
-   * @param sinkFn  a user defined {@link SinkFunction} that will be called with the output message,
-   *                the output {@link org.apache.samza.task.MessageCollector} and the
-   *                {@link org.apache.samza.task.TaskCoordinator}.
-   * @param opCode  the specific {@link OpCode} for this {@link SinkOperatorSpec}. It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO},
-   *                or {@link OpCode#PARTITION_BY}
-   * @param opId  the unique id of this {@link SinkOperatorSpec} in the {@link org.apache.samza.operators.StreamGraph}
-   * @param opId  the {@link OutputStream} for this {@link SinkOperatorSpec}
+   * Constructs a {@link SinkOperatorSpec} to send messages to the provided {@code outStream}
+   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
+   *               It could be {@link OpCode#SINK}, {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
+   * @param opId  the unique ID of this {@link SinkOperatorSpec} in the graph
    */
-  SinkOperatorSpec(SinkFunction<M> sinkFn, OperatorSpec.OpCode opCode, int opId, OutputStream<M> outStream) {
-    this.sinkFn = sinkFn;
-    this.opCode = opCode;
-    this.opId = opId;
-    this.outStream = outStream;
+  SinkOperatorSpec(OutputStreamInternal<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
+    this(createSinkFn(outputStream), opCode, opId);
+    this.outputStream = outputStream;
   }
 
   /**
@@ -94,23 +79,47 @@ public class SinkOperatorSpec<M> implements OperatorSpec {
     return null;
   }
 
+  /**
+   * The {@link OutputStreamInternal} that this operator is sending its output to.
+   * @return the {@link OutputStreamInternal} for this operator if any, else null.
+   */
+  public OutputStreamInternal<?, ?, M> getOutputStream() {
+    return this.outputStream;
+  }
+
   public SinkFunction<M> getSinkFn() {
     return this.sinkFn;
   }
 
+  @Override
   public OperatorSpec.OpCode getOpCode() {
     return this.opCode;
   }
 
+  @Override
   public int getOpId() {
     return this.opId;
   }
 
-  public OutputStream<M> getOutStream() {
-    return this.outStream;
+  @Override
+  public void init(Config config, TaskContext context) {
+    this.sinkFn.init(config, context);
   }
 
-  @Override public void init(Config config, TaskContext context) {
-    this.sinkFn.init(config, context);
+  /**
+   * Creates a {@link SinkFunction} to send messages to the provided {@code output}.
+   * @param outputStream  the {@link OutputStreamInternal} to send messages to
+   * @param <M>  the type of input message
+   * @return  a {@link SinkFunction} that sends messages to the provided {@code output}
+   */
+  private static <M> SinkFunction<M> createSinkFn(OutputStreamInternal<?, ?, M> outputStream) {
+    return (M message, MessageCollector mc, TaskCoordinator tc) -> {
+      // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
+      SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
+          outputStream.getStreamSpec().getPhysicalName());
+      Object key = outputStream.getKeyExtractor().apply(message);
+      Object msg = outputStream.getMsgExtractor().apply(message);
+      mc.send(new OutgoingMessageEnvelope(systemStream, key, msg));
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index d7813f7..3c427c7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -32,54 +32,42 @@ import org.apache.samza.task.TaskContext;
  */
 public class StreamOperatorSpec<M, OM> implements OperatorSpec<OM> {
 
-  /**
-   * {@link OpCode} for this {@link StreamOperatorSpec}
-   */
+  private final FlatMapFunction<M, OM> transformFn;
+  private final MessageStreamImpl<OM> nextStream;
   private final OperatorSpec.OpCode opCode;
-
-  /**
-   * The unique ID for this operator.
-   */
   private final int opId;
 
   /**
-   * The output {@link MessageStreamImpl} from this {@link StreamOperatorSpec}
-   */
-  private final MessageStreamImpl<OM> outputStream;
-
-  /**
-   * Transformation function applied in this {@link StreamOperatorSpec}
-   */
-  private final FlatMapFunction<M, OM> transformFn;
-
-  /**
    * Constructor for a {@link StreamOperatorSpec} that accepts an output {@link MessageStreamImpl}.
    *
    * @param transformFn  the transformation function
-   * @param outputStream  the output {@link MessageStreamImpl}
+   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
    * @param opCode  the {@link OpCode} for this {@link StreamOperatorSpec}
    * @param opId  the unique id for this {@link StreamOperatorSpec} in a {@link org.apache.samza.operators.StreamGraph}
    */
-  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl outputStream, OperatorSpec.OpCode opCode, int opId) {
-    this.outputStream = outputStream;
+  StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, MessageStreamImpl nextStream,
+      OperatorSpec.OpCode opCode, int opId) {
     this.transformFn = transformFn;
+    this.nextStream = nextStream;
     this.opCode = opCode;
     this.opId = opId;
   }
 
   @Override
   public MessageStreamImpl<OM> getNextStream() {
-    return this.outputStream;
+    return this.nextStream;
   }
 
   public FlatMapFunction<M, OM> getTransformFn() {
     return this.transformFn;
   }
 
+  @Override
   public OperatorSpec.OpCode getOpCode() {
     return this.opCode;
   }
 
+  @Override
   public int getOpId() {
     return this.opId;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 6d948d7..9515e38 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -36,21 +36,18 @@ import org.apache.samza.task.TaskContext;
 public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK, WV>> {
 
   private final WindowInternal<M, WK, WV> window;
-
-  private final MessageStreamImpl<WindowPane<WK, WV>> outputStream;
-
+  private final MessageStreamImpl<WindowPane<WK, WV>> nextStream;
   private final int opId;
 
-
   /**
    * Constructor for {@link WindowOperatorSpec}.
    *
    * @param window  the window function
-   * @param outputStream  the output {@link MessageStreamImpl} from this {@link WindowOperatorSpec}
+   * @param nextStream  the output {@link MessageStreamImpl} containing the messages produced from this operator
    * @param opId  auto-generated unique ID of this operator
    */
-  WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> outputStream, int opId) {
-    this.outputStream = outputStream;
+  WindowOperatorSpec(WindowInternal<M, WK, WV> window, MessageStreamImpl<WindowPane<WK, WV>> nextStream, int opId) {
+    this.nextStream = nextStream;
     this.window = window;
     this.opId = opId;
   }
@@ -64,17 +61,19 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
 
   @Override
   public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
-    return this.outputStream;
+    return this.nextStream;
   }
 
   public WindowInternal<M, WK, WV> getWindow() {
     return window;
   }
 
+  @Override
   public OpCode getOpCode() {
     return OpCode.WINDOW;
   }
 
+  @Override
   public int getOpId() {
     return this.opId;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
new file mode 100644
index 0000000..e67b326
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternal.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+
+/**
+ * Internal representation of an input stream.
+ *
+ * @param <M> the type of messages in the input stream
+ */
+@InterfaceStability.Unstable
+public interface InputStreamInternal<K, V, M> extends MessageStream<M> {
+
+  StreamSpec getStreamSpec();
+
+  BiFunction<K, V, M> getMsgBuilder();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
new file mode 100644
index 0000000..c4337d0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/InputStreamInternalImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+
+public class InputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements InputStreamInternal<K, V, M> {
+
+  private final StreamSpec streamSpec;
+  private final BiFunction<K, V, M> msgBuilder;
+
+  public InputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder) {
+    super(graph);
+    this.streamSpec = streamSpec;
+    this.msgBuilder = msgBuilder;
+  }
+
+  public StreamSpec getStreamSpec() {
+    return this.streamSpec;
+  }
+
+  public BiFunction<K, V, M> getMsgBuilder() {
+    return this.msgBuilder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
new file mode 100644
index 0000000..a1bee6a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateStreamInternalImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+public class IntermediateStreamInternalImpl<K, V, M> extends MessageStreamImpl<M>
+    implements InputStreamInternal<K, V, M>, OutputStreamInternal<K, V, M> {
+
+  private final StreamSpec streamSpec;
+  private final Function<M, K> keyExtractor;
+  private final Function<M, V> msgExtractor;
+  private final BiFunction<K, V, M> msgBuilder;
+
+  public IntermediateStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor, BiFunction<K, V, M> msgBuilder) {
+    super(graph);
+    this.streamSpec = streamSpec;
+    this.keyExtractor = keyExtractor;
+    this.msgExtractor = msgExtractor;
+    this.msgBuilder = msgBuilder;
+  }
+
+  public StreamSpec getStreamSpec() {
+    return this.streamSpec;
+  }
+
+  public Function<M, K> getKeyExtractor() {
+    return this.keyExtractor;
+  }
+
+  public Function<M, V> getMsgExtractor() {
+    return this.msgExtractor;
+  }
+
+  @Override
+  public BiFunction<K, V, M> getMsgBuilder() {
+    return this.msgBuilder;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
new file mode 100644
index 0000000..48ce641
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternal.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.Function;
+
+
+/**
+ * Internal representation of an output stream.
+ *
+ * @param <M> the type of messages in the output stream
+ */
+@InterfaceStability.Unstable
+public interface OutputStreamInternal<K, V, M> extends OutputStream<K, V, M> {
+
+  StreamSpec getStreamSpec();
+
+  Function<M, K> getKeyExtractor();
+
+  Function<M, V> getMsgExtractor();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
new file mode 100644
index 0000000..a2d0cca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/OutputStreamInternalImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.stream;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.system.StreamSpec;
+
+import java.util.function.Function;
+
+public class OutputStreamInternalImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStreamInternal<K, V, M> {
+
+  private final StreamSpec streamSpec;
+  private final Function<M, K> keyExtractor;
+  private final Function<M, V> msgExtractor;
+
+  public OutputStreamInternalImpl(StreamGraphImpl graph, StreamSpec streamSpec,
+      Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+    super(graph);
+    this.streamSpec = streamSpec;
+    this.keyExtractor = keyExtractor;
+    this.msgExtractor = msgExtractor;
+  }
+
+  public StreamSpec getStreamSpec() {
+    return this.streamSpec;
+  }
+
+  public Function<M, K> getKeyExtractor() {
+    return this.keyExtractor;
+  }
+
+  public Function<M, V> getMsgExtractor() {
+    return this.msgExtractor;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
index cd6c492..4cb0d28 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -31,7 +31,6 @@ import org.apache.samza.execution.JobNode;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
-import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
@@ -135,7 +134,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
 
   private JobGraph getExecutionPlan(StreamApplication app) throws Exception {
     // build stream graph
-    StreamGraph streamGraph = new StreamGraphImpl(this, config);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(this, config);
     app.init(streamGraph, config);
 
     // create the physical execution plan

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index d4224c3..73bb53f 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,113 +18,114 @@
  */
 package org.apache.samza.task;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
+import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.impl.OperatorGraph;
+import org.apache.samza.operators.impl.OperatorImplGraph;
+import org.apache.samza.operators.stream.InputStreamInternal;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
- * Execution of the logic sub-DAG
- *
- *
- * An {@link StreamTask} implementation that receives {@link InputMessageEnvelope}s and propagates them
- * through the user's stream transformations defined in {@link StreamGraphImpl} using the
- * {@link org.apache.samza.operators.MessageStream} APIs.
- * <p>
- * This class brings all the operator API implementation components together and feeds the
- * {@link InputMessageEnvelope}s into the transformation chains.
- * <p>
- * It accepts an instance of the user implemented factory {@link StreamApplication} as input parameter of the constructor.
- * When its own {@link #init(Config, TaskContext)} method is called during startup, it instantiate a user-defined {@link StreamGraphImpl}
- * from the {@link StreamApplication}, calls {@link StreamGraphImpl#getContextManager()} to initialize the task-wide context
- * for the graph, and creates a {@link MessageStreamImpl} corresponding to each of its input
- * {@link org.apache.samza.system.SystemStreamPartition}s. Each input {@link MessageStreamImpl}
- * will be corresponding to either an input stream or intermediate stream in {@link StreamGraphImpl}.
- * <p>
- * Then, this task calls {@link org.apache.samza.operators.impl.OperatorGraph#init(Map, Config, TaskContext)} for each of the input
- * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
- * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
- * root node of the DAG, which this class saves.
- * <p>
- * Now that it has the root for the DAG corresponding to each {@link org.apache.samza.system.SystemStreamPartition}, it
- * can pass the message envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)}
- * along to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
- * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ * A {@link StreamTask} implementation that brings all the operator API implementation components together and
+ * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
  */
 public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
 
-  /**
-   * A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
-   */
-  private final OperatorGraph operatorGraph;
-
-  private final StreamApplication graphBuilder;
-
+  private final StreamApplication streamApplication;
   private final ApplicationRunner runner;
-
   private final Clock clock;
 
+  private OperatorImplGraph operatorImplGraph;
   private ContextManager contextManager;
+  private Map<SystemStream, InputStreamInternal> inputSystemStreamToInputStream;
 
-  private Set<SystemStreamPartition> systemStreamPartitions;
-
-  public StreamOperatorTask(StreamApplication graphBuilder, ApplicationRunner runner) {
-    this(graphBuilder, SystemClock.instance(), runner);
+  /**
+   * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
+   * @param streamApplication the user-implemented {@link StreamApplication} that creates the logical DAG
+   * @param runner the {@link ApplicationRunner} to get the mapping between logical and physical streams
+   * @param clock the {@link Clock} to use for time-keeping
+   */
+  public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner runner, Clock clock) {
+    this.streamApplication = streamApplication;
+    this.runner = runner;
+    this.clock = clock;
   }
 
-  // purely for testing.
-  public StreamOperatorTask(StreamApplication graphBuilder, Clock clock, ApplicationRunner runner) {
-    this.graphBuilder = graphBuilder;
-    this.operatorGraph = new OperatorGraph(clock);
-    this.clock = clock;
-    this.runner = runner;
+  public StreamOperatorTask(StreamApplication application, ApplicationRunner runner) {
+    this(application, runner, SystemClock.instance());
   }
 
+  /**
+   * Initializes this task during startup.
+   * <p>
+   * Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets
+   * the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs,
+   * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs.
+   *<p>
+   * It then uses the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical
+   * DAG. It also saves the mapping between input {@link SystemStream}s and their corresponding
+   * {@link InputStreamInternal}s for delivering incoming messages to the appropriate sub-DAG.
+   *
+   * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
+   * @param context allows initializing and accessing contextual data of this StreamTask
+   * @throws Exception in case of initialization errors
+   */
   @Override
   public final void init(Config config, TaskContext context) throws Exception {
-    // create the MessageStreamsImpl object and initialize app-specific logic DAG within the task
-    StreamGraphImpl streamGraph = new StreamGraphImpl(this.runner, config);
-    this.graphBuilder.init(streamGraph, config);
-    // get the context manager of the {@link StreamGraph} and initialize the task-specific context
-    this.contextManager = streamGraph.getContextManager();
-    this.systemStreamPartitions = context.getSystemStreamPartitions();
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    // initialize the user-implemented stream application.
+    this.streamApplication.init(streamGraph, config);
 
-    Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
-    systemStreamPartitions.forEach(ssp -> {
-        if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
-          // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
-          inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streamGraph.getInputStream(ssp.getSystemStream()));
-        }
+    // get the user-implemented context manager and initialize the task-specific context.
+    this.contextManager = streamGraph.getContextManager();
+    TaskContext initializedTaskContext = this.contextManager.initTaskContext(config, context);
+
+    // create the operator impl DAG corresponding to the logical operator spec DAG
+    OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
+    operatorImplGraph.init(streamGraph, config, initializedTaskContext);
+    this.operatorImplGraph = operatorImplGraph;
+
+    // TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
+    inputSystemStreamToInputStream = new HashMap<>();
+    streamGraph.getInputStreams().forEach((streamSpec, inputStream)-> {
+        SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+        inputSystemStreamToInputStream.put(systemStream, inputStream);
       });
-    operatorGraph.init(inputBySystemStream, config, this.contextManager.initTaskContext(config, context));
   }
 
+  /**
+   * Passes the incoming message envelopes along to the {@link org.apache.samza.operators.impl.RootOperatorImpl} node
+   * for the input {@link SystemStream}.
+   * <p>
+   * From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
+   * its chained {@link org.apache.samza.operators.impl.OperatorImpl}s itself.
+   *
+   * @param ime incoming message envelope to process
+   * @param collector the collector to send messages with
+   * @param coordinator the coordinator to request commits or shutdown
+   */
   @Override
   public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
-    this.operatorGraph.get(ime.getSystemStreamPartition().getSystemStream())
-        .onNext(new InputMessageEnvelope(ime), collector, coordinator);
+    SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
+    InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
+    // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
+    operatorImplGraph.getRootOperator(systemStream)
+        .onNext(inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage()), collector, coordinator);
   }
 
   @Override
   public final void window(MessageCollector collector, TaskCoordinator coordinator)  {
-    systemStreamPartitions.forEach(ssp -> {
-        this.operatorGraph.get(ssp.getSystemStream())
-          .onTick(collector, coordinator);
-      });
+    operatorImplGraph.getAllRootOperators()
+        .forEach(rootOperator -> rootOperator.onTick(collector, coordinator));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
new file mode 100644
index 0000000..a09247a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.util.CommandLine;
+
+
+/**
+ * Example implementation of a task that splits its input into multiple output streams.
+ */
+public class BroadcastExample implements StreamApplication {
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> inputStream =
+        graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
+    OutputStream<String, PageViewEvent, PageViewEvent> outputStream1 =
+        graph.getOutputStream("outputStream1", m -> m.key, m -> m);
+    OutputStream<String, PageViewEvent, PageViewEvent> outputStream2 =
+        graph.getOutputStream("outputStream2", m -> m.key, m -> m);
+    OutputStream<String, PageViewEvent, PageViewEvent> outputStream3 =
+        graph.getOutputStream("outputStream3", m -> m.key, m -> m);
+
+    inputStream.filter(m -> m.key.equals("key1")).sendTo(outputStream1);
+    inputStream.filter(m -> m.key.equals("key2")).sendTo(outputStream2);
+    inputStream.filter(m -> m.key.equals("key3")).sendTo(outputStream3);
+  }
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new BroadcastExample());
+  }
+
+  class PageViewEvent {
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 5898f1f..6b913c4 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -18,55 +18,42 @@
  */
 package org.apache.samza.example;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.samza.operators.*;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
+import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.CommandLine;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 
 /**
  * Example code using {@link KeyValueStore} to implement event-time window
  */
 public class KeyValueStoreExample implements StreamApplication {
 
-  /**
-   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in local runner:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-   *     runner.run(new UserMainExample(), config)
-   *   }
-   *
-   */
   @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<StatsOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<StatsOutput>());
+    MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(
+        "pageViewEventStream", (k, v) -> (PageViewEvent) v);
+    OutputStream<String, StatsOutput, StatsOutput> pageViewEventPerMemberStream = graph.getOutputStream(
+        "pageViewEventPerMemberStream", statsOutput -> statsOutput.memberId, statsOutput -> statsOutput);
 
     pageViewEvents.
-        partitionBy(m -> m.getMessage().memberId).
+        partitionBy(m -> m.memberId).
         flatMap(new MyStatsCounter()).
-        sendTo(pageViewPerMemberCounters);
-
+        sendTo(pageViewEventPerMemberStream);
   }
 
-  // local program model
+  // local execution mode
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -88,8 +75,8 @@ public class KeyValueStoreExample implements StreamApplication {
     @Override
     public Collection<StatsOutput> apply(PageViewEvent message) {
       List<StatsOutput> outputStats = new ArrayList<>();
-      long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.getMessage().timestamp) / 5) * 5;
-      String wndKey = String.format("%s-%d", message.getMessage().memberId, wndTimestamp);
+      long wndTimestamp = (long) Math.floor(TimeUnit.MILLISECONDS.toMinutes(message.timestamp) / 5) * 5;
+      String wndKey = String.format("%s-%d", message.memberId, wndTimestamp);
       StatsWindowState curState = this.statsStore.get(wndKey);
       curState.newCount++;
       long curTimeMs = System.currentTimeMillis();
@@ -97,7 +84,7 @@ public class KeyValueStoreExample implements StreamApplication {
         curState.timeAtLastOutput = curTimeMs;
         curState.lastCount += curState.newCount;
         curState.newCount = 0;
-        outputStats.add(new StatsOutput(message.getMessage().memberId, wndTimestamp, curState.lastCount));
+        outputStats.add(new StatsOutput(message.memberId, wndTimestamp, curState.lastCount));
       }
       // update counter w/o generating output
       this.statsStore.put(wndKey, curState);
@@ -110,11 +97,7 @@ public class KeyValueStoreExample implements StreamApplication {
     }
   }
 
-  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
-  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+  class PageViewEvent {
     String pageId;
     String memberId;
     long timestamp;
@@ -124,19 +107,9 @@ public class KeyValueStoreExample implements StreamApplication {
       this.memberId = memberId;
       this.timestamp = timestamp;
     }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
   }
 
-  class StatsOutput implements MessageEnvelope<String, StatsOutput> {
+  class StatsOutput {
     private String memberId;
     private long timestamp;
     private Integer count;
@@ -146,16 +119,5 @@ public class KeyValueStoreExample implements StreamApplication {
       this.timestamp = timestamp;
       this.count = count;
     }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public StatsOutput getMessage() {
-      return this;
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
deleted file mode 100644
index 516daf5..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.operators.*;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.CommandLine;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-
-
-/**
- * Example {@link StreamApplication} code to test the API methods
- */
-public class NoContextStreamExample implements StreamApplication {
-
-  StreamSpec input1 = new StreamSpec("inputStreamA", "PageViewEvent", "kafka");
-
-  StreamSpec input2 = new StreamSpec("inputStreamB", "RumLixEvent", "kafka");
-
-  StreamSpec output = new StreamSpec("joinedPageViewStream", "PageViewJoinRumLix", "kafka");
-
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
-    return new JsonMessageEnvelope(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getSystemStreamPartition());
-  }
-
-  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonIncomingSystemMessageEnvelope<MessageType>> {
-
-    @Override
-    public JsonIncomingSystemMessageEnvelope<MessageType> apply(JsonMessageEnvelope m1,
-        JsonMessageEnvelope m2) {
-      MessageType newJoinMsg = new MessageType();
-      newJoinMsg.joinKey = m1.getKey();
-      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-      return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
-    }
-
-    @Override
-    public String getFirstKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-  }
-
-
-  /**
-   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in local:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-   *     runner.run(new NoContextStreamExample(), config);
-   *   }
-   *
-   */
-  @Override public void init(StreamGraph graph, Config config) {
-    MessageStream<InputMessageEnvelope> inputSource1 = graph.<Object, Object, InputMessageEnvelope>createInStream(
-        input1, null, null);
-    MessageStream<InputMessageEnvelope> inputSource2 = graph.<Object, Object, InputMessageEnvelope>createInStream(
-        input2, null, null);
-    OutputStream<JsonIncomingSystemMessageEnvelope<MessageType>> outStream = graph.createOutStream(output,
-        new StringSerde("UTF-8"), new JsonSerde<>());
-
-    inputSource1.map(this::getInputMessage).
-        join(inputSource2.map(this::getInputMessage), new MyJoinFunction(), Duration.ofMinutes(1)).
-        sendTo(outStream);
-
-  }
-
-  // standalone local program model
-  public static void main(String[] args) throws Exception {
-    CommandLine cmdLine = new CommandLine();
-    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
-    localRunner.run(new NoContextStreamExample());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index a338f6b..80d0e16 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -18,17 +18,13 @@
  */
 package org.apache.samza.example;
 
+import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -38,29 +34,19 @@ import java.time.Duration;
  */
 public class OrderShipmentJoinExample implements StreamApplication {
 
-  /**
-   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in local runner:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-   *     runner.run(new UserMainExample(), config);
-   *   }
-   *
-   */
-  @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<OrderRecord> orders = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    MessageStream<ShipmentRecord> shipments = graph.createInStream(input2, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<FulFilledOrderRecord> fulfilledOrders = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
-
-    orders.join(shipments, new MyJoinFunction(), Duration.ofMinutes(1)).sendTo(fulfilledOrders);
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    MessageStream<OrderRecord> orders = graph.getInputStream("orderStream", (k, m) -> (OrderRecord) m);
+    MessageStream<ShipmentRecord> shipments = graph.getInputStream("shipmentStream", (k, m) -> (ShipmentRecord) m);
+    OutputStream<String, FulFilledOrderRecord, FulFilledOrderRecord> joinedOrderShipmentStream =
+        graph.getOutputStream("joinedOrderShipmentStream", m -> m.orderId, m -> m);
 
+    orders
+        .join(shipments, new MyJoinFunction(), Duration.ofMinutes(1))
+        .sendTo(joinedOrderShipmentStream);
   }
 
-  // standalone local program model
+  // local execution mode
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -68,13 +54,24 @@ public class OrderShipmentJoinExample implements StreamApplication {
     localRunner.run(new OrderShipmentJoinExample());
   }
 
-  StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");
+  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+    @Override
+    public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+      return new FulFilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
+    }
 
-  StreamSpec input2 = new StreamSpec("shipmentStream", "ShipmentEvent", "kafka");
+    @Override
+    public String getFirstKey(OrderRecord message) {
+      return message.orderId;
+    }
 
-  StreamSpec output = new StreamSpec("joinedOrderShipmentStream", "OrderShipmentJoinEvent", "kafka");
+    @Override
+    public String getSecondKey(ShipmentRecord message) {
+      return message.orderId;
+    }
+  }
 
-  class OrderRecord implements MessageEnvelope<String, OrderRecord> {
+  class OrderRecord {
     String orderId;
     long orderTimeMs;
 
@@ -82,19 +79,9 @@ public class OrderShipmentJoinExample implements StreamApplication {
       this.orderId = orderId;
       this.orderTimeMs = timeMs;
     }
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public OrderRecord getMessage() {
-      return this;
-    }
   }
 
-  class ShipmentRecord implements MessageEnvelope<String, ShipmentRecord> {
+  class ShipmentRecord {
     String orderId;
     long shipTimeMs;
 
@@ -102,19 +89,9 @@ public class OrderShipmentJoinExample implements StreamApplication {
       this.orderId = orderId;
       this.shipTimeMs = timeMs;
     }
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public ShipmentRecord getMessage() {
-      return this;
-    }
   }
 
-  class FulFilledOrderRecord implements MessageEnvelope<String, FulFilledOrderRecord> {
+  class FulFilledOrderRecord {
     String orderId;
     long orderTimeMs;
     long shipTimeMs;
@@ -124,38 +101,5 @@ public class OrderShipmentJoinExample implements StreamApplication {
       this.orderTimeMs = orderTimeMs;
       this.shipTimeMs = shipTimeMs;
     }
-
-
-    @Override
-    public String getKey() {
-      return this.orderId;
-    }
-
-    @Override
-    public FulFilledOrderRecord getMessage() {
-      return this;
-    }
-  }
-
-  FulFilledOrderRecord myJoinResult(OrderRecord m1, ShipmentRecord m2) {
-    return new FulFilledOrderRecord(m1.getKey(), m1.orderTimeMs, m2.shipTimeMs);
-  }
-
-  class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
-
-    @Override
-    public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
-      return OrderShipmentJoinExample.this.myJoinResult(message, otherMessage);
-    }
-
-    @Override
-    public String getFirstKey(OrderRecord message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(ShipmentRecord message) {
-      return message.getKey();
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 6edf048..547cac6 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -18,18 +18,17 @@
  */
 package org.apache.samza.example;
 
-import org.apache.samza.operators.*;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.triggers.Triggers;
 import org.apache.samza.operators.windows.AccumulationMode;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -42,20 +41,22 @@ import java.util.function.Supplier;
 public class PageViewCounterExample implements StreamApplication {
 
   @Override public void init(StreamGraph graph, Config config) {
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
+    OutputStream<String, PageViewCount, PageViewCount> pageViewEventPerMemberStream = graph
+        .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
 
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
     Supplier<Integer> initialValue = () -> 0;
-
-    pageViewEvents.
-        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), initialValue, (m, c) -> c + 1).
-            setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
-            setAccumulationMode(AccumulationMode.DISCARDING)).
-        map(MyStreamOutput::new).
-        sendTo(pageViewPerMemberCounters);
-
+    FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
+    pageViewEvents
+        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn)
+            .setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
+            .setAccumulationMode(AccumulationMode.DISCARDING))
+        .map(PageViewCount::new)
+        .sendTo(pageViewEventPerMemberStream);
   }
 
+  // local execution mode
   public static void main(String[] args) {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -63,11 +64,7 @@ public class PageViewCounterExample implements StreamApplication {
     localRunner.run(new PageViewCounterExample());
   }
 
-  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
-  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+  class PageViewEvent {
     String pageId;
     String memberId;
     long timestamp;
@@ -77,38 +74,17 @@ public class PageViewCounterExample implements StreamApplication {
       this.memberId = memberId;
       this.timestamp = timestamp;
     }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
   }
 
-  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+  class PageViewCount {
     String memberId;
     long timestamp;
     int count;
 
-    MyStreamOutput(WindowPane<String, Integer> m) {
+    PageViewCount(WindowPane<String, Integer> m) {
       this.memberId = m.getKey().getKey();
       this.timestamp = Long.valueOf(m.getKey().getPaneId());
       this.count = m.getMessage();
     }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public MyStreamOutput getMessage() {
-      return this;
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index e222fe4..37375cd 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -18,16 +18,14 @@
  */
 package org.apache.samza.example;
 
-import org.apache.samza.operators.*;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.util.CommandLine;
 
 import java.time.Duration;
@@ -39,47 +37,21 @@ import java.util.function.Supplier;
  */
 public class RepartitionExample implements StreamApplication {
 
-  /**
-   * used by remote application runner to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in local runner:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ApplicationRunner runner = ApplicationRunner.fromConfig(config);
-   *     runner.run(new UserMainExample(), config);
-   *   }
-   *
-   */
-
-  /**
-   * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
-   * invoking context as in standalone:
-   *
-   *   public static void main(String args[]) throws Exception {
-   *     CommandLine cmdLine = new CommandLine();
-   *     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
-   *     ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
-   *     remoteEnv.run(new UserMainExample(), config);
-   *   }
-   *
-   */
   @Override public void init(StreamGraph graph, Config config) {
-
-    MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
-    OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
     Supplier<Integer> initialValue = () -> 0;
-
-    pageViewEvents.
-        partitionBy(m -> m.getMessage().memberId).
-        window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
-            msg -> msg.getMessage().memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1)).
-        map(MyStreamOutput::new).
-        sendTo(pageViewPerMemberCounters);
-
+    MessageStream<PageViewEvent> pageViewEvents =
+        graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
+    OutputStream<String, MyStreamOutput, MyStreamOutput> pageViewEventPerMemberStream = graph
+        .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
+
+    pageViewEvents
+        .partitionBy(m -> m.memberId)
+        .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1))
+        .map(MyStreamOutput::new)
+        .sendTo(pageViewEventPerMemberStream);
   }
 
-  // standalone local program model
+  // local execution mode
   public static void main(String[] args) throws Exception {
     CommandLine cmdLine = new CommandLine();
     Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
@@ -87,11 +59,7 @@ public class RepartitionExample implements StreamApplication {
     localRunner.run(new RepartitionExample());
   }
 
-  StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
-
-  StreamSpec output = new StreamSpec("pageViewEventPerMemberStream", "PageViewEventCountByMemberId", "kafka");
-
-  class PageViewEvent implements MessageEnvelope<String, PageViewEvent> {
+  class PageViewEvent {
     String pageId;
     String memberId;
     long timestamp;
@@ -101,19 +69,9 @@ public class RepartitionExample implements StreamApplication {
       this.memberId = memberId;
       this.timestamp = timestamp;
     }
-
-    @Override
-    public String getKey() {
-      return this.pageId;
-    }
-
-    @Override
-    public PageViewEvent getMessage() {
-      return this;
-    }
   }
 
-  class MyStreamOutput implements MessageEnvelope<String, MyStreamOutput> {
+  class MyStreamOutput {
     String memberId;
     long timestamp;
     int count;
@@ -123,16 +81,5 @@ public class RepartitionExample implements StreamApplication {
       this.timestamp = Long.valueOf(m.getKey().getPaneId());
       this.count = m.getMessage();
     }
-
-    @Override
-    public String getKey() {
-      return this.memberId;
-    }
-
-    @Override
-    public MyStreamOutput getMessage() {
-      return this;
-    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java b/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
deleted file mode 100644
index 6975955..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestBasicStreamGraphs.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import java.lang.reflect.Field;
-import org.apache.samza.Partition;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.operators.impl.OperatorGraph;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-
-/**
- * Unit test for {@link StreamOperatorTask}
- */
-public class TestBasicStreamGraphs {
-
-  private final Set<SystemStreamPartition> inputPartitions = new HashSet<SystemStreamPartition>() { {
-      for (int i = 0; i < 4; i++) {
-        this.add(new SystemStreamPartition("my-system", String.format("my-topic%d", i), new Partition(i)));
-      }
-    } };
-
-  private final ApplicationRunner runner = mock(ApplicationRunner.class);
-
-  @Test
-  public void testUserTask() throws Exception {
-    Config config = new MapConfig();
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestWindowExample userTask = new TestWindowExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(userTask, runner);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(config, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-  @Test
-  public void testSplitTask() throws Exception {
-    Config config = new MapConfig();
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestBroadcastExample splitTask = new TestBroadcastExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(splitTask, runner);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(config, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-  @Test
-  public void testJoinTask() throws Exception {
-    Config config = new MapConfig();
-    TaskContext mockContext = mock(TaskContext.class);
-    when(mockContext.getSystemStreamPartitions()).thenReturn(this.inputPartitions);
-    TestJoinExample joinTask = new TestJoinExample(this.inputPartitions);
-    StreamOperatorTask adaptorTask = new StreamOperatorTask(joinTask, runner);
-    Field pipelineMapFld = StreamOperatorTask.class.getDeclaredField("operatorGraph");
-    pipelineMapFld.setAccessible(true);
-    OperatorGraph opGraph = (OperatorGraph) pipelineMapFld.get(adaptorTask);
-
-    adaptorTask.init(config, mockContext);
-    this.inputPartitions.forEach(partition -> {
-        assertNotNull(opGraph.get(partition.getSystemStream()));
-      });
-  }
-
-}


[2/4] samza git commit: SAMZA-1094 SAMZA-1101 SAMZA-1159; Remove MessageEnvelope from public operator APIs. : Delay the creation of SinkFunction for output streams. : Move StreamSpec from a public API to an internal class.

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
deleted file mode 100644
index 1c30a21..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import java.time.Duration;
-import java.util.Set;
-import java.util.function.Supplier;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class TestBroadcastExample extends TestExampleBase {
-
-  TestBroadcastExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class MessageType {
-    String field1;
-    String field2;
-    String field3;
-    String field4;
-    String parKey;
-    private long timestamp;
-
-    public long getTimestamp() {
-      return this.timestamp;
-    }
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    FoldLeftFunction<JsonMessageEnvelope, Integer> sumAggregator = (m, c) -> c + 1;
-    Supplier<Integer> initialValue = () -> 0;
-
-    inputs.keySet().forEach(entry -> {
-        MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
-                new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
-
-        inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-        inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-        inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
-            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
-
-      });
-  }
-
-  JsonMessageEnvelope getInputMessage(InputMessageEnvelope m1) {
-    return (JsonMessageEnvelope) m1.getMessage();
-  }
-
-  boolean myFilter1(JsonMessageEnvelope m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key1");
-  }
-
-  boolean myFilter2(JsonMessageEnvelope m1) {
-    // Do user defined processing here
-    return m1.getMessage().parKey.equals("key2");
-  }
-
-  boolean myFilter3(JsonMessageEnvelope m1) {
-    return m1.getMessage().parKey.equals("key3");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java b/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
deleted file mode 100644
index dd661a0..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestExampleBase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.example;
-
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Base class for test examples
- *
- */
-public abstract class TestExampleBase implements StreamApplication {
-
-  protected final Map<SystemStream, Set<SystemStreamPartition>> inputs;
-
-  TestExampleBase(Set<SystemStreamPartition> inputs) {
-    this.inputs = new HashMap<>();
-    for (SystemStreamPartition input : inputs) {
-      this.inputs.putIfAbsent(input.getSystemStream(), new HashSet<>());
-      this.inputs.get(input.getSystemStream()).add(input);
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
deleted file mode 100644
index 6c9f8c2..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestJoinExample.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.serializers.JsonSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-
-/**
- * Example implementation of unique key-based stream-stream join tasks
- *
- */
-public class TestJoinExample  extends TestExampleBase {
-
-  TestJoinExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class MessageType {
-    String joinKey;
-    List<String> joinFields = new ArrayList<>();
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  MessageStream<JsonMessageEnvelope> joinOutput = null;
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-
-    for (SystemStream input : inputs.keySet()) {
-      StreamSpec inputStreamSpec = new StreamSpec(input.getSystem() + "-" + input.getStream(), input.getStream(), input.getSystem());
-      MessageStream<JsonMessageEnvelope> newSource = graph.<Object, Object, InputMessageEnvelope>createInStream(
-          inputStreamSpec, null, null).map(this::getInputMessage);
-      if (joinOutput == null) {
-        joinOutput = newSource;
-      } else {
-        joinOutput = joinOutput.join(newSource, new MyJoinFunction(), Duration.ofMinutes(1));
-      }
-    }
-
-    joinOutput.sendTo(graph.createOutStream(
-            new StreamSpec("joinOutput", "JoinOutputEvent", "kafka"),
-            new StringSerde("UTF-8"), new JsonSerde<>()));
-
-  }
-
-  private JsonMessageEnvelope getInputMessage(InputMessageEnvelope ism) {
-    return new JsonMessageEnvelope(
-        ((MessageType) ism.getMessage()).joinKey,
-        (MessageType) ism.getMessage(),
-        ism.getOffset(),
-        ism.getSystemStreamPartition());
-  }
-
-  class MyJoinFunction implements JoinFunction<String, JsonMessageEnvelope, JsonMessageEnvelope, JsonMessageEnvelope> {
-    JsonMessageEnvelope myJoinResult(JsonMessageEnvelope m1, JsonMessageEnvelope m2) {
-      MessageType newJoinMsg = new MessageType();
-      newJoinMsg.joinKey = m1.getKey();
-      newJoinMsg.joinFields.addAll(m1.getMessage().joinFields);
-      newJoinMsg.joinFields.addAll(m2.getMessage().joinFields);
-      return new JsonMessageEnvelope(m1.getMessage().joinKey, newJoinMsg, null, null);
-    }
-
-    @Override
-    public JsonMessageEnvelope apply(JsonMessageEnvelope message, JsonMessageEnvelope otherMessage) {
-      return this.myJoinResult(message, otherMessage);
-    }
-
-    @Override
-    public String getFirstKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-
-    @Override
-    public String getSecondKey(JsonMessageEnvelope message) {
-      return message.getKey();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
deleted file mode 100644
index c88df7c..0000000
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.example;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.data.InputMessageEnvelope;
-import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.time.Duration;
-import java.util.Set;
-import java.util.function.Supplier;
-
-
-/**
- * Example implementation of a simple user-defined tasks w/ window operators
- *
- */
-public class TestWindowExample extends TestExampleBase {
-  class MessageType {
-    String field1;
-    String field2;
-  }
-
-  TestWindowExample(Set<SystemStreamPartition> inputs) {
-    super(inputs);
-  }
-
-  class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
-
-    JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
-      super(key, data, offset, partition);
-    }
-  }
-
-  @Override
-  public void init(StreamGraph graph, Config config) {
-    FoldLeftFunction<JsonMessageEnvelope, Integer> maxAggregator = (m, c) -> c + 1;
-    Supplier<Integer> initialValue = () -> 0;
-    inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
-            new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null).
-        map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
-            m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), initialValue, maxAggregator)));
-
-  }
-
-  String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
-    return m.getKey().toString();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
new file mode 100644
index 0000000..159dba2
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.example;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.util.CommandLine;
+
+import java.time.Duration;
+import java.util.function.Supplier;
+
+
+/**
+ * Example implementation of a simple user-defined task w/ a window operator.
+ *
+ */
+public class WindowExample implements StreamApplication {
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    Supplier<Integer> initialValue = () -> 0;
+    FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
+    MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
+    OutputStream<String, Integer, WindowPane<Void, Integer>> outputStream = graph
+        .getOutputStream("outputStream", m -> m.getKey().getPaneId(), m -> m.getMessage());
+
+    // create a tumbling window that outputs the number of message collected every 10 minutes.
+    // also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
+    // for 1 minute.
+    inputStream
+        .window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter)
+            .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))))
+        .sendTo(outputStream);
+  }
+
+  // local execution mode
+  public static void main(String[] args) throws Exception {
+    CommandLine cmdLine = new CommandLine();
+    Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+    ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+    localRunner.run(new WindowExample());
+  }
+
+  class PageViewEvent {
+    String key;
+    long timestamp;
+
+    public PageViewEvent(String key, long timestamp) {
+      this.key = key;
+      this.timestamp = timestamp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index e524ba1..e661798 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -19,56 +19,52 @@
 
 package org.apache.samza.execution;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
 import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.runtime.AbstractApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestExecutionPlanner {
 
-  private Config config;
-
   private static final String DEFAULT_SYSTEM = "test-system";
   private static final int DEFAULT_PARTITIONS = 10;
 
+  private Map<String, SystemAdmin> systemAdmins;
+  private StreamManager streamManager;
+  private ApplicationRunner runner;
+  private Config config;
+
   private StreamSpec input1;
   private StreamSpec input2;
   private StreamSpec input3;
   private StreamSpec output1;
   private StreamSpec output2;
 
-  private Map<String, SystemAdmin> systemAdmins;
-  private StreamManager streamManager;
-
-  private ApplicationRunner runner;
-
   private JoinFunction createJoin() {
     return new JoinFunction() {
       @Override
@@ -88,14 +84,6 @@ public class TestExecutionPlanner {
     };
   }
 
-  private SinkFunction createSink() {
-    return new SinkFunction() {
-      @Override
-      public void apply(Object message, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
-      }
-    };
-  }
-
   private SystemAdmin createSystemAdmin(Map<String, Integer> streamToPartitions) {
 
     return new SystemAdmin() {
@@ -139,37 +127,43 @@ public class TestExecutionPlanner {
     };
   }
 
-  private StreamGraph createSimpleGraph() {
+  private StreamGraphImpl createSimpleGraph() {
     /**
      * a simple graph of partitionBy and map
      *
      * input1 -> partitionBy -> map -> output1
      *
      */
-    StreamGraph streamGraph = new StreamGraphImpl(runner, config);
-    streamGraph.createInStream(input1, null, null).partitionBy(m -> "yes!!!").map(m -> m).sendTo(streamGraph.createOutStream(output1, null, null));
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null);
+    streamGraph.getInputStream("input1", null)
+        .partitionBy(m -> "yes!!!").map(m -> m)
+        .sendTo(output1);
     return streamGraph;
   }
 
-  private StreamGraph createStreamGraphWithJoin() {
+  private StreamGraphImpl createStreamGraphWithJoin() {
 
-    /** the graph looks like the following
+    /**
+     * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
      *
-     *                        input1 -> map -> join -> output1
-     *                                           |
-     *          input2 -> partitionBy -> filter -|
-     *                                           |
-     * input3 -> filter -> partitionBy -> map -> join -> output2
+     *                               input1 (64) -> map -> join -> output1 (8)
+     *                                                       |
+     *          input2 (16) -> partitionBy ("64") -> filter -|
+     *                                                       |
+     * input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16)
      *
      */
 
-    StreamGraph streamGraph = new StreamGraphImpl(runner, config);
-    MessageStream m1 = streamGraph.createInStream(input1, null, null).map(m -> m);
-    MessageStream m2 = streamGraph.createInStream(input2, null, null).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.createInStream(input3, null, null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    MessageStream m1 = streamGraph.getInputStream("input1", null).map(m -> m);
+    MessageStream m2 = streamGraph.getInputStream("input2", null).partitionBy(m -> "haha").filter(m -> true);
+    MessageStream m3 = streamGraph.getInputStream("input3", null).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", null, null);
+    OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", null, null);
 
-    m1.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output1, null, null));
-    m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(streamGraph.createOutStream(output2, null, null));
+    m1.join(m2, createJoin(), Duration.ofHours(2)).sendTo(output1);
+    m3.join(m2, createJoin(), Duration.ofHours(1)).sendTo(output2);
 
     return streamGraph;
   }
@@ -205,27 +199,26 @@ public class TestExecutionPlanner {
     systemAdmins.put("system2", systemAdmin2);
     streamManager = new StreamManager(systemAdmins);
 
-    runner = new AbstractApplicationRunner(config) {
-      @Override
-      public void run(StreamApplication streamApp) {
-      }
-
-      @Override
-      public void kill(StreamApplication streamApp) {
-
-      }
-
-      @Override
-      public ApplicationStatus status(StreamApplication streamApp) {
-        return null;
-      }
-    };
+    runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("input1")).thenReturn(input1);
+    when(runner.getStreamSpec("input2")).thenReturn(input2);
+    when(runner.getStreamSpec("input3")).thenReturn(input3);
+    when(runner.getStreamSpec("output1")).thenReturn(output1);
+    when(runner.getStreamSpec("output2")).thenReturn(output2);
+
+    // intermediate streams used in tests
+    when(runner.getStreamSpec("test-app-1-partition_by-0"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-0", "test-app-1-partition_by-0", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-1"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-1", "test-app-1-partition_by-1", "default-system"));
+    when(runner.getStreamSpec("test-app-1-partition_by-4"))
+        .thenReturn(new StreamSpec("test-app-1-partition_by-4", "test-app-1-partition_by-4", "default-system"));
   }
 
   @Test
   public void testCreateProcessorGraph() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraph streamGraph = createStreamGraphWithJoin();
+    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
 
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
     assertTrue(jobGraph.getSources().size() == 3);
@@ -236,7 +229,7 @@ public class TestExecutionPlanner {
   @Test
   public void testFetchExistingStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraph streamGraph = createStreamGraphWithJoin();
+    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -254,7 +247,7 @@ public class TestExecutionPlanner {
   @Test
   public void testCalculateJoinInputPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraph streamGraph = createStreamGraphWithJoin();
+    StreamGraphImpl streamGraph = createStreamGraphWithJoin();
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
 
     ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -273,7 +266,7 @@ public class TestExecutionPlanner {
     Config cfg = new MapConfig(map);
 
     ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
-    StreamGraph streamGraph = createSimpleGraph();
+    StreamGraphImpl streamGraph = createSimpleGraph();
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
     planner.calculatePartitions(streamGraph, jobGraph);
 
@@ -286,7 +279,7 @@ public class TestExecutionPlanner {
   @Test
   public void testCalculateIntStreamPartitions() {
     ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
-    StreamGraph streamGraph = createSimpleGraph();
+    StreamGraphImpl streamGraph = createSimpleGraph();
     JobGraph jobGraph = planner.createJobGraph(streamGraph);
     planner.calculatePartitions(streamGraph, jobGraph);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index 4e6c750..acc8588 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.data.MessageEnvelope;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -48,7 +47,6 @@ import static org.mockito.Mockito.when;
 
 public class TestJoinOperator {
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
-  private final ApplicationRunner runner = mock(ApplicationRunner.class);
   private final Set<Integer> numbers = ImmutableSet.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 
   @Test
@@ -226,6 +224,10 @@ public class TestJoinOperator {
   }
 
   private StreamOperatorTask createStreamOperatorTask() throws Exception {
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
+    when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2"));
+
     TaskContext taskContext = mock(TaskContext.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
@@ -239,13 +241,12 @@ public class TestJoinOperator {
   }
 
   private class TestStreamApplication implements StreamApplication {
-    StreamSpec inStreamSpec = new StreamSpec("instream", "instream", "insystem");
-    StreamSpec inStreamSpec2 = new StreamSpec("instream2", "instream2", "insystem2");
-
     @Override
     public void init(StreamGraph graph, Config config) {
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(inStreamSpec, null, null);
-      MessageStream<MessageEnvelope<Integer, Integer>> inStream2 = graph.createInStream(inStreamSpec2, null, null);
+      MessageStream<FirstStreamIME> inStream =
+          graph.getInputStream("instream", FirstStreamIME::new);
+      MessageStream<SecondStreamIME> inStream2 =
+          graph.getInputStream("instream2", SecondStreamIME::new);
 
       SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
       inStream
@@ -256,22 +257,20 @@ public class TestJoinOperator {
     }
   }
 
-  private class TestJoinFunction
-      implements JoinFunction<Integer, MessageEnvelope<Integer, Integer>, MessageEnvelope<Integer, Integer>, Integer> {
+  private class TestJoinFunction implements JoinFunction<Integer, FirstStreamIME, SecondStreamIME, Integer> {
     @Override
-    public Integer apply(MessageEnvelope<Integer, Integer> message,
-        MessageEnvelope<Integer, Integer> otherMessage) {
-      return message.getMessage() + otherMessage.getMessage();
+    public Integer apply(FirstStreamIME message, SecondStreamIME otherMessage) {
+      return (Integer) message.getMessage() + (Integer) otherMessage.getMessage();
     }
 
     @Override
-    public Integer getFirstKey(MessageEnvelope<Integer, Integer> message) {
-      return message.getKey();
+    public Integer getFirstKey(FirstStreamIME message) {
+      return (Integer) message.getKey();
     }
 
     @Override
-    public Integer getSecondKey(MessageEnvelope<Integer, Integer> message) {
-      return message.getKey();
+    public Integer getSecondKey(SecondStreamIME message) {
+      return (Integer) message.getKey();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 8a2dd95..e815b81 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -18,25 +18,19 @@
  */
 package org.apache.samza.operators;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -45,6 +39,15 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -138,7 +141,6 @@ public class TestMessageStreamImpl {
     OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
     assertTrue(sinkOp instanceof SinkOperatorSpec);
     assertEquals(((SinkOperatorSpec) sinkOp).getSinkFn(), xSink);
-    assertNull(((SinkOperatorSpec) sinkOp).getNextStream());
   }
 
   @Test
@@ -220,8 +222,8 @@ public class TestMessageStreamImpl {
     MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(streamGraph);
     Function<TestMessageEnvelope, String> keyExtractorFunc = m -> "222";
     inputStream.partitionBy(keyExtractorFunc);
-    assertTrue(streamGraph.getInStreams().size() == 1);
-    assertTrue(streamGraph.getOutStreams().size() == 1);
+    assertTrue(streamGraph.getInputStreams().size() == 1);
+    assertTrue(streamGraph.getOutputStreams().size() == 1);
 
     Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
     assertEquals(subs.size(), 1);
@@ -229,11 +231,7 @@ public class TestMessageStreamImpl {
     assertTrue(partitionByOp instanceof SinkOperatorSpec);
     assertNull(partitionByOp.getNextStream());
 
-    ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000), new MessageCollector() {
-      @Override
-      public void send(OutgoingMessageEnvelope envelope) {
-        assertTrue(envelope.getPartitionKey().equals("222"));
-      }
-    }, null);
+    ((SinkOperatorSpec) partitionByOp).getSinkFn().apply(new TestMessageEnvelope("111", "test", 1000),
+        envelope -> assertTrue(envelope.getPartitionKey().equals("222")), null);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
new file mode 100644
index 0000000..6603137
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.testUtils.TestClock;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestWindowOperator {
+  private final MessageCollector messageCollector = mock(MessageCollector.class);
+  private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
+  private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>();
+  private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+  private Config config;
+  private TaskContext taskContext;
+  private ApplicationRunner runner;
+
+  @Before
+  public void setup() throws Exception {
+    windowPanes.clear();
+
+    config = mock(Config.class);
+    taskContext = mock(TaskContext.class);
+    runner = mock(ApplicationRunner.class);
+    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+    when(runner.getStreamSpec("integer-stream")).thenReturn(new StreamSpec("integer-stream", "integers", "kafka"));
+  }
+
+  @Test
+  public void testTumblingWindowsDiscardingMode() throws Exception {
+
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 5);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3));
+    Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1);
+  }
+
+  @Test
+  public void testTumblingWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 7);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4);
+
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4);
+  }
+
+  @Test
+  public void testSessionWindowsDiscardingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 1);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 4);
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
+    Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
+
+  }
+
+  @Test
+  public void testSessionWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING,
+        Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+    Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4);
+  }
+
+  @Test
+  public void testCancellationOfOnceTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+        Duration.ofSeconds(1), Triggers.count(2));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 1);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
+    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
+
+    task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
+    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1);
+
+  }
+
+  @Test
+  public void testCancellationOfAnyTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+
+    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    //assert that the default trigger fired
+    Assert.assertEquals(windowPanes.size(), 2);
+    Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
+    Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(windowPanes.size(), 3);
+    Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY);
+    Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
+
+    //advance timer by > 500 millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(900));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 4);
+    Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
+    Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
+    Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
+  }
+
+  @Test
+  public void testCancelationOfRepeatingNestedTriggers() throws Exception {
+
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    Assert.assertEquals(windowPanes.size(), 1);
+
+    //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofMillis(500));
+    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 2);
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    Assert.assertEquals(windowPanes.size(), 3);
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+    //assert that the default trigger fired
+    Assert.assertEquals(windowPanes.size(), 4);
+  }
+
+  private class KeyedTumblingWindowStreamApplication implements StreamApplication {
+
+    private final AccumulationMode mode;
+    private final Duration duration;
+    private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
+
+    KeyedTumblingWindowStreamApplication(AccumulationMode mode,
+        Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) {
+      this.mode = mode;
+      this.duration = timeDuration;
+      this.earlyTrigger = earlyTrigger;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream",
+          (k, m) -> new MessageEnvelope(k, m));
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+      inStream
+        .map(m -> m)
+        .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
+          .setAccumulationMode(mode))
+        .map(m -> {
+            windowPanes.add(m);
+            return m;
+          });
+    }
+  }
+
+  private class KeyedSessionWindowStreamApplication implements StreamApplication {
+
+    private final AccumulationMode mode;
+    private final Duration duration;
+
+    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+      this.mode = mode;
+      this.duration = duration;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.getInputStream("integer-stream",
+          (k, m) -> new MessageEnvelope(k, m));
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+
+      inStream
+          .map(m -> m)
+          .window(Windows.keyedSessionWindow(keyFn, duration)
+              .setAccumulationMode(mode))
+          .map(m -> {
+              windowPanes.add(m);
+              return m;
+            });
+    }
+  }
+
+  private class IntegerMessageEnvelope extends IncomingMessageEnvelope {
+    IntegerMessageEnvelope(int key, int msg) {
+      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg);
+    }
+  }
+
+  private class MessageEnvelope<K, V> {
+    private final K key;
+    private final V value;
+
+    MessageEnvelope(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    public V getValue() {
+      return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
deleted file mode 100644
index 9a425d1..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example input {@link MessageEnvelope} w/ Json message and string as the key.
- */
-
-public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
-
-  private final String key;
-  private final T data;
-  private final Offset offset;
-  private final SystemStreamPartition partition;
-
-  public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
-    this.key = key;
-    this.data = data;
-    this.offset = offset;
-    this.partition = partition;
-  }
-
-  @Override
-  public T getMessage() {
-    return this.data;
-  }
-
-  @Override
-  public String getKey() {
-    return this.key;
-  }
-
-  public Offset getOffset() {
-    return this.offset;
-  }
-
-  public SystemStreamPartition getSystemStreamPartition() {
-    return this.partition;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
new file mode 100644
index 0000000..2524c28
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestMessageEnvelope.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+
+public class TestMessageEnvelope {
+
+  private final String key;
+  private final MessageType value;
+
+  public TestMessageEnvelope(String key, String value, long eventTime) {
+    this.key = key;
+    this.value = new MessageType(value, eventTime);
+  }
+
+  public MessageType getMessage() {
+    return this.value;
+  }
+
+  public String getKey() {
+    return this.key;
+  }
+
+  public class MessageType {
+    private final String value;
+    private final long eventTime;
+
+    public MessageType(String value, long eventTime) {
+      this.value = value;
+      this.eventTime = eventTime;
+    }
+
+    public long getEventTime() {
+      return eventTime;
+    }
+
+    public String getValue() {
+      return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
new file mode 100644
index 0000000..f9537a3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+
+public class TestOutputMessageEnvelope {
+  private final String key;
+  private final Integer value;
+
+  public TestOutputMessageEnvelope(String key, Integer value) {
+    this.key = key;
+    this.value = value;
+  }
+
+  public Integer getMessage() {
+    return this.value;
+  }
+
+  public String getKey() {
+    return this.key;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 5722dbd..f978c3c 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,8 +18,8 @@
  */
 package org.apache.samza.operators.impl;
 
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.hamcrest.core.IsEqual;

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index 31f6f4a..267cdfc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -18,24 +18,21 @@
  */
 package org.apache.samza.operators.impl;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
 import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.internal.WindowType;
@@ -44,6 +41,8 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -66,11 +65,11 @@ public class TestOperatorImpls {
     nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
     nextOperatorsField.setAccessible(true);
 
-    createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+    createOpMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
         OperatorSpec.class, Config.class, TaskContext.class);
     createOpMethod.setAccessible(true);
 
-    createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+    createOpsMethod = OperatorImplGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
     createOpsMethod.setAccessible(true);
   }
 
@@ -84,8 +83,8 @@ public class TestOperatorImpls {
     Config mockConfig = mock(Config.class);
     TaskContext mockContext = mock(TaskContext.class);
 
-    OperatorGraph opGraph = new OperatorGraph();
-    OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
+    OperatorImplGraph opGraph = new OperatorImplGraph();
+    OperatorImpl<TestMessageEnvelope, ?> opImpl = (OperatorImpl<TestMessageEnvelope, ?>)
         createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
     assertTrue(opImpl instanceof WindowOperatorImpl);
     Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
@@ -97,7 +96,7 @@ public class TestOperatorImpls {
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
     FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
     when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof StreamOperatorImpl);
     Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
     txfmFnField.setAccessible(true);
@@ -107,7 +106,7 @@ public class TestOperatorImpls {
     SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
     SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
     when(sinkOp.getSinkFn()).thenReturn(sinkFn);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof SinkOperatorImpl);
     Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
     sinkFnField.setAccessible(true);
@@ -116,7 +115,7 @@ public class TestOperatorImpls {
     // get join operator
     PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
     PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
-    opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
+    opImpl = (OperatorImpl<TestMessageEnvelope, ?>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
     assertTrue(opImpl instanceof PartialJoinOperatorImpl);
   }
 
@@ -126,7 +125,7 @@ public class TestOperatorImpls {
     MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
     TaskContext mockContext = mock(TaskContext.class);
     Config mockConfig = mock(Config.class);
-    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImplGraph opGraph = new OperatorImplGraph();
     RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
     assertTrue(operatorChain != null);
   }
@@ -139,7 +138,7 @@ public class TestOperatorImpls {
     TaskContext mockContext = mock(TaskContext.class);
     Config mockConfig = mock(Config.class);
     testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
-    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImplGraph opGraph = new OperatorImplGraph();
     RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
     Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
     assertEquals(subsSet.size(), 1);
@@ -160,7 +159,7 @@ public class TestOperatorImpls {
     Config mockConfig = mock(Config.class);
     testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
     testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
-    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImplGraph opGraph = new OperatorImplGraph();
     RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
     Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
     assertEquals(subsSet.size(), 2);
@@ -208,7 +207,7 @@ public class TestOperatorImpls {
               }
             }, Duration.ofMinutes(1))
         .map(m -> m);
-    OperatorGraph opGraph = new OperatorGraph();
+    OperatorImplGraph opGraph = new OperatorImplGraph();
     // now, we create chained operators from each input sources
     RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
     RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index ce9fdd2..abd7740 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.impl;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.task.MessageCollector;

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index 0a873fd..9dd161a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
 import org.apache.samza.task.MessageCollector;

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ec1d74c..37e3d1a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -20,16 +20,16 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
 import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.stream.OutputStreamInternalImpl;
 import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.operators.windows.internal.WindowType;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
@@ -41,58 +41,79 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 
 
 public class TestOperatorSpecs {
   @Test
-  public void testGetStreamOperator() {
-    FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
-          this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+  public void testCreateStreamOperator() {
+    FlatMapFunction<?, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
+          this.add(new TestMessageEnvelope(m.toString(), m.toString(), 12345L));
         } };
     MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
-    assertEquals(strmOp.getTransformFn(), transformFn);
-    assertEquals(strmOp.getNextStream(), mockOutput);
+    StreamOperatorSpec<?, TestMessageEnvelope> streamOp =
+        OperatorSpecs.createStreamOperatorSpec(transformFn, mockOutput, 1);
+    assertEquals(streamOp.getTransformFn(), transformFn);
+    assertEquals(streamOp.getNextStream(), mockOutput);
   }
 
   @Test
-  public void testGetSinkOperator() {
+  public void testCreateSinkOperator() {
     SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
           TaskCoordinator taskCoordinator) -> { };
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
-    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, 1);
     assertEquals(sinkOp.getSinkFn(), sinkFn);
-    assertTrue(sinkOp.getNextStream() == null);
+    assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SINK);
+    assertEquals(sinkOp.getNextStream(), null);
+  }
+
+  @Test
+  public void testCreateSendToOperator() {
+    OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSendToOperatorSpec(mockOutput, 1);
+    assertNotNull(sinkOp.getSinkFn());
+    assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.SEND_TO);
+    assertEquals(sinkOp.getNextStream(), null);
+  }
+
+
+  @Test
+  public void testCreatePartitionByOperator() {
+    OutputStreamInternalImpl<Object, Object, TestMessageEnvelope> mockOutput = mock(OutputStreamInternalImpl.class);
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createPartitionByOperatorSpec(mockOutput, 1);
+    assertNotNull(sinkOp.getSinkFn());
+    assertEquals(sinkOp.getOpCode(), OperatorSpec.OpCode.PARTITION_BY);
+    assertEquals(sinkOp.getNextStream(), null);
   }
 
   @Test
-  public void testGetWindowOperator() throws Exception {
+  public void testCreateWindowOperator() throws Exception {
     Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
     FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
     Supplier<Integer> initialValue = () -> 0;
     //instantiate a window using reflection
     WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
 
-    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
-    WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
+    WindowOperatorSpec spec =
+        OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockWndOut, 1);
     assertEquals(spec.getWindow(), window);
     assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
     assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
   }
 
   @Test
-  public void testGetPartialJoinOperator() {
-    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn = mock(PartialJoinFunction.class);
-    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn = mock(PartialJoinFunction.class);
+  public void testCreatePartialJoinOperator() {
+    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> thisPartialJoinFn
+        = mock(PartialJoinFunction.class);
+    PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> otherPartialJoinFn
+        = mock(PartialJoinFunction.class);
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     MessageStreamImpl<TestOutputMessageEnvelope> joinOutput = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
 
-    PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec =
-        OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, mockGraph, joinOutput);
+    PartialJoinOperatorSpec<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> partialJoinSpec
+        = OperatorSpecs.createPartialJoinOperatorSpec(thisPartialJoinFn, otherPartialJoinFn, 1000 * 60, joinOutput, 1);
 
     assertEquals(partialJoinSpec.getNextStream(), joinOutput);
     assertEquals(partialJoinSpec.getThisPartialJoinFn(), thisPartialJoinFn);
@@ -100,13 +121,15 @@ public class TestOperatorSpecs {
   }
 
   @Test
-  public void testGetMergeOperator() {
+  public void testCreateMergeOperator() {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
-    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
-    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
-        this.add(t);
-      } };
+    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp =
+        OperatorSpecs.createMergeOperatorSpec(output, 1);
+    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn =
+        t -> new ArrayList<TestMessageEnvelope>() { {
+            this.add(t);
+          } };
     TestMessageEnvelope t = mock(TestMessageEnvelope.class);
     assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
     assertEquals(mergeOp.getNextStream(), output);

http://git-wip-us.apache.org/repos/asf/samza/blob/4bf8ab6e/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
deleted file mode 100644
index 674a8f1..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.triggers;
-
-import org.apache.samza.util.Clock;
-
-import java.time.Duration;
-
-/**
- * An implementation of {@link Clock} that allows to advance the time by an arbitrary duration.
- * Used for testing.
- */
-public class TestClock implements Clock {
-
-  long currentTime = 1;
-
-  public void advanceTime(Duration duration) {
-    currentTime += duration.toMillis();
-  }
-
-  public void advanceTime(long millis) {
-    currentTime += millis;
-  }
-
-  @Override
-  public long currentTimeMillis() {
-    return currentTime;
-  }
-}