You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/29 00:11:44 UTC

[1/3] samza git commit: SAMZA-1386: Inline End-of-stream and Watermark logic inside OperatorImpl

Repository: samza
Updated Branches:
  refs/heads/0.14.0 475414884 -> 2819cbc76


http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java b/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
deleted file mode 100644
index 8fe7a16..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestWatermarkManager.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.Partition;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.message.WatermarkMessage;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestWatermarkManager {
-
-  StreamMetadataCache metadataCache;
-
-  @Before
-  public void setup() {
-    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
-    partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
-    metadataCache = mock(StreamMetadataCache.class);
-    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
-  }
-
-  @Test
-  public void testUpdateFromInputSource() {
-    SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
-    TaskName taskName = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
-    WatermarkManager manager = new WatermarkManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null);
-    long time = System.currentTimeMillis();
-    Watermark watermark = manager.update(WatermarkManager.buildWatermarkEnvelope(time, ssp));
-    assertEquals(watermark.getTimestamp(), time);
-  }
-
-  @Test
-  public void testUpdateFromIntermediateStream() {
-    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
-    ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
-    ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
-    ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
-
-    TaskName taskName = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps) {
-      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    }
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
-
-    WatermarkManager manager = new WatermarkManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null);
-    int envelopeCount = 4;
-    IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount];
-
-    long[] time = {300L, 200L, 100L, 400L};
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", new WatermarkMessage(time[i], "task " + i, envelopeCount));
-    }
-    for (int i = 0; i < 3; i++) {
-      assertNull(manager.update(envelopes[i]));
-    }
-    // verify the first three messages won't result in a watermark
-    assertEquals(manager.getWatermarkTime(ssps[0]), WatermarkManager.TIME_NOT_EXIST);
-    // the fourth message will generate a watermark
-    Watermark watermark = manager.update(envelopes[3]);
-    assertNotNull(watermark);
-    assertEquals(watermark.getTimestamp(), 100);
-    assertEquals(manager.getWatermarkTime(ssps[1]), WatermarkManager.TIME_NOT_EXIST);
-    assertEquals(manager.getWatermarkTime(ssps[2]), WatermarkManager.TIME_NOT_EXIST);
-
-
-    // stream2 has two partitions assigned to this task, so it requires a message from each partition to calculate watermarks
-    long[] time1 = {300L, 200L, 100L, 400L};
-    envelopes = new IncomingMessageEnvelope[envelopeCount];
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "", new WatermarkMessage(time1[i], "task " + i, envelopeCount));
-    }
-    // verify the messages for the partition 0 won't generate watermark
-    for (int i = 0; i < 4; i++) {
-      assertNull(manager.update(envelopes[i]));
-    }
-    assertEquals(manager.getWatermarkTime(ssps[1]), 100L);
-
-    long[] time2 = {350L, 150L, 500L, 80L};
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "", new WatermarkMessage(time2[i], "task " + i, envelopeCount));
-    }
-    for (int i = 0; i < 3; i++) {
-      assertNull(manager.update(envelopes[i]));
-    }
-    assertEquals(manager.getWatermarkTime(ssps[2]), WatermarkManager.TIME_NOT_EXIST);
-    // the fourth message will generate the watermark
-    watermark = manager.update(envelopes[3]);
-    assertNotNull(watermark);
-    assertEquals(manager.getWatermarkTime(ssps[2]), 80L);
-    assertEquals(watermark.getTimestamp(), 80L);
-  }
-
-  @Test
-  public void testSendWatermark() {
-    SystemStream ints = new SystemStream("test-system", "int-stream");
-    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
-    partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
-    StreamMetadataCache metadataCache = mock(StreamMetadataCache.class);
-    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
-
-    MessageCollector collector = mock(MessageCollector.class);
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
-
-    WatermarkManager manager = new WatermarkManager("task 0",
-        listener,
-        HashMultimap.create(),
-        Collections.EMPTY_SET,
-        metadataCache,
-        collector);
-
-    long time = System.currentTimeMillis();
-    Set<Integer> partitions = new HashSet<>();
-    doAnswer(invocation -> {
-        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
-        partitions.add((Integer) envelope.getPartitionKey());
-        WatermarkMessage watermarkMessage = (WatermarkMessage) envelope.getMessage();
-        assertEquals(watermarkMessage.getTaskName(), "task 0");
-        assertEquals(watermarkMessage.getTaskCount(), 8);
-        assertEquals(watermarkMessage.getTimestamp(), time);
-        return null;
-      }).when(collector).send(any());
-
-    manager.sendWatermark(time, ints, 8);
-    assertEquals(partitions.size(), 4);
-  }
-
-  @Test
-  public void testPropagate() {
-    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
-    List<StreamSpec> inputs = new ArrayList<>();
-    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
-    inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system"));
-
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-
-    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
-    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
-    SystemStream input3 = new SystemStream("test-system", "input-stream-3");
-    SystemStream ints = new SystemStream("test-system", "int-stream");
-    SystemStreamPartition[] ssps0 = new SystemStreamPartition[3];
-    ssps0[0] = new SystemStreamPartition(input1, new Partition(0));
-    ssps0[1] = new SystemStreamPartition(input2, new Partition(0));
-    ssps0[2] = new SystemStreamPartition(ints, new Partition(0));
-
-    SystemStreamPartition[] ssps1 = new SystemStreamPartition[4];
-    ssps1[0] = new SystemStreamPartition(input1, new Partition(1));
-    ssps1[1] = new SystemStreamPartition(input2, new Partition(1));
-    ssps1[2] = new SystemStreamPartition(input3, new Partition(1));
-    ssps1[3] = new SystemStreamPartition(ints, new Partition(1));
-
-    SystemStreamPartition[] ssps2 = new SystemStreamPartition[2];
-    ssps2[0] = new SystemStreamPartition(input3, new Partition(2));
-    ssps2[1] = new SystemStreamPartition(ints, new Partition(2));
-
-
-    TaskName t0 = new TaskName("task 0"); //consume input1 and input2
-    TaskName t1 = new TaskName("task 1"); //consume input 1 and input2 and input 3
-    TaskName t2 = new TaskName("task 2"); //consume input 3
-    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps0) {
-      inputToTasks.put(ssp.getSystemStream(), t0.getTaskName());
-    }
-    for (SystemStreamPartition ssp : ssps1) {
-      inputToTasks.put(ssp.getSystemStream(), t1.getTaskName());
-    }
-    for (SystemStreamPartition ssp : ssps2) {
-      inputToTasks.put(ssp.getSystemStream(), t2.getTaskName());
-    }
-
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-    WatermarkManager manager = spy(
-        new WatermarkManager(t0.getTaskName(), listener, inputToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null));
-
-    IncomingMessageEnvelope envelope = WatermarkManager.buildWatermarkEnvelope(System.currentTimeMillis(), ssps0[0]);
-    doNothing().when(manager).sendWatermark(anyLong(), any(), anyInt());
-    Watermark watermark = manager.update(envelope);
-    assertNotNull(watermark);
-    long time = System.currentTimeMillis();
-    Watermark updatedWatermark = watermark.copyWithTimestamp(time);
-    updatedWatermark.propagate(ints);
-    ArgumentCaptor<Long> arg1 = ArgumentCaptor.forClass(Long.class);
-    ArgumentCaptor<SystemStream> arg2 = ArgumentCaptor.forClass(SystemStream.class);
-    ArgumentCaptor<Integer> arg3 = ArgumentCaptor.forClass(Integer.class);
-    verify(manager).sendWatermark(arg1.capture(), arg2.capture(), arg3.capture());
-    assertEquals(arg1.getValue().longValue(), time);
-    assertEquals(arg2.getValue(), ints);
-    assertEquals(arg3.getValue().intValue(), 2);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index c51b1ea..af0b786 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -32,6 +32,7 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.TestClock;
@@ -253,7 +254,7 @@ public class TestJoinOperator {
     when(runner.getStreamSpec("instream")).thenReturn(new StreamSpec("instream", "instream", "insystem"));
     when(runner.getStreamSpec("instream2")).thenReturn(new StreamSpec("instream2", "instream2", "insystem2"));
 
-    TaskContext taskContext = mock(TaskContext.class);
+    TaskContextImpl taskContext = mock(TaskContextImpl.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("insystem", "instream", new Partition(0)),
             new SystemStreamPartition("insystem2", "instream2", new Partition(0))));

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index ca8a151..1edc5f6 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -26,6 +26,7 @@ import junit.framework.Assert;
 import org.apache.samza.Partition;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.triggers.FiringType;
 import org.apache.samza.operators.triggers.Trigger;
@@ -41,7 +42,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.testUtils.TestClock;
 import org.junit.Before;
@@ -60,13 +60,13 @@ public class TestWindowOperator {
   private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
   private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
   private Config config;
-  private TaskContext taskContext;
+  private TaskContextImpl taskContext;
   private ApplicationRunner runner;
 
   @Before
   public void setup() throws Exception {
     config = mock(Config.class);
-    taskContext = mock(TaskContext.class);
+    taskContext = mock(TaskContextImpl.class);
     runner = mock(ApplicationRunner.class);
     when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
         .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java
new file mode 100644
index 0000000..d17d751
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestControlMessageSender.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.WatermarkMessage;
+import org.apache.samza.task.MessageCollector;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestControlMessageSender {
+
+  @Test
+  public void testSend() {
+    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
+    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
+    partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
+    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
+    StreamMetadataCache metadataCache = mock(StreamMetadataCache.class);
+    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
+
+    SystemStream systemStream = new SystemStream("test-system", "test-stream");
+    Set<Integer> partitions = new HashSet<>();
+    MessageCollector collector = mock(MessageCollector.class);
+    doAnswer(invocation -> {
+        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
+        partitions.add((Integer) envelope.getPartitionKey());
+        assertEquals(envelope.getSystemStream(), systemStream);
+        return null;
+      }).when(collector).send(any());
+
+    ControlMessageSender sender = new ControlMessageSender(metadataCache);
+    WatermarkMessage watermark = new WatermarkMessage(System.currentTimeMillis(), "task 0");
+    sender.send(watermark, systemStream, collector);
+    assertEquals(partitions.size(), 4);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java
new file mode 100644
index 0000000..887991f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestEndOfStreamStates.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+public class TestEndOfStreamStates {
+
+  @Test
+  public void testUpdate() {
+    SystemStream input = new SystemStream("system", "input");
+    SystemStream intermediate = new SystemStream("system", "intermediate");
+
+    Set<SystemStreamPartition> ssps = new HashSet<>();
+    SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0));
+    SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0));
+    SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1));
+    ssps.add(inputPartition0);
+    ssps.add(intPartition0);
+    ssps.add(intPartition1);
+
+    Map<SystemStream, Integer> producerCounts = new HashMap<>();
+    producerCounts.put(intermediate, 2);
+
+    EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ssps, producerCounts);
+    assertFalse(endOfStreamStates.isEndOfStream(input));
+    assertFalse(endOfStreamStates.isEndOfStream(intermediate));
+    assertFalse(endOfStreamStates.allEndOfStream());
+
+    IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildEndOfStreamEnvelope(inputPartition0);
+    endOfStreamStates.update((EndOfStreamMessage) envelope.getMessage(), envelope.getSystemStreamPartition());
+    assertTrue(endOfStreamStates.isEndOfStream(input));
+    assertFalse(endOfStreamStates.isEndOfStream(intermediate));
+    assertFalse(endOfStreamStates.allEndOfStream());
+
+    EndOfStreamMessage eos = new EndOfStreamMessage("task 0");
+    endOfStreamStates.update(eos, intPartition0);
+    endOfStreamStates.update(eos, intPartition1);
+    assertFalse(endOfStreamStates.isEndOfStream(intermediate));
+    assertFalse(endOfStreamStates.allEndOfStream());
+
+    eos = new EndOfStreamMessage("task 1");
+    endOfStreamStates.update(eos, intPartition0);
+    endOfStreamStates.update(eos, intPartition1);
+    assertTrue(endOfStreamStates.isEndOfStream(intermediate));
+    assertTrue(endOfStreamStates.allEndOfStream());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 3ae8f5b..4a78da8 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,21 +18,22 @@
  */
 package org.apache.samza.operators.impl;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.metrics.Timer;
+import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
 
-import java.util.Collection;
-import java.util.Collections;
-
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
@@ -47,7 +48,7 @@ public class TestOperatorImpl {
   @Test(expected = IllegalStateException.class)
   public void testMultipleInitShouldThrow() {
     OperatorImpl<Object, Object> opImpl = new TestOpImpl(mock(Object.class));
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     opImpl.init(mock(Config.class), mockTaskContext);
     opImpl.init(mock(Config.class), mockTaskContext);
@@ -61,7 +62,7 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnMessagePropagatesResults() {
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
 
     Object mockTestOpImplOutput = mock(Object.class);
@@ -93,8 +94,8 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnMessageUpdatesMetrics() {
-    TaskContext mockTaskContext = mock(TaskContext.class);
-    MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+    ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
     Counter mockCounter = mock(Counter.class);
     Timer mockTimer = mock(Timer.class);
@@ -117,7 +118,7 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnTimerPropagatesResultsAndTimer() {
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
 
     Object mockTestOpImplOutput = mock(Object.class);
@@ -153,8 +154,8 @@ public class TestOperatorImpl {
 
   @Test
   public void testOnTimerUpdatesMetrics() {
-    TaskContext mockTaskContext = mock(TaskContext.class);
-    MetricsRegistry mockMetricsRegistry = mock(MetricsRegistry.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+    ReadableMetricsRegistry mockMetricsRegistry = mock(ReadableMetricsRegistry.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(mockMetricsRegistry);
     Counter mockMessageCounter = mock(Counter.class);
     Timer mockTimer = mock(Timer.class);
@@ -210,6 +211,11 @@ public class TestOperatorImpl {
     TestOpSpec() {
      super(OpCode.INPUT, 1);
     }
+
+    @Override
+    public WatermarkFunction getWatermarkFn() {
+      return null;
+    }
   }
 
   public static Set<OperatorImpl> getNextOperators(OperatorImpl op) {
@@ -221,11 +227,11 @@ public class TestOperatorImpl {
   }
 
   public static long getInputWatermark(OperatorImpl op) {
-    return op.getInputWatermarkTime();
+    return op.getInputWatermark();
   }
 
   public static long getOutputWatermark(OperatorImpl op) {
-    return op.getOutputWatermarkTime();
+    return op.getOutputWatermark();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 4505eef..9fab1b7 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,8 +19,23 @@
 
 package org.apache.samza.operators.impl;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -32,6 +47,7 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -62,7 +78,7 @@ public class TestOperatorImplGraph {
   public void testEmptyChain() {
     StreamGraphImpl streamGraph = new StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class));
     OperatorImplGraph opGraph =
-        new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContext.class), mock(Clock.class));
+        new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
     assertEquals(0, opGraph.getAllInputOperators().size());
   }
 
@@ -82,8 +98,9 @@ public class TestOperatorImplGraph {
         .map(mock(MapFunction.class))
         .sendTo(outputStream);
 
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
 
@@ -113,17 +130,17 @@ public class TestOperatorImplGraph {
     inputStream.filter(mock(FilterFunction.class));
     inputStream.map(mock(MapFunction.class));
 
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
 
     InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
     assertEquals(2, inputOpImpl.registeredOperators.size());
-    assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
-        ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER));
-    assertTrue(inputOpImpl.registeredOperators.stream().anyMatch(opImpl ->
-        ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP));
+    assertTrue(inputOpImpl.registeredOperators.stream()
+        .anyMatch(opImpl -> ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.FILTER));
+    assertTrue(inputOpImpl.registeredOperators.stream()
+        .anyMatch(opImpl -> ((OperatorImpl) opImpl).getOperatorSpec().getOpCode() == OpCode.MAP));
   }
 
   @Test
@@ -139,13 +156,13 @@ public class TestOperatorImplGraph {
     MapFunction mockMapFunction = mock(MapFunction.class);
     mergedStream.map(mockMapFunction);
 
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
 
     // verify that the DAG after merge is only traversed & initialized once
-    verify(mockMapFunction, times(1)).init(any(Config.class), any(TaskContext.class));
+    verify(mockMapFunction, times(1)).init(any(Config.class), any(TaskContextImpl.class));
   }
 
   @Test
@@ -160,13 +177,13 @@ public class TestOperatorImplGraph {
     MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
     inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
 
-    TaskContext mockTaskContext = mock(TaskContext.class);
+    TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
     when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     OperatorImplGraph opImplGraph =
         new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
 
     // verify that join function is initialized once.
-    verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContext.class));
+    verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContextImpl.class));
 
     InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1"));
     InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2"));
@@ -201,7 +218,7 @@ public class TestOperatorImplGraph {
     when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
     when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
     Config mockConfig = mock(Config.class);
-    TaskContext mockContext = mock(TaskContext.class);
+    TaskContextImpl mockContext = mock(TaskContextImpl.class);
     when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
     StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
 
@@ -255,4 +272,144 @@ public class TestOperatorImplGraph {
       }
     };
   }
+
+  @Test
+  public void testGetStreamToConsumerTasks() {
+    String system = "test-system";
+    String stream0 = "test-stream-0";
+    String stream1 = "test-stream-1";
+    // stream0 has two partitions (ssp0, ssp1); stream1 has a single partition (ssp2)
+    SystemStreamPartition ssp0 = new SystemStreamPartition(system, stream0, new Partition(0));
+    SystemStreamPartition ssp1 = new SystemStreamPartition(system, stream0, new Partition(1));
+    SystemStreamPartition ssp2 = new SystemStreamPartition(system, stream1, new Partition(0));
+    // task0 consumes ssp0 and ssp2; task1 consumes ssp1
+    TaskName task0 = new TaskName("Task 0");
+    TaskName task1 = new TaskName("Task 1");
+    Set<SystemStreamPartition> ssps = new HashSet<>();
+    ssps.add(ssp0);
+    ssps.add(ssp2);
+    TaskModel tm0 = new TaskModel(task0, ssps, new Partition(0));
+    ContainerModel cm0 = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0));
+    TaskModel tm1 = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1));
+    ContainerModel cm1 = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1));
+
+    Map<String, ContainerModel> cms = new HashMap<>();
+    cms.put(cm0.getProcessorId(), cm0);
+    cms.put(cm1.getProcessorId(), cm1);
+    // expected: stream0 is consumed by both tasks, stream1 only by task0
+    JobModel jobModel = new JobModel(new MapConfig(), cms, null);
+    Multimap<SystemStream, String> streamToTasks = OperatorImplGraph.getStreamToConsumerTasks(jobModel);
+    assertEquals(2, streamToTasks.get(ssp0.getSystemStream()).size());
+    assertEquals(1, streamToTasks.get(ssp2.getSystemStream()).size());
+  }
+
+  @Test
+  public void testGetOutputToInputStreams() {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(JobConfig.JOB_NAME(), "test-app");
+    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
+    Config config = new MapConfig(configMap);
+
+    /**
+     * the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
+     *
+     *                                    input1 -> map -> join -> partitionBy (10) -> output1
+     *                                                       |
+     *                                     input2 -> filter -|
+     *                                                       |
+     *           input3 -> filter -> partitionBy -> map -> join -> output2
+     *
+     */
+    StreamSpec input1 = new StreamSpec("input1", "input1", "system1");
+    StreamSpec input2 = new StreamSpec("input2", "input2", "system2");
+    StreamSpec input3 = new StreamSpec("input3", "input3", "system2");
+
+    StreamSpec output1 = new StreamSpec("output1", "output1", "system1");
+    StreamSpec output2 = new StreamSpec("output2", "output2", "system2");
+
+    ApplicationRunner runner = mock(ApplicationRunner.class);
+    when(runner.getStreamSpec("input1")).thenReturn(input1);
+    when(runner.getStreamSpec("input2")).thenReturn(input2);
+    when(runner.getStreamSpec("input3")).thenReturn(input3);
+    when(runner.getStreamSpec("output1")).thenReturn(output1);
+    when(runner.getStreamSpec("output2")).thenReturn(output2);
+
+    // intermediate streams used in tests
+    StreamSpec int1 = new StreamSpec("test-app-1-partition_by-10", "test-app-1-partition_by-10", "default-system");
+    StreamSpec int2 = new StreamSpec("test-app-1-partition_by-6", "test-app-1-partition_by-6", "default-system");
+    when(runner.getStreamSpec("test-app-1-partition_by-10"))
+        .thenReturn(int1);
+    when(runner.getStreamSpec("test-app-1-partition_by-6"))
+        .thenReturn(int2);
+
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+    BiFunction msgBuilder = mock(BiFunction.class);
+    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
+    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).filter(m -> true);
+    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
+    Function mockFn = mock(Function.class);
+    OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
+    OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+
+    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).partitionBy(m -> "haha").sendTo(om1);
+    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
+    // expected: int1 (the partitionBy that follows the input1/input2 join) is fed by input1 and input2
+    Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);
+    Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());
+    assertEquals(2, inputs.size());
+    assertTrue(inputs.contains(input1.toSystemStream()));
+    assertTrue(inputs.contains(input2.toSystemStream()));
+    // expected: int2 (the partitionBy on the input3 branch) is fed by input3 alone
+    inputs = outputToInput.get(int2.toSystemStream());
+    assertEquals(1, inputs.size());
+    assertEquals(input3.toSystemStream(), inputs.iterator().next());
+  }
+
+  @Test
+  public void testGetProducerTaskCountForIntermediateStreams() {
+    /**
+     * the task assignment looks like the following:
+     *
+     * input1 -----> task0, task1 -----> int1
+     *                                    ^
+     * input2 ------> task1, task2--------|
+     *                                    v
+     * input3 ------> task1 -----------> int2
+     *
+     */
+
+    SystemStream input1 = new SystemStream("system1", "intput1");
+    SystemStream input2 = new SystemStream("system2", "intput2");
+    SystemStream input3 = new SystemStream("system2", "intput3");
+
+    SystemStream int1 = new SystemStream("system1", "int1");
+    SystemStream int2 = new SystemStream("system1", "int2");
+
+
+    String task0 = "Task 0";
+    String task1 = "Task 1";
+    String task2 = "Task 2";
+    // which tasks consume each stream
+    Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create();
+    streamToConsumerTasks.put(input1, task0);
+    streamToConsumerTasks.put(input1, task1);
+    streamToConsumerTasks.put(input2, task1);
+    streamToConsumerTasks.put(input2, task2);
+    streamToConsumerTasks.put(input3, task1);
+    streamToConsumerTasks.put(int1, task0);
+    streamToConsumerTasks.put(int1, task1);
+    streamToConsumerTasks.put(int2, task0);
+    // which input streams feed each intermediate stream
+    Multimap<SystemStream, SystemStream> intermediateToInputStreams = HashMultimap.create();
+    intermediateToInputStreams.put(int1, input1);
+    intermediateToInputStreams.put(int1, input2);
+
+    intermediateToInputStreams.put(int2, input2);
+    intermediateToInputStreams.put(int2, input3);
+    // expected producers: int1 <- {task0, task1, task2}, int2 <- {task1, task2}
+    Map<SystemStream, Integer> counts = OperatorImplGraph.getProducerTaskCountForIntermediateStreams(
+        streamToConsumerTasks, intermediateToInputStreams);
+    assertEquals(Integer.valueOf(3), counts.get(int1)); // object comparison: a missing key fails cleanly instead of NPE
+    assertEquals(Integer.valueOf(2), counts.get(int2));
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java
new file mode 100644
index 0000000..a726069
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.WatermarkMessage;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.apache.samza.operators.impl.WatermarkStates.WATERMARK_NOT_EXIST;
+
+public class TestWatermarkStates {
+  // Checks per-partition and per-stream watermark aggregation when several producer tasks report.
+  @Test
+  public void testUpdate() {
+    SystemStream input = new SystemStream("system", "input");
+    SystemStream intermediate = new SystemStream("system", "intermediate");
+
+    Set<SystemStreamPartition> ssps = new HashSet<>();
+    SystemStreamPartition inputPartition0 = new SystemStreamPartition(input, new Partition(0));
+    SystemStreamPartition intPartition0 = new SystemStreamPartition(intermediate, new Partition(0));
+    SystemStreamPartition intPartition1 = new SystemStreamPartition(intermediate, new Partition(1));
+    ssps.add(inputPartition0);
+    ssps.add(intPartition0);
+    ssps.add(intPartition1);
+    // the intermediate stream is written by two producer tasks ("task 0" and "task 1" below)
+    Map<SystemStream, Integer> producerCounts = new HashMap<>();
+    producerCounts.put(intermediate, 2);
+
+    // advance watermark on input to 5; the intermediate stream has received nothing yet
+    WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts);
+    IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L);
+    watermarkStates.update((WatermarkMessage) envelope.getMessage(),
+        envelope.getSystemStreamPartition());
+    assertEquals(5L, watermarkStates.getWatermark(input));
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermark(intermediate));
+
+    // watermark from task 0 on int p0 to 6; task 1 has not reported on p0, so no watermark is expected
+    WatermarkMessage watermarkMessage = new WatermarkMessage(6L, "task 0");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermarkPerSSP(intPartition0));
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermark(intermediate));
+
+    // watermark from task 1 on int p0 to 3; p0 now has both producers, expected min(6, 3) = 3
+    watermarkMessage = new WatermarkMessage(3L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(3L, watermarkStates.getWatermarkPerSSP(intPartition0));
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermark(intermediate));
+
+    // watermark from task 0 on int p1 to 10; task 1 has not reported on p1, so no watermark is expected
+    watermarkMessage = new WatermarkMessage(10L, "task 0");
+    watermarkStates.update(watermarkMessage, intPartition1);
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermarkPerSSP(intPartition1));
+    assertEquals(WATERMARK_NOT_EXIST, watermarkStates.getWatermark(intermediate));
+
+    // watermark from task 1 on int p1 to 4; p1 now has both producers, expected min(10, 4) = 4
+    watermarkMessage = new WatermarkMessage(4L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition1);
+    assertEquals(4L, watermarkStates.getWatermarkPerSSP(intPartition1));
+    // verify we got a watermark 3 (min of p0 = 3 and p1 = 4) for int stream
+    assertEquals(3L, watermarkStates.getWatermark(intermediate));
+
+    // advance watermark from task 1 on int p0 to 8; expected p0 = min(6, 8) = 6
+    watermarkMessage = new WatermarkMessage(8L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition0);
+    assertEquals(6L, watermarkStates.getWatermarkPerSSP(intPartition0));
+    // verify we got a watermark 4 (min of p0 = 6 and p1 = 4) for int stream
+    assertEquals(4L, watermarkStates.getWatermark(intermediate));
+
+    // advance watermark from task 1 on int p1 to 7; expected p1 = min(10, 7) = 7
+    watermarkMessage = new WatermarkMessage(7L, "task 1");
+    watermarkStates.update(watermarkMessage, intPartition1);
+    assertEquals(7L, watermarkStates.getWatermarkPerSSP(intPartition1));
+    // verify we got a watermark 6 (min of p0 = 6 and p1 = 7) for int stream
+    assertEquals(6L, watermarkStates.getWatermark(intermediate));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
index 7192525..7a3faca 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestIntermediateMessageSerde.java
@@ -24,11 +24,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.message.WatermarkMessage;
-import org.apache.samza.message.IntermediateMessageType;
 import org.apache.samza.serializers.IntermediateMessageSerde;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.MessageType;
+import org.apache.samza.system.WatermarkMessage;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -96,7 +96,7 @@ public class TestIntermediateMessageSerde {
     TestUserMessage userMessage = new TestUserMessage(msg, 0, System.currentTimeMillis());
     byte[] bytes = imserde.toBytes(userMessage);
     TestUserMessage de = (TestUserMessage) imserde.fromBytes(bytes);
-    assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.USER_MESSAGE);
+    assertEquals(MessageType.of(de), MessageType.USER_MESSAGE);
     assertEquals(de.getMessage(), msg);
     assertEquals(de.getOffset(), 0);
     assertTrue(de.getTimestamp() > 0);
@@ -106,12 +106,11 @@ public class TestIntermediateMessageSerde {
   public void testWatermarkMessageSerde() {
     IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
     String taskName = "task-1";
-    WatermarkMessage watermark = new WatermarkMessage(System.currentTimeMillis(), taskName, 8);
+    WatermarkMessage watermark = new WatermarkMessage(System.currentTimeMillis(), taskName);
     byte[] bytes = imserde.toBytes(watermark);
     WatermarkMessage de = (WatermarkMessage) imserde.fromBytes(bytes);
-    assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.WATERMARK_MESSAGE);
+    assertEquals(MessageType.of(de), MessageType.WATERMARK);
     assertEquals(de.getTaskName(), taskName);
-    assertEquals(de.getTaskCount(), 8);
     assertTrue(de.getTimestamp() > 0);
   }
 
@@ -120,12 +119,11 @@ public class TestIntermediateMessageSerde {
     IntermediateMessageSerde imserde = new IntermediateMessageSerde(new ObjectSerde());
     String streamId = "test-stream";
     String taskName = "task-1";
-    EndOfStreamMessage eos = new EndOfStreamMessage(taskName, 8);
+    EndOfStreamMessage eos = new EndOfStreamMessage(taskName);
     byte[] bytes = imserde.toBytes(eos);
     EndOfStreamMessage de = (EndOfStreamMessage) imserde.fromBytes(bytes);
-    assertEquals(IntermediateMessageType.of(de), IntermediateMessageType.END_OF_STREAM_MESSAGE);
+    assertEquals(MessageType.of(de), MessageType.END_OF_STREAM);
     assertEquals(de.getTaskName(), taskName);
-    assertEquals(de.getTaskCount(), 8);
     assertEquals(de.getVersion(), 1);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index 3d4976b..5a4b4bf 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -39,7 +39,6 @@ import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskInstanceExceptionHandler;
 import org.apache.samza.container.TaskInstanceMetrics;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.control.EndOfStreamManager;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
@@ -78,8 +77,8 @@ public class TestAsyncRunLoop {
   private final IncomingMessageEnvelope envelope0 = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0");
   private final IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp1, "1", "key1", "value1");
   private final IncomingMessageEnvelope envelope3 = new IncomingMessageEnvelope(ssp0, "1", "key0", "value0");
-  private final IncomingMessageEnvelope ssp0EndOfStream = EndOfStreamManager.buildEndOfStreamEnvelope(ssp0);
-  private final IncomingMessageEnvelope ssp1EndOfStream = EndOfStreamManager.buildEndOfStreamEnvelope(ssp1);
+  private final IncomingMessageEnvelope ssp0EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp0);
+  private final IncomingMessageEnvelope ssp1EndOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp1);
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
     TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", new MetricsRegistryMap());
@@ -575,7 +574,7 @@ public class TestAsyncRunLoop {
     SystemStreamPartition ssp2 = new SystemStreamPartition("system1", "stream2", p2);
     IncomingMessageEnvelope envelope1 = new IncomingMessageEnvelope(ssp2, "1", "key1", "message1");
     IncomingMessageEnvelope envelope2 = new IncomingMessageEnvelope(ssp2, "2", "key1", "message1");
-    IncomingMessageEnvelope envelope3 = EndOfStreamManager.buildEndOfStreamEnvelope(ssp2);
+    IncomingMessageEnvelope envelope3 = IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp2);
 
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> sspMap = new HashMap<>();
     List<IncomingMessageEnvelope> messageList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 81f3ed1..ea11d9f 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -20,28 +20,21 @@
 package org.apache.samza.container
 
 
-import java.util
-import java.util
-import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap
-import com.google.common.collect.Multimap
-import org.apache.samza.SamzaException
 
 import org.apache.samza.Partition
-import org.apache.samza.checkpoint.OffsetManager
-import org.apache.samza.config.Config
-import org.apache.samza.config.MapConfig
-import org.apache.samza.control.ControlMessageUtils
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.TaskModel
-import org.apache.samza.metrics.Counter
-import org.apache.samza.metrics.Metric
-import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.metrics.{Counter, Metric, MetricsRegistryMap}
 import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemConsumers
+import org.apache.samza.system.SystemProducer
+import org.apache.samza.system.SystemProducers
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
@@ -124,9 +117,9 @@ class TestTaskInstance {
    */
   class TroublesomeTask extends StreamTask with WindowableTask {
     def process(
-      envelope: IncomingMessageEnvelope,
-      collector: MessageCollector,
-      coordinator: TaskCoordinator) {
+                 envelope: IncomingMessageEnvelope,
+                 collector: MessageCollector,
+                 coordinator: TaskCoordinator) {
 
       envelope.getOffset().toInt match {
         case offset if offset % 2 == 0 => throw new TroublesomeException
@@ -143,8 +136,8 @@ class TestTaskInstance {
    * Helper method used to retrieve the value of a counter from a group.
    */
   private def getCount(
-    group: ConcurrentHashMap[String, Metric],
-    name: String): Long = {
+                        group: ConcurrentHashMap[String, Metric],
+                        name: String): Long = {
     group.get("exception-ignored-" + name.toLowerCase).asInstanceOf[Counter].getCount
   }
 
@@ -407,36 +400,6 @@ class TestTaskInstance {
     // Finally, checkpoint the inputs with the snapshotted checkpoint captured at the beginning of commit
     mockOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint)
   }
-
-  @Test
-  def testBuildInputToTasks = {
-    val system: String = "test-system"
-    val stream0: String = "test-stream-0"
-    val stream1: String = "test-stream-1"
-
-    val ssp0: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(0))
-    val ssp1: SystemStreamPartition = new SystemStreamPartition(system, stream0, new Partition(1))
-    val ssp2: SystemStreamPartition = new SystemStreamPartition(system, stream1, new Partition(0))
-
-    val task0: TaskName = new TaskName("Task 0")
-    val task1: TaskName = new TaskName("Task 1")
-    val ssps: util.Set[SystemStreamPartition] = new util.HashSet[SystemStreamPartition]
-    ssps.add(ssp0)
-    ssps.add(ssp2)
-    val tm0: TaskModel = new TaskModel(task0, ssps, new Partition(0))
-    val cm0: ContainerModel = new ContainerModel("c0", 0, Collections.singletonMap(task0, tm0))
-    val tm1: TaskModel = new TaskModel(task1, Collections.singleton(ssp1), new Partition(1))
-    val cm1: ContainerModel = new ContainerModel("c1", 1, Collections.singletonMap(task1, tm1))
-
-    val cms: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel]
-    cms.put(cm0.getProcessorId, cm0)
-    cms.put(cm1.getProcessorId, cm1)
-
-    val jobModel: JobModel = new JobModel(new MapConfig, cms, null)
-    val streamToTasks: Multimap[SystemStream, String] = TaskInstance.buildInputToTasks(jobModel)
-    assertEquals(streamToTasks.get(ssp0.getSystemStream).size, 2)
-    assertEquals(streamToTasks.get(ssp2.getSystemStream).size, 1)
-  }
 }
 
 class MockSystemAdmin extends SystemAdmin {

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
index 9d808cb..774230c 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
@@ -20,8 +20,8 @@
 package org.apache.samza.serializers
 
 
-import org.apache.samza.message.EndOfStreamMessage
-import org.apache.samza.message.WatermarkMessage
+import org.apache.samza.system.EndOfStreamMessage
+import org.apache.samza.system.WatermarkMessage
 import org.junit.Assert._
 import org.junit.Assert.assertEquals
 import org.junit.Assert.assertEquals
@@ -83,18 +83,17 @@ class TestSerdeManager {
     val eosStreamId = "eos-stream"
     val taskName = "task 1"
     val taskCount = 8
-    outEnvelope = new OutgoingMessageEnvelope(intermediate, "eos", new EndOfStreamMessage(taskName, taskCount))
+    outEnvelope = new OutgoingMessageEnvelope(intermediate, "eos", new EndOfStreamMessage(taskName))
     se = serdeManager.toBytes(outEnvelope)
     inEnvelope = new IncomingMessageEnvelope(new SystemStreamPartition(intermediate, new Partition(0)), "offset", se.getKey, se.getMessage)
     de = serdeManager.fromBytes(inEnvelope)
     assertEquals(de.getKey, "eos")
     val eosMsg = de.getMessage.asInstanceOf[EndOfStreamMessage]
     assertEquals(eosMsg.getTaskName, taskName)
-    assertEquals(eosMsg.getTaskCount, taskCount)
 
     // test watermark message sent to intermediate stream
     val timestamp = System.currentTimeMillis()
-    outEnvelope = new OutgoingMessageEnvelope(intermediate, "watermark", new WatermarkMessage(timestamp, taskName, taskCount))
+    outEnvelope = new OutgoingMessageEnvelope(intermediate, "watermark", new WatermarkMessage(timestamp, taskName))
     se = serdeManager.toBytes(outEnvelope)
     inEnvelope = new IncomingMessageEnvelope(new SystemStreamPartition(intermediate, new Partition(0)), "offset", se.getKey, se.getMessage)
     de = serdeManager.fromBytes(inEnvelope)
@@ -102,6 +101,5 @@ class TestSerdeManager {
     val watermarkMsg = de.getMessage.asInstanceOf[WatermarkMessage]
     assertEquals(watermarkMsg.getTimestamp, timestamp)
     assertEquals(watermarkMsg.getTaskName, taskName)
-    assertEquals(watermarkMsg.getTaskCount, taskCount)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
index de0d1da..fb9bb56 100644
--- a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
+++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
@@ -36,7 +36,6 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.EndOfStreamManager;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -237,7 +236,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
       consumerMetrics.incNumEvents(systemStreamPartition);
       consumerMetrics.incTotalNumEvents();
     }
-    offerMessage(systemStreamPartition, EndOfStreamManager.buildEndOfStreamEnvelope(systemStreamPartition));
+    offerMessage(systemStreamPartition, IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition));
     reader.close();
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index f313348..8493cf1 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -54,7 +54,7 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
   @Test
   public void testPipeline() throws  Exception {
     Random random = new Random();
-    int count = 100;
+    int count = 10;
     PageView[] pageviews = new PageView[count];
     for (int i = 0; i < count; i++) {
       String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 2eb72fc..d9202d3 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -36,8 +36,6 @@ import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.TaskInstance;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
-import org.apache.samza.control.EndOfStreamManager;
-import org.apache.samza.control.WatermarkManager;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImpl;
@@ -85,14 +83,14 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
   static {
     TEST_DATA.add(createIncomingMessage(new PageView("inbox", 1), SSP0));
     TEST_DATA.add(createIncomingMessage(new PageView("home", 2), SSP1));
-    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(1, SSP0));
-    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(2, SSP1));
-    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(4, SSP0));
-    TEST_DATA.add(WatermarkManager.buildWatermarkEnvelope(3, SSP1));
+    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 1));
+    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 2));
+    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP0, 4));
+    TEST_DATA.add(IncomingMessageEnvelope.buildWatermarkEnvelope(SSP1, 3));
     TEST_DATA.add(createIncomingMessage(new PageView("search", 3), SSP0));
     TEST_DATA.add(createIncomingMessage(new PageView("pymk", 4), SSP1));
-    TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP0));
-    TEST_DATA.add(EndOfStreamManager.buildEndOfStreamEnvelope(SSP1));
+    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP0));
+    TEST_DATA.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SSP1));
   }
 
   public final static class TestSystemFactory implements SystemFactory {

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
index 9b96216..832457b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
+++ b/samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.EndOfStreamManager;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamPartition;
@@ -62,7 +61,7 @@ public class ArraySystemConsumer implements SystemConsumer {
       set.forEach(ssp -> {
           List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
               .map(object -> new IncomingMessageEnvelope(ssp, null, null, object)).collect(Collectors.toList());
-          envelopes.add(EndOfStreamManager.buildEndOfStreamEnvelope(ssp));
+          envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
           envelopeMap.put(ssp, envelopes);
         });
       done = true;


[3/3] samza git commit: SAMZA-1386: Inline End-of-stream and Watermark logic inside OperatorImpl

Posted by xi...@apache.org.
SAMZA-1386: Inline End-of-stream and Watermark logic inside OperatorImpl

This patch contains the following changes:
1. Refactor watermark and end-of-stream logic. The aggregation/handling has been moved from WatermarkManager/EndOfStreamManager to be inline inside OperatorImpl. This is for keeping the logic in one place.
2. Now subclass of OperatorImpl will override handleWatermark() to do its specific handling, such as fire trigger.
3. Add emitWatermark() in OperatorImpl so subclass can call it to emit watermark upon receiving a message or watermark.

Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Yi Pan <ni...@gmail.com>

Closes #277 from xinyuiscool/SAMZA-1386


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2819cbc7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2819cbc7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2819cbc7

Branch: refs/heads/0.14.0
Commit: 2819cbc7691b569b3eef66d702746d9e34b3e745
Parents: 4754148
Author: Xinyu Liu <xi...@gmail.com>
Authored: Thu Sep 28 17:11:28 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Sep 28 17:11:28 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../operators/functions/WatermarkFunction.java  |  58 ++++
 .../org/apache/samza/system/ControlMessage.java |  46 +++
 .../apache/samza/system/EndOfStreamMessage.java |  38 +++
 .../samza/system/IncomingMessageEnvelope.java   |   7 +-
 .../org/apache/samza/system/MessageType.java    |  45 +++
 .../apache/samza/system/WatermarkMessage.java   |  46 +++
 .../apache/samza/container/TaskContextImpl.java | 131 ++++++++
 .../control/ControlMessageListenerTask.java     |  49 ---
 .../samza/control/ControlMessageUtils.java      |  81 -----
 .../samza/control/EndOfStreamManager.java       | 159 ---------
 .../java/org/apache/samza/control/IOGraph.java  | 113 -------
 .../org/apache/samza/control/Watermark.java     |  57 ----
 .../apache/samza/control/WatermarkManager.java  | 187 -----------
 .../apache/samza/message/ControlMessage.java    |  52 ---
 .../samza/message/EndOfStreamMessage.java       |  36 --
 .../samza/message/IntermediateMessageType.java  |  46 ---
 .../org/apache/samza/message/MessageType.java   |  46 ---
 .../apache/samza/message/WatermarkMessage.java  |  43 ---
 .../apache/samza/operators/StreamGraphImpl.java |   5 -
 .../operators/impl/ControlMessageSender.java    |  56 ++++
 .../samza/operators/impl/EndOfStreamStates.java |  98 ++++++
 .../samza/operators/impl/OperatorImpl.java      | 198 ++++++++---
 .../samza/operators/impl/OperatorImplGraph.java |  98 +++++-
 .../operators/impl/OutputOperatorImpl.java      |  32 +-
 .../samza/operators/impl/WatermarkStates.java   | 119 +++++++
 .../samza/operators/spec/InputOperatorSpec.java |   6 +
 .../samza/operators/spec/JoinOperatorSpec.java  |   6 +
 .../samza/operators/spec/OperatorSpec.java      |   3 +
 .../operators/spec/OutputOperatorSpec.java      |   7 +
 .../samza/operators/spec/SinkOperatorSpec.java  |   6 +
 .../operators/spec/StreamOperatorSpec.java      |   6 +
 .../operators/spec/WindowOperatorSpec.java      |   8 +
 .../serializers/IntermediateMessageSerde.java   |  18 +-
 .../samza/task/AsyncStreamTaskAdapter.java      |  22 +-
 .../apache/samza/task/StreamOperatorTask.java   |  45 ++-
 .../apache/samza/checkpoint/OffsetManager.scala |   7 +-
 .../apache/samza/container/TaskInstance.scala   | 185 ++---------
 .../apache/samza/serializers/SerdeManager.scala |   5 +-
 .../samza/control/TestControlMessageUtils.java  | 115 -------
 .../samza/control/TestEndOfStreamManager.java   | 333 -------------------
 .../org/apache/samza/control/TestIOGraph.java   | 200 -----------
 .../samza/control/TestWatermarkManager.java     | 260 ---------------
 .../samza/operators/TestJoinOperator.java       |   3 +-
 .../samza/operators/TestWindowOperator.java     |   6 +-
 .../impl/TestControlMessageSender.java          |  73 ++++
 .../operators/impl/TestEndOfStreamStates.java   |  78 +++++
 .../samza/operators/impl/TestOperatorImpl.java  |  32 +-
 .../operators/impl/TestOperatorImplGraph.java   | 181 +++++++++-
 .../operators/impl/TestWatermarkStates.java     | 102 ++++++
 .../TestIntermediateMessageSerde.java           |  18 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java |   7 +-
 .../samza/container/TestTaskInstance.scala      |  63 +---
 .../samza/serializers/TestSerdeManager.scala    |  10 +-
 .../samza/system/hdfs/HdfsSystemConsumer.java   |   3 +-
 .../EndOfStreamIntegrationTest.java             |   2 +-
 .../WatermarkIntegrationTest.java               |  14 +-
 .../samza/test/util/ArraySystemConsumer.java    |   3 +-
 58 files changed, 1521 insertions(+), 2153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 319e51f..e10b970 100644
--- a/build.gradle
+++ b/build.gradle
@@ -125,6 +125,7 @@ project(':samza-api') {
 
   dependencies {
     compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
new file mode 100644
index 0000000..9c4b9bf
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.functions;
+
+/**
+ * Allows user-specific handling of Watermark
+ */
+public interface WatermarkFunction {
+
+  /**
+   * Processes the input watermark coming from upstream operators.
+   * This allows user-defined watermark handling, such as triggering events
+   * or propagating the watermark downstream.
+   * @param watermark input watermark
+   */
+  void processWatermark(long watermark);
+
+  /**
+   * Returns the output watermark. This function will be invoked immediately after either
+   * of the following events:
+   *
+   * <ol>
+   *
+   * <li> Return of the transform function, e.g. {@link FlatMapFunction}.
+   *
+   * <li> Return of the processWatermark function.
+   *
+   * </ol>
+   *
+   *
+   *
+   * Note: If the transform function returns a collection of output, the output watermark
+   * will be emitted after the output collection is propagated to downstream operators. So
+   * it might delay the watermark propagation. The delay will cause more buffering and might
+   * have performance impact.
+   *
+   * @return output watermark, or null if the output watermark should not be updated. Samza
+   * guarantees that the same watermark value will be emitted only once.
+   */
+  Long getOutputWatermark();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java b/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
new file mode 100644
index 0000000..4ec58b4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+/**
+ * The abstract class of all control messages, containing
+ * the name of the task that produces the control message
+ * and a version number.
+ */
+public abstract class ControlMessage {
+  private final String taskName;
+  private int version = 1;
+
+  public ControlMessage(String taskName) {
+    this.taskName = taskName;
+  }
+
+  public String getTaskName() {
+    return taskName;
+  }
+
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/EndOfStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/EndOfStreamMessage.java b/samza-api/src/main/java/org/apache/samza/system/EndOfStreamMessage.java
new file mode 100644
index 0000000..59d0356
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/EndOfStreamMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ *  The EndOfStreamMessage is a control message that is sent out to next stage
+ *  once the task has consumed to the end of a bounded stream.
+ */
+public class EndOfStreamMessage extends ControlMessage {
+  public EndOfStreamMessage() {
+    this(null);
+  }
+
+  @JsonCreator
+  public EndOfStreamMessage(@JsonProperty("task-name") String taskName) {
+    super(taskName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 9182522..96fa81c 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -96,9 +96,12 @@ public class IncomingMessageEnvelope {
    * @param ssp The SSP that is at end-of-stream.
    * @return an IncomingMessageEnvelope corresponding to end-of-stream for that SSP.
    */
-  @Deprecated
   public static IncomingMessageEnvelope buildEndOfStreamEnvelope(SystemStreamPartition ssp) {
-    return new IncomingMessageEnvelope(ssp, END_OF_STREAM_OFFSET, null, null);
+    return new IncomingMessageEnvelope(ssp, END_OF_STREAM_OFFSET, null, new EndOfStreamMessage(null));
+  }
+
+  public static IncomingMessageEnvelope buildWatermarkEnvelope(SystemStreamPartition ssp, long watermark) {
+    return new IncomingMessageEnvelope(ssp, null, null, new WatermarkMessage(watermark, null));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/MessageType.java b/samza-api/src/main/java/org/apache/samza/system/MessageType.java
new file mode 100644
index 0000000..7129d00
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/MessageType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+/**
+ * The type of the intermediate stream message. The enum will be encoded using its ordinal value and
+ * put in the first byte of the serialization of intermediate message.
+ */
+public enum MessageType {
+  USER_MESSAGE,
+  WATERMARK,
+  END_OF_STREAM;
+
+  /**
+   * Returns the {@link MessageType} of a particular intermediate stream message.
+   * @param message an intermediate stream message
+   * @return type of the message
+   */
+  public static MessageType of(Object message) {
+    if (message instanceof WatermarkMessage) {
+      return WATERMARK;
+    } else if (message instanceof EndOfStreamMessage) {
+      return END_OF_STREAM;
+    } else {
+      return USER_MESSAGE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java b/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
new file mode 100644
index 0000000..7278c5c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ *  The WatermarkMessage is a control message that is sent out to next stage
+ *  with a watermark timestamp and the task that produces the watermark.
+ */
+public class WatermarkMessage extends ControlMessage {
+  private final long timestamp;
+
+  public WatermarkMessage(long watermark) {
+    this(watermark, null);
+  }
+
+  @JsonCreator
+  public WatermarkMessage(@JsonProperty("timestamp") long timestamp,
+                          @JsonProperty("task-name") String taskName) {
+    super(taskName);
+    this.timestamp = timestamp;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
new file mode 100644
index 0000000..7990d2b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.TaskStorageManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class TaskContextImpl implements TaskContext {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskContextImpl.class);
+
+  private final TaskName taskName;
+  private final TaskInstanceMetrics metrics;
+  private final SamzaContainerContext containerContext;
+  private final Set<SystemStreamPartition> systemStreamPartitions;
+  private final OffsetManager offsetManager;
+  private final TaskStorageManager storageManager;
+  private final JobModel jobModel;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<String, Object> objectRegistry = new HashMap<>();
+
+  private Object userContext = null;
+
+  public TaskContextImpl(TaskName taskName,
+                         TaskInstanceMetrics metrics,
+                         SamzaContainerContext containerContext,
+                         Set<SystemStreamPartition> systemStreamPartitions,
+                         OffsetManager offsetManager,
+                         TaskStorageManager storageManager,
+                         JobModel jobModel,
+                         StreamMetadataCache streamMetadataCache) {
+    this.taskName = taskName;
+    this.metrics = metrics;
+    this.containerContext = containerContext;
+    this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions);
+    this.offsetManager = offsetManager;
+    this.storageManager = storageManager;
+    this.jobModel = jobModel;
+    this.streamMetadataCache = streamMetadataCache;
+  }
+
+  @Override
+  public ReadableMetricsRegistry getMetricsRegistry() {
+    return metrics.registry();
+  }
+
+  @Override
+  public Set<SystemStreamPartition> getSystemStreamPartitions() {
+    return systemStreamPartitions;
+  }
+
+  @Override
+  public StorageEngine getStore(String storeName) {
+    if (storageManager != null) {
+      return storageManager.apply(storeName);
+    } else {
+      LOG.warn("No store found for name: {}", storeName);
+      return null;
+    }
+  }
+
+  @Override
+  public TaskName getTaskName() {
+    return taskName;
+  }
+
+  @Override
+  public SamzaContainerContext getSamzaContainerContext() {
+    return containerContext;
+  }
+
+  @Override
+  public void setStartingOffset(SystemStreamPartition ssp, String offset) {
+    offsetManager.setStartingOffset(taskName, ssp, offset);
+  }
+
+  @Override
+  public void setUserContext(Object context) {
+    userContext = context;
+  }
+
+  @Override
+  public Object getUserContext() {
+    return userContext;
+  }
+
+  public void registerObject(String name, Object value) {
+    objectRegistry.put(name, value);
+  }
+
+  public Object fetchObject(String name) {
+    return objectRegistry.get(name);
+  }
+
+  public JobModel getJobModel() {
+    return jobModel;
+  }
+
+  public StreamMetadataCache getStreamMetadataCache() {
+    return streamMetadataCache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java b/samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java
deleted file mode 100644
index 9e4b40a..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * The listener interface for the aggregation result of control messages.
- * Any task that handles control messages such as {@link org.apache.samza.message.EndOfStreamMessage}
- * and {@link org.apache.samza.message.WatermarkMessage} needs to implement this interface.
- */
-public interface ControlMessageListenerTask {
-
-  /**
-   * Returns the topology of the streams. Any control message listener needs to
-   * provide this topology so Samza can propagate the control message to downstreams.
-   * @return {@link IOGraph} of input to output streams. It
-   */
-  IOGraph getIOGraph();
-
-  /**
-   * Invoked when a Watermark comes.
-   * @param watermark contains the watermark timestamp
-   * @param systemStream source of stream that emits the watermark
-   * @param collector message collector
-   * @param coordinator task coordinator
-   */
-  void onWatermark(Watermark watermark, SystemStream systemStream, MessageCollector collector, TaskCoordinator coordinator);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java b/samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java
deleted file mode 100644
index ebb0d86..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.Multimap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.samza.message.ControlMessage;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.task.MessageCollector;
-
-
-/**
- * This class provides static utils for handling control messages
- */
-class ControlMessageUtils {
-
-  /**
-   * Send a control message to every partition of the {@link SystemStream}
-   * @param message control message
-   * @param systemStream the stream to send the message to
-   * @param metadataCache stream metadata cache
-   * @param collector collector to send the message
-   */
-  static void sendControlMessage(ControlMessage message,
-      SystemStream systemStream,
-      StreamMetadataCache metadataCache,
-      MessageCollector collector) {
-    SystemStreamMetadata metadata = metadataCache.getSystemStreamMetadata(systemStream, true);
-    int partitionCount = metadata.getSystemStreamPartitionMetadata().size();
-    for (int i = 0; i < partitionCount; i++) {
-      OutgoingMessageEnvelope envelopeOut = new OutgoingMessageEnvelope(systemStream, i, "", message);
-      collector.send(envelopeOut);
-    }
-  }
-
-  /**
-   * Calculate the mapping from an output stream to the number of upstream tasks that will produce to the output stream
-   * @param inputToTasks input stream to its consumer tasks mapping
-   * @param ioGraph topology of the stream inputs and outputs; may be null
-   * @return mapping from output to upstream task count, or an immutable empty map if {@code ioGraph} is null
-   */
-  static Map<SystemStream, Integer> calculateUpstreamTaskCounts(Multimap<SystemStream, String> inputToTasks,
-      IOGraph ioGraph) {
-    if (ioGraph == null) {
-      return Collections.emptyMap();
-    }
-    Map<SystemStream, Integer> outputTaskCount = new HashMap<>();
-    ioGraph.getNodes().forEach(node -> {
-        // for each input stream, find out the tasks that are consuming this input stream using the inputToTasks mapping,
-        // then count the number of distinct tasks (a task consuming several inputs of the node is counted once)
-        int count = node.getInputs().stream().flatMap(spec -> inputToTasks.get(spec.toSystemStream()).stream())
-            .collect(Collectors.toSet()).size();
-        // put the count of input tasks to the result
-        outputTaskCount.put(node.getOutput().toSystemStream(), count);
-      });
-    return outputTaskCount;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java b/samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java
deleted file mode 100644
index 78a8741..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.Multimap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This class handles the end-of-stream control message. It aggregates the end-of-stream state for each input ssp of
- * a task, and propagates the eos messages to downstream intermediate streams if needed.
- *
- * Internal use only.
- */
-public class EndOfStreamManager {
-  private static final Logger log = LoggerFactory.getLogger(EndOfStreamManager.class);
-
-  private final String taskName;
-  private final MessageCollector collector;
-  // end-of-stream state per ssp
-  private final Map<SystemStreamPartition, EndOfStreamState> eosStates;
-  private final StreamMetadataCache metadataCache;
-  // provides the topology information (IOGraph); set in the constructor
-  private final ControlMessageListenerTask listener;
-  // mapping from output stream to its upstream task count
-  private final Map<SystemStream, Integer> upstreamTaskCounts;
-
-  public EndOfStreamManager(String taskName,
-      ControlMessageListenerTask listener,
-      Multimap<SystemStream, String> inputToTasks,
-      Set<SystemStreamPartition> ssps,
-      StreamMetadataCache metadataCache,
-      MessageCollector collector) {
-    this.taskName = taskName;
-    this.listener = listener;
-    this.metadataCache = metadataCache;
-    this.collector = collector;
-    Map<SystemStreamPartition, EndOfStreamState> states = new HashMap<>();
-    ssps.forEach(ssp -> {
-        states.put(ssp, new EndOfStreamState());
-      });
-    this.eosStates = Collections.unmodifiableMap(states);
-    this.upstreamTaskCounts = ControlMessageUtils.calculateUpstreamTaskCounts(inputToTasks, listener.getIOGraph());
-  }
-
-  /**
-   * Records an end-of-stream message for its ssp, then propagates end-of-stream downstream and shuts down if complete.
-   * @param envelope envelope carrying an {@link EndOfStreamMessage}
-   * @param coordinator used to request shutdown of the current task once all its input ssps have ended
-   */
-  public void update(IncomingMessageEnvelope envelope, TaskCoordinator coordinator) {
-    EndOfStreamState state = eosStates.get(envelope.getSystemStreamPartition());
-    EndOfStreamMessage message = (EndOfStreamMessage) envelope.getMessage();
-    state.update(message.getTaskName(), message.getTaskCount());
-    log.info("Received end-of-stream from task " + message.getTaskName() + " in " + envelope.getSystemStreamPartition());
-
-    SystemStream systemStream = envelope.getSystemStreamPartition().getSystemStream();
-    if (isEndOfStream(systemStream)) {
-      log.info("End-of-stream of input " + systemStream + " for " + taskName);
-      listener.getIOGraph().getNodesOfInput(systemStream).forEach(node -> {
-          // broadcast only to intermediate outputs, and only once every input of the node has ended for this task
-          if (node.isOutputIntermediate()
-              && node.getInputs().stream().allMatch(spec -> isEndOfStream(spec.toSystemStream()))) {
-            SystemStream outputStream = node.getOutput().toSystemStream();
-            sendEndOfStream(outputStream, upstreamTaskCounts.get(outputStream));
-          }
-        });
-
-      if (eosStates.values().stream().allMatch(EndOfStreamState::isEndOfStream)) {
-        // all inputs have been end-of-stream, shut down the task
-        log.info("All input streams have reached the end for task " + taskName);
-        coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
-      }
-    }
-  }
-
-  /**
-   * Return true if all partitions of the systemStream that are assigned to the current task have reached EndOfStream.
-   * @param systemStream stream
-   * @return whether the stream has reached its end for this task
-   */
-  boolean isEndOfStream(SystemStream systemStream) {
-    return eosStates.entrySet().stream()
-        .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
-        .allMatch(entry -> entry.getValue().isEndOfStream());
-  }
-
-  /**
-   * Send the EndOfStream control messages to downstream
-   * @param systemStream downstream stream
-   */
-  void sendEndOfStream(SystemStream systemStream, int taskCount) {
-    log.info("Send end-of-stream messages with upstream task count {} to all partitions of {}", taskCount, systemStream);
-    final EndOfStreamMessage message = new EndOfStreamMessage(taskName, taskCount);
-    ControlMessageUtils.sendControlMessage(message, systemStream, metadataCache, collector);
-  }
-
-  /**
-   * This class keeps the internal state for a ssp to be end-of-stream.
-   */
-  final static class EndOfStreamState {
-    // set of upstream tasks
-    private final Set<String> tasks = new HashSet<>();
-    private int expectedTotal = Integer.MAX_VALUE;
-    private boolean isEndOfStream = false;
-
-    void update(String taskName, int taskCount) {
-      if (taskName != null) {
-        tasks.add(taskName);
-      }
-      expectedTotal = taskCount;
-      isEndOfStream = tasks.size() == expectedTotal;
-    }
-
-    boolean isEndOfStream() {
-      return isEndOfStream;
-    }
-  }
-
-  /**
-   * Build an end-of-stream envelope for an ssp of a source input.
-   *
-   * @param ssp The SSP that is at end-of-stream.
-   * @return an IncomingMessageEnvelope corresponding to end-of-stream for that SSP.
-   */
-  public static IncomingMessageEnvelope buildEndOfStreamEnvelope(SystemStreamPartition ssp) {
-    return new IncomingMessageEnvelope(ssp, IncomingMessageEnvelope.END_OF_STREAM_OFFSET, null, new EndOfStreamMessage(null, 0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/IOGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/IOGraph.java b/samza-core/src/main/java/org/apache/samza/control/IOGraph.java
deleted file mode 100644
index a30c13d..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/IOGraph.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OutputOperatorSpec;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * This class provides the topology of stream inputs to outputs.
- */
-public class IOGraph {
-
-  public static final class IONode {
-    private final Set<StreamSpec> inputs = new HashSet<>();
-    private final StreamSpec output;
-    private final boolean isOutputIntermediate;
-
-    IONode(StreamSpec output, boolean isOutputIntermediate) {
-      this.output = output;
-      this.isOutputIntermediate = isOutputIntermediate;
-    }
-
-    void addInput(StreamSpec input) {
-      inputs.add(input);
-    }
-
-    public Set<StreamSpec> getInputs() {
-      return Collections.unmodifiableSet(inputs);
-    }
-
-    public StreamSpec getOutput() {
-      return output;
-    }
-
-    public boolean isOutputIntermediate() {
-      return isOutputIntermediate;
-    }
-  }
-
-  final Collection<IONode> nodes;
-  final Multimap<SystemStream, IONode> inputToNodes;
-
-  public IOGraph(Collection<IONode> nodes) {
-    this.nodes = Collections.unmodifiableCollection(nodes);
-    this.inputToNodes = HashMultimap.create();
-    nodes.forEach(node -> {
-        node.getInputs().forEach(stream -> {
-            inputToNodes.put(new SystemStream(stream.getSystemName(), stream.getPhysicalName()), node);
-          });
-      });
-  }
-
-  public Collection<IONode> getNodes() {
-    return this.nodes;
-  }
-
-  public Collection<IONode> getNodesOfInput(SystemStream input) {
-    return inputToNodes.get(input);
-  }
-
-  public static IOGraph buildIOGraph(StreamGraphImpl streamGraph) {
-    Map<Integer, IONode> nodes = new HashMap<>();
-    streamGraph.getInputOperators().entrySet().stream()
-        .forEach(entry -> buildIONodes(entry.getKey(), entry.getValue(), nodes));
-    return new IOGraph(nodes.values());
-  }
-
-  /* package private */
-  static void buildIONodes(StreamSpec input, OperatorSpec opSpec, Map<Integer, IONode> ioGraph) {
-    if (opSpec instanceof OutputOperatorSpec) {
-      OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec;
-      IONode node = ioGraph.get(opSpec.getOpId());
-      if (node == null) {
-        StreamSpec output = outputOpSpec.getOutputStream().getStreamSpec();
-        node = new IONode(output, outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY);
-        ioGraph.put(opSpec.getOpId(), node);
-      }
-      node.addInput(input);
-    }
-
-    Collection<OperatorSpec> nextOperators = opSpec.getRegisteredOperatorSpecs();
-    nextOperators.forEach(spec -> buildIONodes(input, spec, ioGraph));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/Watermark.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/Watermark.java b/samza-core/src/main/java/org/apache/samza/control/Watermark.java
deleted file mode 100644
index a11e3b0..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/Watermark.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * A watermark is a monotonically increasing value, which represents the point up to which the
- * system believes it has received all of the data before the watermark timestamp. Data that arrives
- * with a timestamp that is before the watermark is considered late.
- *
- * <p>This is the aggregate result from the WatermarkManager, which keeps track of the control message
- * {@link org.apache.samza.message.WatermarkMessage} and aggregates by returning the min of all watermark timestamps
- * in each partition.
- */
-@InterfaceStability.Unstable
-public interface Watermark {
-  /**
-   * Returns the timestamp of the watermark
-   * Note that if the task consumes more than one partition of this stream, the watermark emitted is the min of
-   * watermarks across all partitions.
-   * @return timestamp
-   */
-  long getTimestamp();
-
-  /**
-   * Propagates the watermark to an intermediate stream
-   * @param systemStream intermediate stream
-   */
-  void propagate(SystemStream systemStream);
-
-  /**
-   * Create a copy of the watermark with the timestamp
-   * @param timestamp new timestamp
-   * @return new watermark
-   */
-  Watermark copyWithTimestamp(long timestamp);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java b/samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java
deleted file mode 100644
index c4fdd88..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.Multimap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import org.apache.samza.message.WatermarkMessage;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This class manages watermarks. It aggregates the watermark control messages from the upstream tasks
- * for each SSP into an envelope of {@link Watermark}, and provides a dispatcher to propagate it to downstream.
- *
- * Internal use only.
- */
-public class WatermarkManager {
-  private static final Logger log = LoggerFactory.getLogger(WatermarkManager.class);
-  public static final long TIME_NOT_EXIST = -1;
-
-  private final String taskName;
-  private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
-  private final Map<SystemStream, Long> watermarkPerStream;
-  private final StreamMetadataCache metadataCache;
-  private final MessageCollector collector;
-  // mapping from output stream to its upstream task count
-  private final Map<SystemStream, Integer> upstreamTaskCounts;
-
-  public WatermarkManager(String taskName,
-      ControlMessageListenerTask listener,
-      Multimap<SystemStream, String> inputToTasks,
-      Set<SystemStreamPartition> ssps,
-      StreamMetadataCache metadataCache,
-      MessageCollector collector) {
-    this.taskName = taskName;
-    this.watermarkPerStream = new HashMap<>();
-    this.metadataCache = metadataCache;
-    this.collector = collector;
-    this.upstreamTaskCounts = ControlMessageUtils.calculateUpstreamTaskCounts(inputToTasks, listener.getIOGraph());
-
-    Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
-    ssps.forEach(ssp -> {
-        states.put(ssp, new WatermarkState());
-        watermarkPerStream.put(ssp.getSystemStream(), TIME_NOT_EXIST);
-      });
-    this.watermarkStates = Collections.unmodifiableMap(states);
-  }
-
-  /**
-   * Update the watermark based on the incoming watermark message. The message contains
-   * a timestamp and the upstream producer task. The aggregation result is the minimal value
-   * of all watermarks for the stream:
-   * <ul>
-   *   <li>Watermark(ssp) = min { Watermark(task) | task is upstream producer and the count equals total expected tasks } </li>
-   *   <li>Watermark(stream) = min { Watermark(ssp) | ssp is a partition of stream that assigns to this task } </li>
-   * </ul>
-   *
-   * @param envelope the envelope contains {@link WatermarkMessage}
-   * @return watermark envelope if there is a new aggregate watermark for the stream
-   */
-  public Watermark update(IncomingMessageEnvelope envelope) {
-    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-    WatermarkState state = watermarkStates.get(ssp);
-    WatermarkMessage message = (WatermarkMessage) envelope.getMessage();
-    state.update(message.getTimestamp(), message.getTaskName(), message.getTaskCount());
-
-    if (state.getWatermarkTime() != TIME_NOT_EXIST) {
-      long minTimestamp = watermarkStates.entrySet().stream()
-          .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()))
-          .map(entry -> entry.getValue().getWatermarkTime())
-          .min(Long::compare)
-          .get();
-      Long curWatermark = watermarkPerStream.get(ssp.getSystemStream());
-      if (curWatermark == null || curWatermark < minTimestamp) {
-        watermarkPerStream.put(ssp.getSystemStream(), minTimestamp);
-        return new WatermarkImpl(minTimestamp);
-      }
-    }
-
-    return null;
-  }
-
-  /* package private */
-  long getWatermarkTime(SystemStreamPartition ssp) {
-    return watermarkStates.get(ssp).getWatermarkTime();
-  }
-
-  /**
-   * Send the watermark message to all partitions of an intermediate stream
-   * @param timestamp watermark timestamp
-   * @param systemStream intermediate stream
-   * @param taskCount total number of upstream producer tasks to put in the message
-   */
-  void sendWatermark(long timestamp, SystemStream systemStream, int taskCount) {
-    log.info("Send watermark messages to all partitions of " + systemStream);
-    ControlMessageUtils.sendControlMessage(new WatermarkMessage(timestamp, taskName, taskCount), systemStream, metadataCache, collector);
-  }
-
-  /**
-   * Per ssp state of the watermarks. This class keeps track of the latest watermark timestamp
-   * from each upstream producer task, and uses the min to update the aggregated watermark time.
-   */
-  final static class WatermarkState {
-    private int expectedTotal = Integer.MAX_VALUE;
-    private final Map<String, Long> timestamps = new HashMap<>();
-    private long watermarkTime = TIME_NOT_EXIST;
-
-    void update(long timestamp, String taskName, int taskCount) {
-      if (taskName != null) {
-        timestamps.put(taskName, timestamp);
-      }
-      expectedTotal = taskCount;
-
-      if (timestamps.size() == expectedTotal) {
-        Optional<Long> min = timestamps.values().stream().min(Long::compare);
-        watermarkTime = min.orElse(timestamp);
-      }
-    }
-
-    long getWatermarkTime() {
-      return watermarkTime;
-    }
-  }
-
-  /**
-   * Implementation of the Watermark. It keeps a reference to the {@link WatermarkManager}
-   */
-  class WatermarkImpl implements Watermark {
-    private final long timestamp;
-
-    WatermarkImpl(long timestamp) {
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public void propagate(SystemStream systemStream) {
-      sendWatermark(timestamp, systemStream, upstreamTaskCounts.get(systemStream));
-    }
-
-    @Override
-    public Watermark copyWithTimestamp(long time) {
-      return new WatermarkImpl(time);
-    }
-  }
-
-  /**
-   * Build a watermark control message envelope for an ssp of a source input.
-   * @param timestamp watermark time
-   * @param ssp {@link SystemStreamPartition} where the watermark comes from.
-   * @return envelope of the watermark control message
-   */
-  public static IncomingMessageEnvelope buildWatermarkEnvelope(long timestamp, SystemStreamPartition ssp) {
-    return new IncomingMessageEnvelope(ssp, null, "", new WatermarkMessage(timestamp, null, 0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java b/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
deleted file mode 100644
index 46bf559..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-/**
- * The abstract class of all control messages, containing
- * the task that produces the control message, the total number of producer tasks,
- * and a version number.
- */
-public abstract class ControlMessage {
-  private final String taskName;
-  private final int taskCount;
-  private int version = 1;
-
-  public ControlMessage(String taskName, int taskCount) {
-    this.taskName = taskName;
-    this.taskCount = taskCount;
-  }
-
-  public String getTaskName() {
-    return taskName;
-  }
-
-  public int getTaskCount() {
-    return taskCount;
-  }
-
-  public void setVersion(int version) {
-    this.version = version;
-  }
-
-  public int getVersion() {
-    return version;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java b/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
deleted file mode 100644
index 91981a9..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- *  The EndOfStreamMessage is a control message that is sent out to next stage
- *  once the task has consumed to the end of a bounded stream.
- */
-public class EndOfStreamMessage extends ControlMessage {
-
-  @JsonCreator
-  public EndOfStreamMessage(@JsonProperty("task-name") String taskName,
-                            @JsonProperty("task-count") int taskCount) {
-    super(taskName, taskCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java b/samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java
deleted file mode 100644
index 25fbb14..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-/**
- * The type of the intermediate stream message. The enum will be encoded using its ordinal value and
- * put in the first byte of the serialization of intermediate message.
- * For more details, see {@link org.apache.samza.serializers.IntermediateMessageSerde}
- */
-public enum IntermediateMessageType {
-  USER_MESSAGE,
-  WATERMARK_MESSAGE,
-  END_OF_STREAM_MESSAGE;
-
-  /**
-   * Returns the {@link IntermediateMessageType} of a particular intermediate stream message.
-   * @param message an intermediate stream message
-   * @return type of the message
-   */
-  public static IntermediateMessageType of(Object message) {
-    if (message instanceof WatermarkMessage) {
-      return WATERMARK_MESSAGE;
-    } else if (message instanceof EndOfStreamMessage) {
-      return END_OF_STREAM_MESSAGE;
-    } else {
-      return USER_MESSAGE;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/MessageType.java b/samza-core/src/main/java/org/apache/samza/message/MessageType.java
deleted file mode 100644
index b1199b6..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/MessageType.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-/**
- * The type of the intermediate stream message. The enum will be encoded using its ordinal value and
- * put in the first byte of the serialization of intermediate message.
- * For more details, see {@link org.apache.samza.serializers.IntermediateMessageSerde}
- */
-public enum MessageType {
-  USER_MESSAGE,
-  WATERMARK,
-  END_OF_STREAM;
-
-  /**
-   * Returns the {@link MessageType} of a particular intermediate stream message.
-   * @param message an intermediate stream message
-   * @return type of the message
-   */
-  public static MessageType of(Object message) {
-    if (message instanceof WatermarkMessage) {
-      return WATERMARK;
-    } else if (message instanceof EndOfStreamMessage) {
-      return END_OF_STREAM;
-    } else {
-      return USER_MESSAGE;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java b/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
deleted file mode 100644
index aa25742..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- *  The WatermarkMessage is a control message that is sent out to next stage
- *  with a watermark timestamp and the task that produces the watermark.
- */
-public class WatermarkMessage extends ControlMessage {
-  private final long timestamp;
-
-  @JsonCreator
-  public WatermarkMessage(@JsonProperty("timestamp") long timestamp,
-                          @JsonProperty("task-name") String taskName,
-                          @JsonProperty("task-count") int taskCount) {
-    super(taskName, taskCount);
-    this.timestamp = timestamp;
-  }
-
-  public long getTimestamp() {
-    return timestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 2aec49f..2c2eb56 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -24,7 +24,6 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.control.IOGraph;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 
@@ -206,8 +205,4 @@ public class StreamGraphImpl implements StreamGraph {
 
     return windowOrJoinSpecs.size() != 0;
   }
-
-  public IOGraph toIOGraph() {
-    return IOGraph.buildIOGraph(this);
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java b/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
new file mode 100644
index 0000000..3bdc361
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.system.ControlMessage;
+import org.apache.samza.system.MessageType;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.task.MessageCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a helper class to broadcast control messages to each partition of an intermediate stream
+ */
+class ControlMessageSender {
+  private static final Logger LOG = LoggerFactory.getLogger(ControlMessageSender.class);
+
+  private final StreamMetadataCache metadataCache;
+
+  ControlMessageSender(StreamMetadataCache metadataCache) {
+    this.metadataCache = metadataCache;
+  }
+
+  void send(ControlMessage message, SystemStream systemStream, MessageCollector collector) {
+    SystemStreamMetadata metadata = metadataCache.getSystemStreamMetadata(systemStream, true);
+    int partitionCount = metadata.getSystemStreamPartitionMetadata().size();
+    LOG.info(String.format("Broadcast %s message from task %s to %s with %s partition",
+        MessageType.of(message).name(), message.getTaskName(), systemStream, partitionCount));
+
+    for (int i = 0; i < partitionCount; i++) {
+      OutgoingMessageEnvelope envelopeOut = new OutgoingMessageEnvelope(systemStream, i, null, message);
+      collector.send(envelopeOut);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/EndOfStreamStates.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/EndOfStreamStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/EndOfStreamStates.java
new file mode 100644
index 0000000..a69b234
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/EndOfStreamStates.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * This class manages the end-of-stream state of the streams in a task. Internally it keeps track of end-of-stream
+ * messages received from upstream tasks for each system stream partition (ssp). If messages have been received from
+ * all tasks, it will mark the ssp as end-of-stream. For a stream to be end-of-stream, all its partitions assigned to
+ * the task need to be end-of-stream.
+ *
+ * This class is thread-safe.
+ */
+class EndOfStreamStates {
+
+  private static final class EndOfStreamState {
+    // set of upstream tasks
+    private final Set<String> tasks = new HashSet<>();
+    private final int expectedTotal;
+    private volatile boolean isEndOfStream = false;
+
+    EndOfStreamState(int expectedTotal) {
+      this.expectedTotal = expectedTotal;
+    }
+
+    synchronized void update(String taskName) {
+      if (taskName != null) {
+        tasks.add(taskName);
+      }
+      isEndOfStream = tasks.size() == expectedTotal;
+    }
+
+    boolean isEndOfStream() {
+      return isEndOfStream;
+    }
+  }
+
+  private final Map<SystemStreamPartition, EndOfStreamState> eosStates;
+
+  /**
+   * Constructing the end-of-stream states for a task
+   * @param ssps all the ssps assigned to this task
+   * @param producerTaskCounts mapping from a stream to the number of upstream tasks that produce to it
+   */
+  EndOfStreamStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) {
+    Map<SystemStreamPartition, EndOfStreamState> states = new HashMap<>();
+    ssps.forEach(ssp -> {
+        states.put(ssp, new EndOfStreamState(producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0)));
+      });
+    this.eosStates = Collections.unmodifiableMap(states);
+  }
+
+  /**
+   * Update the state upon receiving an end-of-stream message.
+   * @param eos message of {@link EndOfStreamMessage}
+   * @param ssp system stream partition
+   */
+  void update(EndOfStreamMessage eos, SystemStreamPartition ssp) {
+    EndOfStreamState state = eosStates.get(ssp);
+    state.update(eos.getTaskName());
+  }
+
+  boolean isEndOfStream(SystemStream systemStream) {
+    return eosStates.entrySet().stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
+        .allMatch(entry -> entry.getValue().isEndOfStream());
+  }
+
+  boolean allEndOfStream() {
+    return eosStates.values().stream().allMatch(EndOfStreamState::isEndOfStream);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 8f51c5f..eefd4eb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -24,22 +24,29 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.control.Watermark;
-import org.apache.samza.control.WatermarkManager;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.HighResolutionClock;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract base class for all stream operator implementations.
  */
 public abstract class OperatorImpl<M, RM> {
+  private static final Logger LOG = LoggerFactory.getLogger(OperatorImpl.class);
   private static final String METRICS_GROUP = OperatorImpl.class.getName();
 
   private boolean initialized;
@@ -48,11 +55,18 @@ public abstract class OperatorImpl<M, RM> {
   private Counter numMessage;
   private Timer handleMessageNs;
   private Timer handleTimerNs;
-  private long inputWatermarkTime = WatermarkManager.TIME_NOT_EXIST;
-  private long outputWatermarkTime = WatermarkManager.TIME_NOT_EXIST;
+  private long inputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
+  private long outputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
+  private TaskName taskName;
 
   Set<OperatorImpl<RM, ?>> registeredOperators;
   Set<OperatorImpl<?, M>> prevOperators;
+  Set<SystemStream> inputStreams;
+
+  // end-of-stream states
+  private EndOfStreamStates eosStates;
+  // watermark states
+  private WatermarkStates watermarkStates;
 
   /**
    * Initialize this {@link OperatorImpl} and its user-defined functions.
@@ -74,10 +88,16 @@ public abstract class OperatorImpl<M, RM> {
     this.highResClock = createHighResClock(config);
     registeredOperators = new HashSet<>();
     prevOperators = new HashSet<>();
+    inputStreams = new HashSet<>();
     MetricsRegistry metricsRegistry = context.getMetricsRegistry();
     this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages");
     this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns");
     this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns");
+    this.taskName = context.getTaskName();
+
+    TaskContextImpl taskContext = (TaskContextImpl) context;
+    this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName());
+    this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName());
 
     handleInit(config, context);
 
@@ -111,6 +131,10 @@ public abstract class OperatorImpl<M, RM> {
     this.prevOperators.add(prevOperator);
   }
 
+  void registerInputStream(SystemStream input) {
+    this.inputStreams.add(input);
+  }
+
   /**
    * Handle the incoming {@code message} for this {@link OperatorImpl} and propagate results to registered operators.
    * <p>
@@ -128,6 +152,13 @@ public abstract class OperatorImpl<M, RM> {
     this.handleMessageNs.update(endNs - startNs);
 
     results.forEach(rm -> this.registeredOperators.forEach(op -> op.onMessage(rm, collector, coordinator)));
+
+    WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
+    if (watermarkFn != null) {
+      // check whether there is new watermark emitted from the user function
+      Long outputWm = watermarkFn.getOutputWatermark();
+      propagateWatermark(outputWm, collector, coordinator);
+    }
   }
 
   /**
@@ -174,68 +205,149 @@ public abstract class OperatorImpl<M, RM> {
   }
 
   /**
-   * Populate the watermarks based on the following equations:
-   *
-   * <ul>
-   *   <li>InputWatermark(op) = min { OutputWatermark(op') | op1 is upstream of op}</li>
-   *   <li>OutputWatermark(op) = min { InputWatermark(op), OldestWorkTime(op) }</li>
-   * </ul>
-   *
-   * @param watermark incoming watermark
+   * Aggregate {@link EndOfStreamMessage} from each ssp of the stream.
+   * Invoke onEndOfStream() if the stream reaches the end.
+   * @param eos {@link EndOfStreamMessage} object
+   * @param ssp system stream partition
    * @param collector message collector
    * @param coordinator task coordinator
    */
-  public final void onWatermark(Watermark watermark,
-      MessageCollector collector,
+  public final void aggregateEndOfStream(EndOfStreamMessage eos, SystemStreamPartition ssp, MessageCollector collector,
       TaskCoordinator coordinator) {
+    LOG.info("Received end-of-stream message from task {} in {}", eos.getTaskName(), ssp);
+    eosStates.update(eos, ssp);
+
+    SystemStream stream = ssp.getSystemStream();
+    if (eosStates.isEndOfStream(stream)) {
+      LOG.info("Input {} reaches the end for task {}", stream.toString(), taskName.getTaskName());
+      onEndOfStream(collector, coordinator);
+
+      if (eosStates.allEndOfStream()) {
+        // all inputs have been end-of-stream, shut down the task
+        LOG.info("All input streams have reached the end for task {}", taskName.getTaskName());
+        coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+      }
+    }
+  }
+
+  /**
+   * Invoke handleEndOfStream() if all the input streams to the current operator reach the end.
+   * Propagate the end-of-stream to downstream operators.
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  private final void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+    if (inputStreams.stream().allMatch(input -> eosStates.isEndOfStream(input))) {
+      handleEndOfStream(collector, coordinator);
+      this.registeredOperators.forEach(op -> op.onEndOfStream(collector, coordinator));
+    }
+  }
+
+  /**
+   * Called when all input streams to this operator have reached the end.
+   * Inheriting classes should handle end-of-stream by overriding this function.
+   * The default no-op implementation is for in-memory operators. Output operators need to
+   * override this to actually propagate EOS over the wire.
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+    //Do nothing by default
+  }
+
+  /**
+   * Aggregate the {@link WatermarkMessage} from each ssp into a watermark. Then call onWatermark() if
+   * a new watermark exists.
+   * @param watermarkMessage a {@link WatermarkMessage} object
+   * @param ssp {@link SystemStreamPartition} that the message is coming from.
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  public final void aggregateWatermark(WatermarkMessage watermarkMessage, SystemStreamPartition ssp,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    LOG.debug("Received watermark {} from {}", watermarkMessage.getTimestamp(), ssp);
+    watermarkStates.update(watermarkMessage, ssp);
+    long watermark = watermarkStates.getWatermark(ssp.getSystemStream());
+    if (watermark != WatermarkStates.WATERMARK_NOT_EXIST) {
+      LOG.debug("Got watermark {} from stream {}", watermark, ssp.getSystemStream());
+      onWatermark(watermark, collector, coordinator);
+    }
+  }
+
+  /**
+   * A watermark comes from an upstream operator. This function decides whether we should update the
+   * input watermark based on the watermark time of all the previous operators, and then call handleWatermark()
+   * to let the inheriting operator act on it.
+   * @param watermark incoming watermark from an upstream operator
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  private final void onWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) {
     final long inputWatermarkMin;
     if (prevOperators.isEmpty()) {
       // for input operator, use the watermark time coming from the source input
-      inputWatermarkMin = watermark.getTimestamp();
+      inputWatermarkMin = watermark;
     } else {
-      // InputWatermark(op) = min { OutputWatermark(op') | op1 is upstream of op}
-      inputWatermarkMin = prevOperators.stream().map(op -> op.getOutputWatermarkTime()).min(Long::compare).get();
+      // InputWatermark(op) = min { OutputWatermark(op') | op' is upstream of op}
+      inputWatermarkMin = prevOperators.stream().map(op -> op.getOutputWatermark()).min(Long::compare).get();
     }
 
-    if (inputWatermarkTime < inputWatermarkMin) {
+    if (inputWatermark < inputWatermarkMin) {
       // advance the watermark time of this operator
-      inputWatermarkTime = inputWatermarkMin;
-      Watermark inputWatermark = watermark.copyWithTimestamp(inputWatermarkTime);
-      long oldestWorkTime = handleWatermark(inputWatermark, collector, coordinator);
-
-      // OutputWatermark(op) = min { InputWatermark(op), OldestWorkTime(op) }
-      long outputWatermarkMin = Math.min(inputWatermarkTime, oldestWorkTime);
-      if (outputWatermarkTime < outputWatermarkMin) {
-        // populate the watermark to downstream
-        outputWatermarkTime = outputWatermarkMin;
-        Watermark outputWatermark = watermark.copyWithTimestamp(outputWatermarkTime);
+      inputWatermark = inputWatermarkMin;
+      LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOperatorName());
+
+      final Long outputWm;
+      WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
+      if (watermarkFn != null) {
+        // user-overridden watermark handling here
+        watermarkFn.processWatermark(inputWatermark);
+        outputWm = watermarkFn.getOutputWatermark();
+      } else {
+        // use samza-provided watermark handling
+        // default is to propagate the input watermark
+        outputWm = handleWatermark(inputWatermark, collector, coordinator);
+      }
+
+      propagateWatermark(outputWm, collector, coordinator);
+    }
+  }
+
+  private void propagateWatermark(Long outputWm, MessageCollector collector, TaskCoordinator coordinator) {
+    if (outputWm != null) {
+      if (outputWatermark < outputWm) {
+        // advance the watermark
+        outputWatermark = outputWm;
+        LOG.debug("Advance output watermark to {} in operator {}", outputWatermark, getOperatorName());
         this.registeredOperators.forEach(op -> op.onWatermark(outputWatermark, collector, coordinator));
+      } else if (outputWatermark > outputWm) {
+        LOG.warn("Ignore watermark {} that is smaller than the previous watermark {}.", outputWm, outputWatermark);
       }
     }
   }
 
   /**
-   * Returns the oldest time of the envelops that haven't been processed by this operator
-   * Default implementation of handling watermark, which returns the input watermark time
-   * @param inputWatermark input watermark
+   * Handles the input watermark and returns the output watermark.
+   * In-memory operators can override this to fire event-time triggers. Output operators need to override it
+   * so they can propagate watermarks over the wire. By default it simply returns the input watermark.
+   * @param inputWatermark  input watermark
    * @param collector message collector
    * @param coordinator task coordinator
-   * @return time of oldest processing envelope
+   * @return output watermark, or null if the output watermark should not be updated.
    */
-  protected long handleWatermark(Watermark inputWatermark,
-      MessageCollector collector,
-      TaskCoordinator coordinator) {
-    return inputWatermark.getTimestamp();
+  protected Long handleWatermark(long inputWatermark, MessageCollector collector, TaskCoordinator coordinator) {
+    // Default is no handling. Simply pass on the input watermark as output.
+    return inputWatermark;
   }
 
-  /* package private */
-  long getInputWatermarkTime() {
-    return this.inputWatermarkTime;
+  /* package private for testing */
+  final long getInputWatermark() {
+    return this.inputWatermark;
   }
 
-  /* package private */
-  long getOutputWatermarkTime() {
-    return this.outputWatermarkTime;
+  /* package private for testing */
+  final long getOutputWatermark() {
+    return this.outputWatermark;
   }
 
   public void close() {


[2/3] samza git commit: SAMZA-1386: Inline End-of-stream and Watermark logic inside OperatorImpl

Posted by xi...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 99496eb..9b747bc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -18,9 +18,14 @@
  */
 package org.apache.samza.operators.impl;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
@@ -36,6 +41,8 @@ import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -50,6 +57,7 @@ import java.util.Map;
  * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s.
  */
 public class OperatorImplGraph {
+  private static final Logger LOG = LoggerFactory.getLogger(OperatorImplGraph.class);
 
   /**
    * A mapping from operator names to their {@link OperatorImpl}s in this graph. Used to avoid creating
@@ -84,10 +92,27 @@ public class OperatorImplGraph {
    */
   public OperatorImplGraph(StreamGraphImpl streamGraph, Config config, TaskContext context, Clock clock) {
     this.clock = clock;
+
+    TaskContextImpl taskContext = (TaskContextImpl) context;
+    Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(streamGraph) ?
+        getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()),
+            getIntermediateToInputStreamsMap(streamGraph)) :
+        Collections.EMPTY_MAP;
+    producerTaskCounts.forEach((stream, count) -> {
+        LOG.info("{} has {} producer tasks.", stream, count);
+      });
+
+    // set states for end-of-stream
+    taskContext.registerObject(EndOfStreamStates.class.getName(),
+        new EndOfStreamStates(context.getSystemStreamPartitions(), producerTaskCounts));
+    // set states for watermark
+    taskContext.registerObject(WatermarkStates.class.getName(),
+        new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts));
+
     streamGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
         SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
         InputOperatorImpl inputOperatorImpl =
-            (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, config, context);
+            (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context);
         this.inputOperators.put(systemStream, inputOperatorImpl);
       });
   }
@@ -128,17 +153,18 @@ public class OperatorImplGraph {
    * @return  the operator implementation for the operatorSpec
    */
   OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
-      Config config, TaskContext context) {
+      SystemStream inputStream, Config config, TaskContext context) {
     if (!operatorImpls.containsKey(operatorSpec.getOpName()) || operatorSpec instanceof JoinOperatorSpec) {
       // Either this is the first time we've seen this operatorSpec, or this is a join operator spec
       // and we need to create 2 partial join operator impls for it. Initialize and register the sub-DAG.
       OperatorImpl operatorImpl = createOperatorImpl(prevOperatorSpec, operatorSpec, config, context);
       operatorImpl.init(config, context);
+      operatorImpl.registerInputStream(inputStream);
       operatorImpls.put(operatorImpl.getOperatorName(), operatorImpl);
 
       Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs();
       registeredSpecs.forEach(registeredSpec -> {
-          OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, config, context);
+          OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context);
           operatorImpl.registerNextOperator(nextImpl);
         });
       return operatorImpl;
@@ -246,4 +272,70 @@ public class OperatorImplGraph {
       }
     };
   }
+
+  private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) {
+    return !Collections.disjoint(streamGraph.getInputOperators().keySet(), streamGraph.getOutputStreams().keySet());
+  }
+
+  /**
+   * calculate the number of tasks that produce to each intermediate stream
+   * @param streamToConsumerTasks input streams to task mapping
+   * @param intermediateToInputStreams intermediate stream to input streams mapping
+   * @return mapping from intermediate stream to task count
+   */
+  static Map<SystemStream, Integer> getProducerTaskCountForIntermediateStreams(
+      Multimap<SystemStream, String> streamToConsumerTasks,
+      Multimap<SystemStream, SystemStream> intermediateToInputStreams) {
+    Map<SystemStream, Integer> result = new HashMap<>();
+    intermediateToInputStreams.asMap().entrySet().forEach(entry -> {
+        result.put(entry.getKey(),
+            entry.getValue().stream()
+                .flatMap(systemStream -> streamToConsumerTasks.get(systemStream).stream())
+                .collect(Collectors.toSet()).size());
+      });
+    return result;
+  }
+
+  /**
+   * calculate the mapping from input streams to consumer tasks
+   * @param jobModel JobModel object
+   * @return mapping from input stream to tasks
+   */
+  static Multimap<SystemStream, String> getStreamToConsumerTasks(JobModel jobModel) {
+    Multimap<SystemStream, String> streamToConsumerTasks = HashMultimap.create();
+    jobModel.getContainers().values().forEach(containerModel -> {
+        containerModel.getTasks().values().forEach(taskModel -> {
+            taskModel.getSystemStreamPartitions().forEach(ssp -> {
+                streamToConsumerTasks.put(ssp.getSystemStream(), taskModel.getTaskName().getTaskName());
+              });
+          });
+      });
+    return streamToConsumerTasks;
+  }
+
+  /**
+   * calculate the mapping from output streams to input streams
+   * @param streamGraph the user {@link StreamGraphImpl} instance
+   * @return mapping from output streams to input streams
+   */
+  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) {
+    Multimap<SystemStream, SystemStream> outputToInputStreams = HashMultimap.create();
+    streamGraph.getInputOperators().entrySet().stream()
+        .forEach(
+            entry -> computeOutputToInput(entry.getKey().toSystemStream(), entry.getValue(), outputToInputStreams));
+    return outputToInputStreams;
+  }
+
+  private static void computeOutputToInput(SystemStream input, OperatorSpec opSpec,
+      Multimap<SystemStream, SystemStream> outputToInputStreams) {
+    if (opSpec instanceof OutputOperatorSpec) {
+      OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec;
+      if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
+        outputToInputStreams.put(outputOpSpec.getOutputStream().getStreamSpec().toSystemStream(), input);
+      }
+    } else {
+      Collection<OperatorSpec> nextOperators = opSpec.getRegisteredOperatorSpecs();
+      nextOperators.forEach(spec -> computeOutputToInput(input, spec, outputToInputStreams));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index f212b3e..205bba6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -21,12 +21,16 @@ package org.apache.samza.operators.impl;
 import java.util.Collection;
 import java.util.Collections;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.Watermark;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.system.ControlMessage;
+import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
@@ -39,10 +43,16 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
 
   private final OutputOperatorSpec<M> outputOpSpec;
   private final OutputStreamImpl<?, ?, M> outputStream;
+  private final String taskName;
+  private final ControlMessageSender controlMessageSender;
 
   OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) {
     this.outputOpSpec = outputOpSpec;
     this.outputStream = outputOpSpec.getOutputStream();
+    this.taskName = context.getTaskName().getTaskName();
+
+    StreamMetadataCache streamMetadataCache = ((TaskContextImpl) context).getStreamMetadataCache();
+    this.controlMessageSender = new ControlMessageSender(streamMetadataCache);
   }
 
   @Override
@@ -71,12 +81,22 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   }
 
   @Override
-  protected long handleWatermark(Watermark inputWatermark,
-      MessageCollector collector,
-      TaskCoordinator coordinator) {
+  protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
     if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
-      inputWatermark.propagate(outputStream.getStreamSpec().toSystemStream());
+      sendControlMessage(new EndOfStreamMessage(taskName), collector);
     }
-    return inputWatermark.getTimestamp();
+  }
+
+  @Override
+  protected Long handleWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) {
+    if (outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY) {
+      sendControlMessage(new WatermarkMessage(watermark, taskName), collector);
+    }
+    return watermark;
+  }
+
+  private void sendControlMessage(ControlMessage message, MessageCollector collector) {
+    SystemStream outputStream = outputOpSpec.getOutputStream().getStreamSpec().toSystemStream();
+    controlMessageSender.send(message, outputStream, collector);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
new file mode 100644
index 0000000..0295626
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.WatermarkMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the watermarks coming from input/intermediate streams in a task. Internally it keeps track
+ * of the latest watermark timestamp from each upstream task, and uses the min as the consolidated watermark time.
+ *
+ * This class is thread-safe. However, having parallelism within a task may result in out-of-order processing
+ * and inaccurate watermarks. In this scenario, watermarks might be emitted before the previous messages are fully processed.
+ */
+class WatermarkStates {
+  private static final Logger LOG = LoggerFactory.getLogger(WatermarkStates.class);
+
+  public static final long WATERMARK_NOT_EXIST = -1;
+
+  private final static class WatermarkState {
+    private final int expectedTotal;
+    private final Map<String, Long> timestamps = new HashMap<>();
+    private volatile long watermarkTime = WATERMARK_NOT_EXIST;
+
+    WatermarkState(int expectedTotal) {
+      this.expectedTotal = expectedTotal;
+    }
+
+    synchronized void update(long timestamp, String taskName) {
+      if (taskName != null) {
+        Long ts = timestamps.get(taskName);
+        if (ts != null && ts > timestamp) {
+          LOG.warn(String.format("Incoming watermark %s is smaller than existing watermark %s for upstream task %s",
+              timestamp, ts, taskName));
+        } else {
+          timestamps.put(taskName, timestamp);
+        }
+      }
+
+      /**
+       * Check whether we got all the watermarks.
+       * At a source, the expectedTotal is 0.
+       * For any intermediate streams, the expectedTotal is the upstream task count.
+       */
+      if (timestamps.size() == expectedTotal) {
+        Optional<Long> min = timestamps.values().stream().min(Long::compare);
+        watermarkTime = min.orElse(timestamp);
+      }
+    }
+
+    long getWatermarkTime() {
+      return watermarkTime;
+    }
+  }
+
+  private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
+
+  WatermarkStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) {
+    Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
+    ssps.forEach(ssp -> {
+        states.put(ssp, new WatermarkState(producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0)));
+      });
+    this.watermarkStates = Collections.unmodifiableMap(states);
+  }
+
+  /**
+   * Update the state upon receiving a watermark message. If the SSP has no registered
+   * watermark state, an error is logged and the message is ignored.
+   * @param watermarkMessage message of {@link WatermarkMessage}
+   * @param ssp system stream partition
+   */
+  void update(WatermarkMessage watermarkMessage, SystemStreamPartition ssp) {
+    WatermarkState state = watermarkStates.get(ssp);
+    if (state != null) {
+      state.update(watermarkMessage.getTimestamp(), watermarkMessage.getTaskName());
+    } else {
+      LOG.error("SSP {} doesn't have watermark states", ssp);
+    }
+  }
+
+  long getWatermark(SystemStream systemStream) {
+    return watermarkStates.entrySet().stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
+        .map(entry -> entry.getValue().getWatermarkTime())
+        .min(Long::compare)
+        .orElse(WATERMARK_NOT_EXIST);
+  }
+
+  /* package private for testing */
+  long getWatermarkPerSSP(SystemStreamPartition ssp) {
+    return watermarkStates.get(ssp).getWatermarkTime();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 6fbc3c1..773f742 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.system.StreamSpec;
 
 import java.util.function.BiFunction;
@@ -49,4 +50,9 @@ public class InputOperatorSpec<K, V, M> extends OperatorSpec<Pair<K, V>, M> {
   public BiFunction<K, V, M> getMsgBuilder() {
     return this.msgBuilder;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
index 16f59d7..f4fe0fd 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 
 
 /**
@@ -70,4 +71,9 @@ public class JoinOperatorSpec<K, M, JM, RM> extends OperatorSpec<Object, RM> { /
   public long getTtlMs() {
     return ttlMs;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index f64e123..4047d92 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.WatermarkFunction;
 
 import java.util.Collection;
 import java.util.LinkedHashSet;
@@ -118,4 +119,6 @@ public abstract class OperatorSpec<M, OM> {
   public final String getOpName() {
     return String.format("%s-%s", getOpCode().name().toLowerCase(), getOpId());
   }
+
+  abstract public WatermarkFunction getWatermarkFn();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index e6767ec..9759392 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -19,6 +19,8 @@
 package org.apache.samza.operators.spec;
 
 
+import org.apache.samza.operators.functions.WatermarkFunction;
+
 /**
  * The spec for an operator that outputs a {@link org.apache.samza.operators.MessageStream} to a
  * {@link org.apache.samza.system.SystemStream}.
@@ -52,4 +54,9 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
   public OutputStreamImpl<?, ?, M> getOutputStream() {
     return this.outputStream;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
index 2b55d95..1145be8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 
 
 /**
@@ -48,4 +49,9 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
   public SinkFunction<M> getSinkFn() {
     return this.sinkFn;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return sinkFn instanceof WatermarkFunction ? (WatermarkFunction) sinkFn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
index 1f2f683..aace2e2 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java
@@ -19,6 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 
 
 /**
@@ -46,4 +47,9 @@ public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> {
   public FlatMapFunction<M, OM> getTransformFn() {
     return this.transformFn;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return transformFn instanceof WatermarkFunction ? (WatermarkFunction) transformFn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 0937499..75f1427 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.operators.spec;
 
+import org.apache.samza.operators.functions.FoldLeftFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.triggers.AnyTrigger;
 import org.apache.samza.operators.triggers.RepeatingTrigger;
 import org.apache.samza.operators.triggers.TimeBasedTrigger;
@@ -109,4 +111,10 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
     }
     return timeBasedTriggers;
   }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    FoldLeftFunction fn = window.getFoldLeftFunction();
+    return fn instanceof WatermarkFunction ? (WatermarkFunction) fn : null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index 0b98ec6..2ed559f 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -21,9 +21,9 @@ package org.apache.samza.serializers;
 
 import java.util.Arrays;
 import org.apache.samza.SamzaException;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.message.IntermediateMessageType;
-import org.apache.samza.message.WatermarkMessage;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.MessageType;
+import org.apache.samza.system.WatermarkMessage;
 import org.codehaus.jackson.type.TypeReference;
 
 
@@ -86,16 +86,16 @@ public class IntermediateMessageSerde implements Serde<Object> {
   public Object fromBytes(byte[] bytes) {
     try {
       final Object object;
-      final IntermediateMessageType type = IntermediateMessageType.values()[bytes[0]];
+      final MessageType type = MessageType.values()[bytes[0]];
       final byte [] data = Arrays.copyOfRange(bytes, 1, bytes.length);
       switch (type) {
         case USER_MESSAGE:
           object = userMessageSerde.fromBytes(data);
           break;
-        case WATERMARK_MESSAGE:
+        case WATERMARK:
           object = watermarkSerde.fromBytes(data);
           break;
-        case END_OF_STREAM_MESSAGE:
+        case END_OF_STREAM:
           object = eosSerde.fromBytes(data);
           break;
         default:
@@ -117,15 +117,15 @@ public class IntermediateMessageSerde implements Serde<Object> {
   @Override
   public byte[] toBytes(Object object) {
     final byte [] data;
-    final IntermediateMessageType type = IntermediateMessageType.of(object);
+    final MessageType type = MessageType.of(object);
     switch (type) {
       case USER_MESSAGE:
         data = userMessageSerde.toBytes(object);
         break;
-      case WATERMARK_MESSAGE:
+      case WATERMARK:
         data = watermarkSerde.toBytes((WatermarkMessage) object);
         break;
-      case END_OF_STREAM_MESSAGE:
+      case END_OF_STREAM:
         data = eosSerde.toBytes((EndOfStreamMessage) object);
         break;
       default:

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
index e57a89f..e2fea95 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java
@@ -21,11 +21,7 @@ package org.apache.samza.task;
 
 import java.util.concurrent.ExecutorService;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.ControlMessageListenerTask;
-import org.apache.samza.control.Watermark;
-import org.apache.samza.control.IOGraph;
 import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
 
 
 /**
@@ -34,7 +30,7 @@ import org.apache.samza.system.SystemStream;
  * the callbacks once it's done. If the thread pool is null, it follows the legacy
  * synchronous model to execute the tasks on the run loop thread.
  */
-public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask, ControlMessageListenerTask {
+public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, WindowableTask, ClosableTask, EndOfStreamListenerTask {
   private final StreamTask wrappedTask;
   private final ExecutorService executor;
 
@@ -100,20 +96,4 @@ public class AsyncStreamTaskAdapter implements AsyncStreamTask, InitableTask, Wi
       ((EndOfStreamListenerTask) wrappedTask).onEndOfStream(collector, coordinator);
     }
   }
-
-  @Override
-  public IOGraph getIOGraph() {
-    if (wrappedTask instanceof ControlMessageListenerTask) {
-      return ((ControlMessageListenerTask) wrappedTask).getIOGraph();
-    }
-    return null;
-  }
-
-  @Override
-  public void onWatermark(Watermark watermark, SystemStream stream, MessageCollector collector, TaskCoordinator coordinator) {
-    if (wrappedTask instanceof ControlMessageListenerTask) {
-      ((ControlMessageListenerTask) wrappedTask).onWatermark(watermark, stream, collector, coordinator);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 16b7e40..0074e24 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -21,25 +21,28 @@ package org.apache.samza.task;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
-import org.apache.samza.control.ControlMessageListenerTask;
-import org.apache.samza.control.Watermark;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.MessageType;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.impl.InputOperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
-import org.apache.samza.control.IOGraph;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * A {@link StreamTask} implementation that brings all the operator API implementation components together and
  * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
  */
-public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask, ControlMessageListenerTask {
+public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
+  private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorTask.class);
 
   private final StreamApplication streamApplication;
   private final ApplicationRunner runner;
@@ -47,7 +50,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
 
   private OperatorImplGraph operatorImplGraph;
   private ContextManager contextManager;
-  private IOGraph ioGraph;
 
   /**
    * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
@@ -91,7 +93,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
 
     // create the operator impl DAG corresponding to the logical operator spec DAG
     this.operatorImplGraph = new OperatorImplGraph(streamGraph, config, context, clock);
-    this.ioGraph = streamGraph.toIOGraph();
   }
 
   /**
@@ -110,7 +111,21 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
     SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
     InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
     if (inputOpImpl != null) {
-      inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
+      switch (MessageType.of(ime.getMessage())) {
+        case USER_MESSAGE:
+          inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
+          break;
+
+        case END_OF_STREAM:
+          EndOfStreamMessage eosMessage = (EndOfStreamMessage) ime.getMessage();
+          inputOpImpl.aggregateEndOfStream(eosMessage, ime.getSystemStreamPartition(), collector, coordinator);
+          break;
+
+        case WATERMARK:
+          WatermarkMessage watermarkMessage = (WatermarkMessage) ime.getMessage();
+          inputOpImpl.aggregateWatermark(watermarkMessage, ime.getSystemStreamPartition(), collector, coordinator);
+          break;
+      }
     }
   }
 
@@ -121,22 +136,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
   }
 
   @Override
-  public IOGraph getIOGraph() {
-    return ioGraph;
-  }
-
-  @Override
-  public final void onWatermark(Watermark watermark,
-      SystemStream systemStream,
-      MessageCollector collector,
-      TaskCoordinator coordinator) {
-    InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
-    if (inputOpImpl != null) {
-      inputOpImpl.onWatermark(watermark, collector, coordinator);
-    }
-  }
-
-  @Override
   public void close() throws Exception {
     if (this.contextManager != null) {
       this.contextManager.close();

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 8c739d4..1b2ce80 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -24,6 +24,7 @@ package org.apache.samza.checkpoint
 import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
 
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
@@ -193,7 +194,7 @@ class OffsetManager(
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
     lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
-    if (offset != null) {
+    if (offset != null && !offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
       lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
     }
   }
@@ -216,6 +217,10 @@ class OffsetManager(
     }
   }
 
+  def setStartingOffset(taskName: TaskName, ssp: SystemStreamPartition, offset: String): Unit = {
+    startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
+  }
+
   /**
     * Gets a snapshot of all the current offsets for the specified task. This is useful to
     * ensure there are no concurrent updates to the offsets between when this method is

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 5a19d90..acec365 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -20,25 +20,17 @@
 package org.apache.samza.container
 
 
-import com.google.common.collect.HashMultimap
-import com.google.common.collect.Multimap
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
-import org.apache.samza.control.ControlMessageListenerTask
-import org.apache.samza.control.ControlMessageUtils
-import org.apache.samza.control.EndOfStreamManager
-import org.apache.samza.control.WatermarkManager
 import org.apache.samza.job.model.JobModel
-import org.apache.samza.message.MessageType
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemStream
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.AsyncStreamTask
 import org.apache.samza.task.ClosableTask
@@ -47,34 +39,11 @@ import org.apache.samza.task.InitableTask
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.task.StreamTask
 import org.apache.samza.task.TaskCallbackFactory
-import org.apache.samza.task.TaskContext
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.task.WindowableTask
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
-
-object TaskInstance {
-  /**
-   * Build a map from a stream to its consumer tasks
-   * @param jobModel job model which contains ssp-to-task assignment
-   * @return the map of input stream to tasks
-   */
-  def buildInputToTasks(jobModel: JobModel): Multimap[SystemStream, String] = {
-    val streamToTasks: Multimap[SystemStream, String] = HashMultimap.create[SystemStream, String]
-    if (jobModel != null) {
-      for (containerModel <- jobModel.getContainers.values) {
-        for (taskModel <- containerModel.getTasks.values) {
-          for (ssp <- taskModel.getSystemStreamPartitions) {
-            streamToTasks.put(ssp.getSystemStream, taskModel.getTaskName.toString)
-          }
-        }
-      }
-    }
-    return streamToTasks
-  }
-}
 
 class TaskInstance(
   val task: Any,
@@ -97,35 +66,10 @@ class TaskInstance(
   val isEndOfStreamListenerTask = task.isInstanceOf[EndOfStreamListenerTask]
   val isClosableTask = task.isInstanceOf[ClosableTask]
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
-  val isControlMessageListener = task.isInstanceOf[ControlMessageListenerTask]
-
-  val context = new TaskContext {
-    var userContext: Object = null;
-    def getMetricsRegistry = metrics.registry
-    def getSystemStreamPartitions = systemStreamPartitions.asJava
-    def getStore(storeName: String) = if (storageManager != null) {
-      storageManager(storeName)
-    } else {
-      warn("No store found for name: %s" format storeName)
 
-      null
-    }
-    def getTaskName = taskName
-    def getSamzaContainerContext = containerContext
+  val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
+                                    storageManager, jobModel, streamMetadataCache)
 
-    override def setStartingOffset(ssp: SystemStreamPartition, offset: String): Unit = {
-      val startingOffsets = offsetManager.startingOffsets
-      offsetManager.startingOffsets += taskName -> (startingOffsets(taskName) + (ssp -> offset))
-    }
-
-    override def setUserContext(context: Object): Unit = {
-      userContext = context
-    }
-
-    override def getUserContext: Object = {
-      userContext
-    }
-  }
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
   // means the same ssp in other taskInstances have the same offset as
   // the one here.
@@ -133,10 +77,6 @@ class TaskInstance(
     scala.collection.mutable.Map[SystemStreamPartition, Boolean]()
   systemStreamPartitions.foreach(ssp2CaughtupMapping += _ -> false)
 
-  val inputToTasksMapping = TaskInstance.buildInputToTasks(jobModel)
-  var endOfStreamManager: EndOfStreamManager = null
-  var watermarkManager: WatermarkManager = null
-
   val hasIntermediateStreams = config.getStreamIds.exists(config.getIsIntermediate(_))
 
   def registerMetrics {
@@ -169,22 +109,6 @@ class TaskInstance(
     } else {
       debug("Skipping task initialization for taskName: %s" format taskName)
     }
-
-    if (isControlMessageListener) {
-      endOfStreamManager = new EndOfStreamManager(taskName.getTaskName,
-                                                  task.asInstanceOf[ControlMessageListenerTask],
-                                                  inputToTasksMapping,
-                                                  systemStreamPartitions.asJava,
-                                                  streamMetadataCache,
-                                                  collector)
-
-      watermarkManager = new WatermarkManager(taskName.getTaskName,
-                                                  task.asInstanceOf[ControlMessageListenerTask],
-                                                  inputToTasksMapping,
-                                                  systemStreamPartitions.asJava,
-                                                  streamMetadataCache,
-                                                  collector)
-    }
   }
 
   def registerProducers {
@@ -223,51 +147,20 @@ class TaskInstance(
       trace("Processing incoming message envelope for taskName and SSP: %s, %s"
         format (taskName, envelope.getSystemStreamPartition))
 
-      MessageType.of(envelope.getMessage) match {
-        case MessageType.USER_MESSAGE =>
-          if (isAsyncTask) {
-            exceptionHandler.maybeHandle {
-             val callback = callbackFactory.createCallback()
-             task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
-            }
-          }
-          else {
-            exceptionHandler.maybeHandle {
-             task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
-            }
-
-            trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
-              format(taskName, envelope.getSystemStreamPartition, envelope.getOffset))
-
-            offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
-          }
+      if (isAsyncTask) {
+        exceptionHandler.maybeHandle {
+          val callback = callbackFactory.createCallback()
+          task.asInstanceOf[AsyncStreamTask].processAsync(envelope, collector, coordinator, callback)
+        }
+      } else {
+        exceptionHandler.maybeHandle {
+         task.asInstanceOf[StreamTask].process(envelope, collector, coordinator)
+        }
 
-        case MessageType.END_OF_STREAM =>
-          if (isControlMessageListener) {
-            // handle eos synchronously.
-            runSync(callbackFactory) {
-              endOfStreamManager.update(envelope, coordinator)
-            }
-          } else {
-            warn("Ignore end-of-stream message due to %s not implementing ControlMessageListener."
-              format(task.getClass.toString))
-          }
+        trace("Updating offset map for taskName, SSP and offset: %s, %s, %s"
+          format (taskName, envelope.getSystemStreamPartition, envelope.getOffset))
 
-        case MessageType.WATERMARK =>
-          if (isControlMessageListener) {
-            // handle watermark synchronously in the run loop thread.
-            // we might consider running it asynchronously later
-            runSync(callbackFactory) {
-              val watermark = watermarkManager.update(envelope)
-              if (watermark != null) {
-                val stream = envelope.getSystemStreamPartition.getSystemStream
-                task.asInstanceOf[ControlMessageListenerTask].onWatermark(watermark, stream, collector, coordinator)
-              }
-            }
-          } else {
-            warn("Ignore watermark message due to %s not implementing ControlMessageListener."
-              format(task.getClass.toString))
-          }
+        offsetManager.update(taskName, envelope.getSystemStreamPartition, envelope.getOffset)
       }
     }
   }
@@ -343,38 +236,32 @@ class TaskInstance(
    * it's already catched-up.
    */
   private def checkCaughtUp(envelope: IncomingMessageEnvelope) = {
-    systemAdmins match {
-      case null => {
-        warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
-        ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
-      }
-      case others => {
-        val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
-            .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
-        val system = envelope.getSystemStreamPartition.getSystem
-        others(system).offsetComparator(envelope.getOffset, startingOffset) match {
-          case null => {
-            info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
-            ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
-          }
-          case result => {
-            if (result >= 0) {
-              info(envelope.getSystemStreamPartition.toString + " is catched up.")
-              ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+    if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
+      ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+    } else {
+      systemAdmins match {
+        case null => {
+          warn("systemAdmin is null. Set all SystemStreamPartitions to catched-up")
+          ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+        }
+        case others => {
+          val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
+              .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
+          val system = envelope.getSystemStreamPartition.getSystem
+          others(system).offsetComparator(envelope.getOffset, startingOffset) match {
+            case null => {
+              info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
+              ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
+            }
+            case result => {
+              if (result >= 0) {
+                info(envelope.getSystemStreamPartition.toString + " is catched up.")
+                ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true
+              }
             }
           }
         }
       }
     }
   }
-
-  private def runSync(callbackFactory: TaskCallbackFactory)(runCodeBlock: => Unit) = {
-    val callback = callbackFactory.createCallback()
-    try {
-      runCodeBlock
-      callback.complete()
-    } catch {
-      case t: Throwable => callback.failure(t)
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
index 0100c78..76594ae 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -20,12 +20,13 @@
 package org.apache.samza.serializers
 
 import org.apache.samza.SamzaException
-import org.apache.samza.message.ControlMessage
-import org.apache.samza.message.WatermarkMessage
+import org.apache.samza.system.ControlMessage
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.config.StorageConfig
+import org.apache.samza.system.WatermarkMessage
+
 
 class SerdeManager(
   serdes: Map[String, Serde[Object]] = Map(),

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java b/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
deleted file mode 100644
index 8351802..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestControlMessageUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.Partition;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.message.ControlMessage;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.task.MessageCollector;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestControlMessageUtils {
-
-  @Test
-  public void testSendControlMessage() {
-    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
-    partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
-    StreamMetadataCache metadataCache = mock(StreamMetadataCache.class);
-    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
-
-    SystemStream systemStream = new SystemStream("test-system", "test-stream");
-    Set<Integer> partitions = new HashSet<>();
-    MessageCollector collector = mock(MessageCollector.class);
-    doAnswer(invocation -> {
-        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
-        partitions.add((Integer) envelope.getPartitionKey());
-        assertEquals(envelope.getSystemStream(), systemStream);
-        return null;
-      }).when(collector).send(any());
-
-    ControlMessageUtils.sendControlMessage(mock(ControlMessage.class), systemStream, metadataCache, collector);
-    assertEquals(partitions.size(), 4);
-  }
-
-  @Test
-  public void testCalculateUpstreamTaskCounts() {
-    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
-    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
-    SystemStream input3 = new SystemStream("test-system", "input-stream-3");
-
-    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
-    TaskName t0 = new TaskName("task 0"); //consume input1 and input2
-    TaskName t1 = new TaskName("task 1"); //consume input1 and input2 and input 3
-    TaskName t2 = new TaskName("task 2"); //consume input2 and input 3
-    inputToTasks.put(input1, t0.getTaskName());
-    inputToTasks.put(input1, t1.getTaskName());
-    inputToTasks.put(input2, t0.getTaskName());
-    inputToTasks.put(input2, t1.getTaskName());
-    inputToTasks.put(input2, t2.getTaskName());
-    inputToTasks.put(input3, t1.getTaskName());
-    inputToTasks.put(input3, t2.getTaskName());
-
-    StreamSpec inputSpec2 = new StreamSpec("input-stream-2", "input-stream-2", "test-system");
-    StreamSpec inputSpec3 = new StreamSpec("input-stream-3", "input-stream-3", "test-system");
-    StreamSpec intSpec1 = new StreamSpec("int-stream-1", "int-stream-1", "test-system");
-    StreamSpec intSpec2 = new StreamSpec("int-stream-2", "int-stream-2", "test-system");
-
-    List<IOGraph.IONode> nodes = new ArrayList<>();
-    IOGraph.IONode node = new IOGraph.IONode(intSpec1, true);
-    node.addInput(inputSpec2);
-    nodes.add(node);
-    node = new IOGraph.IONode(intSpec2, true);
-    node.addInput(inputSpec3);
-    nodes.add(node);
-    IOGraph ioGraph = new IOGraph(nodes);
-
-    Map<SystemStream, Integer> counts = ControlMessageUtils.calculateUpstreamTaskCounts(inputToTasks, ioGraph);
-    assertEquals(counts.get(intSpec1.toSystemStream()).intValue(), 3);
-    assertEquals(counts.get(intSpec2.toSystemStream()).intValue(), 2);
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java b/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
deleted file mode 100644
index cc70b6b..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestEndOfStreamManager.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.Partition;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.spec.OutputOperatorSpec;
-import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-
-public class TestEndOfStreamManager {
-  StreamMetadataCache metadataCache;
-
-  @Before
-  public void setup() {
-    SystemStreamMetadata metadata = mock(SystemStreamMetadata.class);
-    Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
-    partitionMetadata.put(new Partition(0), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(1), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(2), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    partitionMetadata.put(new Partition(3), mock(SystemStreamMetadata.SystemStreamPartitionMetadata.class));
-    when(metadata.getSystemStreamPartitionMetadata()).thenReturn(partitionMetadata);
-    metadataCache = mock(StreamMetadataCache.class);
-    when(metadataCache.getSystemStreamMetadata(anyObject(), anyBoolean())).thenReturn(metadata);
-  }
-
-  @Test
-  public void testUpdateFromInputSource() {
-    SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
-    TaskName taskName = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
-    EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, Collections.singleton(ssp), null, null);
-    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp), mock(TaskCoordinator.class));
-    assertTrue(manager.isEndOfStream(ssp.getSystemStream()));
-  }
-
-  @Test
-  public void testUpdateFromIntermediateStream() {
-    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
-    ssps[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
-    ssps[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
-    ssps[2] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
-
-    TaskName taskName = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps) {
-      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    }
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(new IOGraph(Collections.emptyList()));
-    EndOfStreamManager manager = new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps)), null, null);
-
-    int envelopeCount = 4;
-    IncomingMessageEnvelope[] envelopes = new IncomingMessageEnvelope[envelopeCount];
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[0], "dummy-offset", "", new EndOfStreamMessage("task " + i, envelopeCount));
-    }
-    TaskCoordinator coordinator = mock(TaskCoordinator.class);
-
-    // verify the first three messages won't result in end-of-stream
-    for (int i = 0; i < 3; i++) {
-      manager.update(envelopes[i], coordinator);
-      assertFalse(manager.isEndOfStream(ssps[0].getSystemStream()));
-    }
-    // the fourth message will end the stream
-    manager.update(envelopes[3], coordinator);
-    assertTrue(manager.isEndOfStream(ssps[0].getSystemStream()));
-    assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
-
-    // stream2 has two partitions assigned to this task, so it requires a message from each partition to end it
-    envelopes = new IncomingMessageEnvelope[envelopeCount];
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[1], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount));
-    }
-    // verify the messages for the partition 0 won't result in end-of-stream
-    for (int i = 0; i < 4; i++) {
-      manager.update(envelopes[i], coordinator);
-      assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
-    }
-    for (int i = 0; i < envelopeCount; i++) {
-      envelopes[i] = new IncomingMessageEnvelope(ssps[2], "dummy-offset", "dummy-key", new EndOfStreamMessage("task " + i, envelopeCount));
-    }
-    for (int i = 0; i < 3; i++) {
-      manager.update(envelopes[i], coordinator);
-      assertFalse(manager.isEndOfStream(ssps[1].getSystemStream()));
-    }
-    // the fourth message will end the stream
-    manager.update(envelopes[3], coordinator);
-    assertTrue(manager.isEndOfStream(ssps[1].getSystemStream()));
-  }
-
-  @Test
-  public void testUpdateFromIntermediateStreamWith2Tasks() {
-    SystemStreamPartition[] ssps0 = new SystemStreamPartition[2];
-    ssps0[0] = new SystemStreamPartition("test-system", "test-stream-1", new Partition(0));
-    ssps0[1] = new SystemStreamPartition("test-system", "test-stream-2", new Partition(0));
-
-    SystemStreamPartition ssp1 = new SystemStreamPartition("test-system", "test-stream-2", new Partition(1));
-
-    TaskName t0 = new TaskName("Task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps0) {
-      streamToTasks.put(ssp.getSystemStream(), t0.getTaskName());
-    }
-
-    TaskName t1 = new TaskName("Task 1");
-    streamToTasks.put(ssp1, t1.getTaskName());
-
-    List<StreamSpec> inputs = new ArrayList<>();
-    inputs.add(new StreamSpec("test-stream-1", "test-stream-1", "test-system"));
-    inputs.add(new StreamSpec("test-stream-2", "test-stream-2", "test-system"));
-    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-
-    EndOfStreamManager manager0 = spy(new EndOfStreamManager("Task 0", listener, streamToTasks, new HashSet<>(Arrays.asList(ssps0)), null, null));
-    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[0]), mock(TaskCoordinator.class));
-    assertTrue(manager0.isEndOfStream(ssps0[0].getSystemStream()));
-    doNothing().when(manager0).sendEndOfStream(any(), anyInt());
-    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps0[1]), mock(TaskCoordinator.class));
-    assertTrue(manager0.isEndOfStream(ssps0[1].getSystemStream()));
-    verify(manager0).sendEndOfStream(any(), anyInt());
-
-    EndOfStreamManager manager1 = spy(new EndOfStreamManager("Task 1", listener, streamToTasks, Collections.singleton(
-        ssp1), null, null));
-    doNothing().when(manager1).sendEndOfStream(any(), anyInt());
-    manager1.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), mock(TaskCoordinator.class));
-    assertTrue(manager1.isEndOfStream(ssp1.getSystemStream()));
-    verify(manager1).sendEndOfStream(any(), anyInt());
-  }
-
-  @Test
-  public void testSendEndOfStream() {
-    StreamSpec ints = new StreamSpec("int-stream", "int-stream", "test-system");
-    StreamSpec input = new StreamSpec("input-stream", "input-stream", "test-system");
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(Collections.singletonList(input), ints, true);
-
-    Multimap<SystemStream, String> inputToTasks = HashMultimap.create();
-    for (int i = 0; i < 8; i++) {
-      inputToTasks.put(input.toSystemStream(), "Task " + i);
-    }
-
-    MessageCollector collector = mock(MessageCollector.class);
-    TaskName taskName = new TaskName("Task 0");
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-    EndOfStreamManager manager = new EndOfStreamManager(taskName.getTaskName(),
-        listener,
-        inputToTasks,
-        Collections.EMPTY_SET,
-        metadataCache,
-        collector);
-
-    Set<Integer> partitions = new HashSet<>();
-    doAnswer(invocation -> {
-        OutgoingMessageEnvelope envelope = (OutgoingMessageEnvelope) invocation.getArguments()[0];
-        partitions.add((Integer) envelope.getPartitionKey());
-        EndOfStreamMessage eosMessage = (EndOfStreamMessage) envelope.getMessage();
-        assertEquals(eosMessage.getTaskName(), taskName.getTaskName());
-        assertEquals(eosMessage.getTaskCount(), 8);
-        return null;
-      }).when(collector).send(any());
-
-    manager.sendEndOfStream(input.toSystemStream(), 8);
-    assertEquals(partitions.size(), 4);
-  }
-
-  @Test
-  public void testPropagate() {
-    List<StreamSpec> inputs = new ArrayList<>();
-    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
-    inputs.add(new StreamSpec("input-stream-2", "input-stream-2", "test-system"));
-    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
-
-    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
-    SystemStream input2 = new SystemStream("test-system", "input-stream-2");
-    SystemStream ints = new SystemStream("test-system", "int-stream");
-    SystemStreamPartition[] ssps = new SystemStreamPartition[3];
-    ssps[0] = new SystemStreamPartition(input1, new Partition(0));
-    ssps[1] = new SystemStreamPartition(input2, new Partition(0));
-    ssps[2] = new SystemStreamPartition(ints, new Partition(0));
-
-    Set<SystemStreamPartition> sspSet = new HashSet<>(Arrays.asList(ssps));
-    TaskName taskName = new TaskName("task 0");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    for (SystemStreamPartition ssp : ssps) {
-      streamToTasks.put(ssp.getSystemStream(), taskName.getTaskName());
-    }
-
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-    MessageCollector collector = mock(MessageCollector.class);
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-    EndOfStreamManager manager = spy(
-        new EndOfStreamManager("task 0", listener, streamToTasks, sspSet, metadataCache, collector));
-    TaskCoordinator coordinator = mock(TaskCoordinator.class);
-
-    // ssp1 end-of-stream, wait for ssp2
-    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[0]), coordinator);
-    verify(manager, never()).sendEndOfStream(any(), anyInt());
-
-    // ssp2 end-of-stream, propagate to intermediate
-    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[1]), coordinator);
-    doNothing().when(manager).sendEndOfStream(any(), anyInt());
-    ArgumentCaptor<SystemStream> argument = ArgumentCaptor.forClass(SystemStream.class);
-    verify(manager).sendEndOfStream(argument.capture(), anyInt());
-    assertEquals(ints, argument.getValue());
-
-    // intermediate end-of-stream, shutdown the task
-    manager.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssps[2]), coordinator);
-    doNothing().when(coordinator).shutdown(any());
-    ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
-    verify(coordinator).shutdown(arg.capture());
-    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
-  }
-
-  // Test the case where the tasks publishing to the intermediate stream are a subset of all tasks
-  @Test
-  public void testPropogateWith2Tasks() {
-    StreamSpec outputSpec = new StreamSpec("int-stream", "int-stream", "test-system");
-    OutputStreamImpl outputStream = new OutputStreamImpl(outputSpec, null, null);
-    OutputOperatorSpec partitionByOp = OperatorSpecs.createPartitionByOperatorSpec(outputStream, 0);
-
-    List<StreamSpec> inputs = new ArrayList<>();
-    inputs.add(new StreamSpec("input-stream-1", "input-stream-1", "test-system"));
-
-    IOGraph ioGraph = TestIOGraph.buildSimpleIOGraph(inputs, outputSpec, true);
-
-    SystemStream input1 = new SystemStream("test-system", "input-stream-1");
-    SystemStream ints = new SystemStream("test-system", "int-stream");
-    SystemStreamPartition ssp1 = new SystemStreamPartition(input1, new Partition(0));
-    SystemStreamPartition ssp2 = new SystemStreamPartition(ints, new Partition(0));
-
-    TaskName t0 = new TaskName("task 0");
-    TaskName t1 = new TaskName("task 1");
-    Multimap<SystemStream, String> streamToTasks = HashMultimap.create();
-    streamToTasks.put(ssp1.getSystemStream(), t0.getTaskName());
-    streamToTasks.put(ssp2.getSystemStream(), t1.getTaskName());
-
-    ControlMessageListenerTask listener = mock(ControlMessageListenerTask.class);
-    when(listener.getIOGraph()).thenReturn(ioGraph);
-
-    EndOfStreamManager manager0 = spy(
-        new EndOfStreamManager(t0.getTaskName(), listener, streamToTasks, Collections.singleton(ssp1), metadataCache, null));
-    EndOfStreamManager manager1 = spy(
-        new EndOfStreamManager(t1.getTaskName(), listener, streamToTasks, Collections.singleton(ssp2), metadataCache, null));
-
-    TaskCoordinator coordinator0 = mock(TaskCoordinator.class);
-    TaskCoordinator coordinator1 = mock(TaskCoordinator.class);
-
-    // ssp1 end-of-stream
-    doNothing().when(manager0).sendEndOfStream(any(), anyInt());
-    doNothing().when(coordinator0).shutdown(any());
-    manager0.update(EndOfStreamManager.buildEndOfStreamEnvelope(ssp1), coordinator0);
-    // verify that the task count passed to sendEndOfStream is 1
-    ArgumentCaptor<Integer> argument = ArgumentCaptor.forClass(Integer.class);
-    verify(manager0).sendEndOfStream(any(), argument.capture());
-    assertTrue(argument.getValue() == 1);
-    ArgumentCaptor<TaskCoordinator.RequestScope> arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
-    verify(coordinator0).shutdown(arg.capture());
-    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
-
-    // int1 end-of-stream
-    IncomingMessageEnvelope intEos = new IncomingMessageEnvelope(ssp2, null, null, new EndOfStreamMessage(t0.getTaskName(), 1));
-    manager1.update(intEos, coordinator1);
-    doNothing().when(coordinator1).shutdown(any());
-    verify(manager1, never()).sendEndOfStream(any(), anyInt());
-    arg = ArgumentCaptor.forClass(TaskCoordinator.RequestScope.class);
-    verify(coordinator1).shutdown(arg.capture());
-    assertEquals(TaskCoordinator.RequestScope.CURRENT_TASK, arg.getValue());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java b/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
deleted file mode 100644
index 39c56c3..0000000
--- a/samza-core/src/test/java/org/apache/samza/control/TestIOGraph.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.control.IOGraph.IONode;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.system.StreamSpec;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestIOGraph {
-  StreamSpec input1;
-  StreamSpec input2;
-  StreamSpec input3;
-  StreamSpec output1;
-  StreamSpec output2;
-  StreamSpec int1;
-  StreamSpec int2;
-
-  StreamGraphImpl streamGraph;
-
-  @Before
-  public void setup() {
-    ApplicationRunner runner = mock(ApplicationRunner.class);
-    Map<String, String> configMap = new HashMap<>();
-    configMap.put(JobConfig.JOB_NAME(), "test-app");
-    configMap.put(JobConfig.JOB_DEFAULT_SYSTEM(), "test-system");
-    Config config = new MapConfig(configMap);
-
-    /**
-     * The graph looks like the following. (No partition counts or expected values are annotated in this diagram.)
-     *
-     *                                    input1 -> map -> join -> output1
-     *                                                       |
-     *                      input2 -> partitionBy -> filter -|
-     *                                                       |
-     *           input3 -> filter -> partitionBy -> map -> join -> output2
-     *
-     */
-    input1 = new StreamSpec("input1", "input1", "system1");
-    input2 = new StreamSpec("input2", "input2", "system2");
-    input3 = new StreamSpec("input3", "input3", "system2");
-
-    output1 = new StreamSpec("output1", "output1", "system1");
-    output2 = new StreamSpec("output2", "output2", "system2");
-
-    runner = mock(ApplicationRunner.class);
-    when(runner.getStreamSpec("input1")).thenReturn(input1);
-    when(runner.getStreamSpec("input2")).thenReturn(input2);
-    when(runner.getStreamSpec("input3")).thenReturn(input3);
-    when(runner.getStreamSpec("output1")).thenReturn(output1);
-    when(runner.getStreamSpec("output2")).thenReturn(output2);
-
-    // intermediate streams used in tests
-    int1 = new StreamSpec("test-app-1-partition_by-3", "test-app-1-partition_by-3", "default-system");
-    int2 = new StreamSpec("test-app-1-partition_by-8", "test-app-1-partition_by-8", "default-system");
-    when(runner.getStreamSpec("test-app-1-partition_by-3"))
-        .thenReturn(int1);
-    when(runner.getStreamSpec("test-app-1-partition_by-8"))
-        .thenReturn(int2);
-
-    streamGraph = new StreamGraphImpl(runner, config);
-    BiFunction msgBuilder = mock(BiFunction.class);
-    MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
-    MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
-    MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
-    Function mockFn = mock(Function.class);
-    OutputStream<Object, Object, Object> om1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
-    OutputStream<Object, Object, Object> om2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
-
-    m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(om1);
-    m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(om2);
-  }
-
-  @Test
-  public void testBuildIOGraph() {
-    IOGraph ioGraph = streamGraph.toIOGraph();
-    assertEquals(ioGraph.getNodes().size(), 4);
-
-    for (IONode node : ioGraph.getNodes()) {
-      if (node.getOutput().equals(output1)) {
-        assertEquals(node.getInputs().size(), 2);
-        assertFalse(node.isOutputIntermediate());
-        StreamSpec[] inputs = sort(node.getInputs());
-        assertEquals(inputs[0], input1);
-        assertEquals(inputs[1], int1);
-      } else if (node.getOutput().equals(output2)) {
-        assertEquals(node.getInputs().size(), 2);
-        assertFalse(node.isOutputIntermediate());
-        StreamSpec[] inputs = sort(node.getInputs());
-        assertEquals(inputs[0], int1);
-        assertEquals(inputs[1], int2);
-      } else if (node.getOutput().equals(int1)) {
-        assertEquals(node.getInputs().size(), 1);
-        assertTrue(node.isOutputIntermediate());
-        StreamSpec[] inputs = sort(node.getInputs());
-        assertEquals(inputs[0], input2);
-      } else if (node.getOutput().equals(int2)) {
-        assertEquals(node.getInputs().size(), 1);
-        assertTrue(node.isOutputIntermediate());
-        StreamSpec[] inputs = sort(node.getInputs());
-        assertEquals(inputs[0], input3);
-      }
-    }
-  }
-
-  @Test
-  public void testNodesOfInput() {
-    IOGraph ioGraph = streamGraph.toIOGraph();
-    Collection<IONode> nodes = ioGraph.getNodesOfInput(input1.toSystemStream());
-    assertEquals(nodes.size(), 1);
-    IONode node = nodes.iterator().next();
-    assertEquals(node.getOutput(), output1);
-    assertEquals(node.getInputs().size(), 2);
-    assertFalse(node.isOutputIntermediate());
-
-    nodes = ioGraph.getNodesOfInput(input2.toSystemStream());
-    assertEquals(nodes.size(), 1);
-    node = nodes.iterator().next();
-    assertEquals(node.getOutput(), int1);
-    assertEquals(node.getInputs().size(), 1);
-    assertTrue(node.isOutputIntermediate());
-
-    nodes = ioGraph.getNodesOfInput(int1.toSystemStream());
-    assertEquals(nodes.size(), 2);
-    nodes.forEach(n -> {
-        assertEquals(n.getInputs().size(), 2);
-      });
-
-    nodes = ioGraph.getNodesOfInput(input3.toSystemStream());
-    assertEquals(nodes.size(), 1);
-    node = nodes.iterator().next();
-    assertEquals(node.getOutput(), int2);
-    assertEquals(node.getInputs().size(), 1);
-    assertTrue(node.isOutputIntermediate());
-
-    nodes = ioGraph.getNodesOfInput(int2.toSystemStream());
-    assertEquals(nodes.size(), 1);
-    node = nodes.iterator().next();
-    assertEquals(node.getOutput(), output2);
-    assertEquals(node.getInputs().size(), 2);
-    assertFalse(node.isOutputIntermediate());
-  }
-
-  private static StreamSpec[] sort(Set<StreamSpec> specs) {
-    StreamSpec[] array = new StreamSpec[specs.size()];
-    specs.toArray(array);
-    Arrays.sort(array, (s1, s2) -> s1.getId().compareTo(s2.getId()));
-    return array;
-  }
-
-  public static IOGraph buildSimpleIOGraph(List<StreamSpec> inputs,
-      StreamSpec output,
-      boolean isOutputIntermediate) {
-    IONode node = new IONode(output, isOutputIntermediate);
-    inputs.forEach(input -> node.addInput(input));
-    return new IOGraph(Collections.singleton(node));
-  }
-}