You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/10/03 22:23:51 UTC

[13/16] samza git commit: SAMZA-1386: Inline End-of-stream and Watermark logic inside OperatorImpl

SAMZA-1386: Inline End-of-stream and Watermark logic inside OperatorImpl

This patch contains the following changes:
1. Refactor the watermark and end-of-stream logic. The aggregation/handling has been moved from WatermarkManager/EndOfStreamManager inline into OperatorImpl, so that the logic lives in one place.
2. Subclasses of OperatorImpl now override handleWatermark() to do their specific handling, such as firing a trigger.
3. Add emitWatermark() to OperatorImpl so that subclasses can call it to emit a watermark upon receiving a message or a watermark.

Author: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Yi Pan <ni...@gmail.com>

Closes #277 from xinyuiscool/SAMZA-1386


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2819cbc7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2819cbc7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2819cbc7

Branch: refs/heads/master
Commit: 2819cbc7691b569b3eef66d702746d9e34b3e745
Parents: 4754148
Author: Xinyu Liu <xi...@gmail.com>
Authored: Thu Sep 28 17:11:28 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Thu Sep 28 17:11:28 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../operators/functions/WatermarkFunction.java  |  58 ++++
 .../org/apache/samza/system/ControlMessage.java |  46 +++
 .../apache/samza/system/EndOfStreamMessage.java |  38 +++
 .../samza/system/IncomingMessageEnvelope.java   |   7 +-
 .../org/apache/samza/system/MessageType.java    |  45 +++
 .../apache/samza/system/WatermarkMessage.java   |  46 +++
 .../apache/samza/container/TaskContextImpl.java | 131 ++++++++
 .../control/ControlMessageListenerTask.java     |  49 ---
 .../samza/control/ControlMessageUtils.java      |  81 -----
 .../samza/control/EndOfStreamManager.java       | 159 ---------
 .../java/org/apache/samza/control/IOGraph.java  | 113 -------
 .../org/apache/samza/control/Watermark.java     |  57 ----
 .../apache/samza/control/WatermarkManager.java  | 187 -----------
 .../apache/samza/message/ControlMessage.java    |  52 ---
 .../samza/message/EndOfStreamMessage.java       |  36 --
 .../samza/message/IntermediateMessageType.java  |  46 ---
 .../org/apache/samza/message/MessageType.java   |  46 ---
 .../apache/samza/message/WatermarkMessage.java  |  43 ---
 .../apache/samza/operators/StreamGraphImpl.java |   5 -
 .../operators/impl/ControlMessageSender.java    |  56 ++++
 .../samza/operators/impl/EndOfStreamStates.java |  98 ++++++
 .../samza/operators/impl/OperatorImpl.java      | 198 ++++++++---
 .../samza/operators/impl/OperatorImplGraph.java |  98 +++++-
 .../operators/impl/OutputOperatorImpl.java      |  32 +-
 .../samza/operators/impl/WatermarkStates.java   | 119 +++++++
 .../samza/operators/spec/InputOperatorSpec.java |   6 +
 .../samza/operators/spec/JoinOperatorSpec.java  |   6 +
 .../samza/operators/spec/OperatorSpec.java      |   3 +
 .../operators/spec/OutputOperatorSpec.java      |   7 +
 .../samza/operators/spec/SinkOperatorSpec.java  |   6 +
 .../operators/spec/StreamOperatorSpec.java      |   6 +
 .../operators/spec/WindowOperatorSpec.java      |   8 +
 .../serializers/IntermediateMessageSerde.java   |  18 +-
 .../samza/task/AsyncStreamTaskAdapter.java      |  22 +-
 .../apache/samza/task/StreamOperatorTask.java   |  45 ++-
 .../apache/samza/checkpoint/OffsetManager.scala |   7 +-
 .../apache/samza/container/TaskInstance.scala   | 185 ++---------
 .../apache/samza/serializers/SerdeManager.scala |   5 +-
 .../samza/control/TestControlMessageUtils.java  | 115 -------
 .../samza/control/TestEndOfStreamManager.java   | 333 -------------------
 .../org/apache/samza/control/TestIOGraph.java   | 200 -----------
 .../samza/control/TestWatermarkManager.java     | 260 ---------------
 .../samza/operators/TestJoinOperator.java       |   3 +-
 .../samza/operators/TestWindowOperator.java     |   6 +-
 .../impl/TestControlMessageSender.java          |  73 ++++
 .../operators/impl/TestEndOfStreamStates.java   |  78 +++++
 .../samza/operators/impl/TestOperatorImpl.java  |  32 +-
 .../operators/impl/TestOperatorImplGraph.java   | 181 +++++++++-
 .../operators/impl/TestWatermarkStates.java     | 102 ++++++
 .../TestIntermediateMessageSerde.java           |  18 +-
 .../org/apache/samza/task/TestAsyncRunLoop.java |   7 +-
 .../samza/container/TestTaskInstance.scala      |  63 +---
 .../samza/serializers/TestSerdeManager.scala    |  10 +-
 .../samza/system/hdfs/HdfsSystemConsumer.java   |   3 +-
 .../EndOfStreamIntegrationTest.java             |   2 +-
 .../WatermarkIntegrationTest.java               |  14 +-
 .../samza/test/util/ArraySystemConsumer.java    |   3 +-
 58 files changed, 1521 insertions(+), 2153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 319e51f..e10b970 100644
--- a/build.gradle
+++ b/build.gradle
@@ -125,6 +125,7 @@ project(':samza-api') {
 
   dependencies {
     compile "org.slf4j:slf4j-api:$slf4jVersion"
+    compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
new file mode 100644
index 0000000..9c4b9bf
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/WatermarkFunction.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.functions;
+
+/**
+ * Allows user-defined handling of watermarks.
+ */
+public interface WatermarkFunction {
+
+  /**
+   * Processes the input watermark coming from upstream operators.
+   * This allows user-defined watermark handling, such as triggering events
+   * or propagating the watermark downstream.
+   * @param watermark input watermark
+   */
+  void processWatermark(long watermark);
+
+  /**
+   * Returns the output watermark. This function will be invoked immediately after either
+   * of the following events:
+   *
+   * <ol>
+   *
+   * <li> Return of the transform function, e.g. {@link FlatMapFunction}.
+   *
+   * <li> Return of the processWatermark function.
+   *
+   * </ol>
+   *
+   *
+   *
+   * Note: If the transform function returns a collection of outputs, the output watermark
+   * will be emitted after the output collection has been propagated to downstream operators.
+   * This might delay watermark propagation; the delay causes more buffering and might have
+   * a performance impact.
+   *
+   * @return output watermark, or null if the output watermark should not be updated. Samza
+   * guarantees that the same watermark value will be emitted only once.
+   */
+  Long getOutputWatermark();
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java b/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
new file mode 100644
index 0000000..4ec58b4
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/ControlMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+/**
+ * The abstract base class of all control messages. It carries the name of the task
+ * that produced the message (may be null when no producer task is recorded) and a
+ * version number, which defaults to 1 and can be changed via {@link #setVersion(int)}.
+ */
+public abstract class ControlMessage {
+  private final String taskName;
+  private int version = 1;
+
+  public ControlMessage(String taskName) {
+    this.taskName = taskName;
+  }
+
+  public String getTaskName() {
+    return taskName;
+  }
+
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/EndOfStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/EndOfStreamMessage.java b/samza-api/src/main/java/org/apache/samza/system/EndOfStreamMessage.java
new file mode 100644
index 0000000..59d0356
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/EndOfStreamMessage.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ *  The EndOfStreamMessage is a control message that is sent out to the next stage
+ *  once the task has consumed a bounded stream to its end.
+ */
+public class EndOfStreamMessage extends ControlMessage {
+  public EndOfStreamMessage() {
+    this(null); // no producer task name is recorded
+  }
+
+  @JsonCreator // Jackson constructs instances through this constructor, reading "task-name"
+  public EndOfStreamMessage(@JsonProperty("task-name") String taskName) {
+    super(taskName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 9182522..96fa81c 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -96,9 +96,12 @@ public class IncomingMessageEnvelope {
    * @param ssp The SSP that is at end-of-stream.
    * @return an IncomingMessageEnvelope corresponding to end-of-stream for that SSP.
    */
-  @Deprecated
   public static IncomingMessageEnvelope buildEndOfStreamEnvelope(SystemStreamPartition ssp) {
-    return new IncomingMessageEnvelope(ssp, END_OF_STREAM_OFFSET, null, null);
+    return new IncomingMessageEnvelope(ssp, END_OF_STREAM_OFFSET, null, new EndOfStreamMessage(null));
+  }
+
+  public static IncomingMessageEnvelope buildWatermarkEnvelope(SystemStreamPartition ssp, long watermark) {
+    return new IncomingMessageEnvelope(ssp, null, null, new WatermarkMessage(watermark, null)); // null offset; no producer task name
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/MessageType.java b/samza-api/src/main/java/org/apache/samza/system/MessageType.java
new file mode 100644
index 0000000..7129d00
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/MessageType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+/**
+ * The type of an intermediate stream message. The enum will be encoded using its ordinal value and
+ * put in the first byte of the serialized form of an intermediate message.
+ */
+public enum MessageType { // NOTE: append new constants only; ordinals are serialized (see class doc)
+  USER_MESSAGE,
+  WATERMARK,
+  END_OF_STREAM;
+
+  /**
+   * Returns the {@link MessageType} of a particular intermediate stream message.
+   * @param message an intermediate stream message
+   * @return type of the message; USER_MESSAGE for any other object, including {@code null}
+   */
+  public static MessageType of(Object message) {
+    if (message instanceof WatermarkMessage) {
+      return WATERMARK;
+    } else if (message instanceof EndOfStreamMessage) {
+      return END_OF_STREAM;
+    } else {
+      return USER_MESSAGE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java b/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
new file mode 100644
index 0000000..7278c5c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/WatermarkMessage.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ *  The WatermarkMessage is a control message that is sent out to the next stage,
+ *  carrying a watermark timestamp and the name of the task that produced it (may be null).
+ */
+public class WatermarkMessage extends ControlMessage {
+  private final long timestamp;
+
+  public WatermarkMessage(long watermark) {
+    this(watermark, null); // no producer task name is recorded
+  }
+
+  @JsonCreator // Jackson constructs instances through this constructor
+  public WatermarkMessage(@JsonProperty("timestamp") long timestamp,
+                          @JsonProperty("task-name") String taskName) {
+    super(taskName);
+    this.timestamp = timestamp;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
new file mode 100644
index 0000000..7990d2b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.storage.TaskStorageManager;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.TaskContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class TaskContextImpl implements TaskContext {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskContextImpl.class);
+
+  private final TaskName taskName;
+  private final TaskInstanceMetrics metrics;
+  private final SamzaContainerContext containerContext;
+  private final Set<SystemStreamPartition> systemStreamPartitions;
+  private final OffsetManager offsetManager;
+  private final TaskStorageManager storageManager;
+  private final JobModel jobModel;
+  private final StreamMetadataCache streamMetadataCache;
+  private final Map<String, Object> objectRegistry = new HashMap<>(); // name -> object, filled by registerObject()
+
+  private Object userContext = null; // opaque user-supplied object; see setUserContext()
+
+  public TaskContextImpl(TaskName taskName,
+                         TaskInstanceMetrics metrics,
+                         SamzaContainerContext containerContext,
+                         Set<SystemStreamPartition> systemStreamPartitions,
+                         OffsetManager offsetManager,
+                         TaskStorageManager storageManager,
+                         JobModel jobModel,
+                         StreamMetadataCache streamMetadataCache) {
+    this.taskName = taskName;
+    this.metrics = metrics;
+    this.containerContext = containerContext;
+    this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions); // immutable snapshot of the caller's set
+    this.offsetManager = offsetManager;
+    this.storageManager = storageManager;
+    this.jobModel = jobModel;
+    this.streamMetadataCache = streamMetadataCache;
+  }
+
+  @Override
+  public ReadableMetricsRegistry getMetricsRegistry() {
+    return metrics.registry();
+  }
+
+  @Override
+  public Set<SystemStreamPartition> getSystemStreamPartitions() {
+    return systemStreamPartitions;
+  }
+
+  @Override
+  public StorageEngine getStore(String storeName) {
+    if (storageManager != null) { // storageManager may be null, in which case no store can be returned
+      return storageManager.apply(storeName);
+    } else {
+      LOG.warn("No store found for name: {}", storeName);
+      return null;
+    }
+  }
+
+  @Override
+  public TaskName getTaskName() {
+    return taskName;
+  }
+
+  @Override
+  public SamzaContainerContext getSamzaContainerContext() {
+    return containerContext;
+  }
+
+  @Override
+  public void setStartingOffset(SystemStreamPartition ssp, String offset) {
+    offsetManager.setStartingOffset(taskName, ssp, offset); // delegates to the OffsetManager, keyed by this task's name
+  }
+
+  @Override
+  public void setUserContext(Object context) {
+    userContext = context;
+  }
+
+  @Override
+  public Object getUserContext() {
+    return userContext;
+  }
+
+  public void registerObject(String name, Object value) {
+    objectRegistry.put(name, value); // replaces any object previously registered under the same name
+  }
+
+  public Object fetchObject(String name) {
+    return objectRegistry.get(name); // null if no object is registered under this name
+  }
+
+  public JobModel getJobModel() {
+    return jobModel;
+  }
+
+  public StreamMetadataCache getStreamMetadataCache() {
+    return streamMetadataCache;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java b/samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java
deleted file mode 100644
index 9e4b40a..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/ControlMessageListenerTask.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * The listener interface for the aggregation result of control messages.
- * Any task that handles control messages such as {@link org.apache.samza.message.EndOfStreamMessage}
- * and {@link org.apache.samza.message.WatermarkMessage} needs to implement this interface.
- */
-public interface ControlMessageListenerTask {
-
-  /**
-   * Returns the topology of the streams. Any control message listener needs to
-   * provide this topology so Samza can propagate the control message to downstreams.
-   * @return {@link IOGraph} of input to output streams. It
-   */
-  IOGraph getIOGraph();
-
-  /**
-   * Invoked when a Watermark comes.
-   * @param watermark contains the watermark timestamp
-   * @param systemStream source of stream that emits the watermark
-   * @param collector message collector
-   * @param coordinator task coordinator
-   */
-  void onWatermark(Watermark watermark, SystemStream systemStream, MessageCollector collector, TaskCoordinator coordinator);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java b/samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java
deleted file mode 100644
index ebb0d86..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/ControlMessageUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.Multimap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.samza.message.ControlMessage;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamMetadata;
-import org.apache.samza.task.MessageCollector;
-
-
-/**
- * This class privates static utils for handling control messages
- */
-class ControlMessageUtils {
-
-  /**
-   * Send a control message to every partition of the {@link SystemStream}
-   * @param message control message
-   * @param systemStream the stream to sent
-   * @param metadataCache stream metadata cache
-   * @param collector collector to send the message
-   */
-  static void sendControlMessage(ControlMessage message,
-      SystemStream systemStream,
-      StreamMetadataCache metadataCache,
-      MessageCollector collector) {
-    SystemStreamMetadata metadata = metadataCache.getSystemStreamMetadata(systemStream, true);
-    int partitionCount = metadata.getSystemStreamPartitionMetadata().size();
-    for (int i = 0; i < partitionCount; i++) {
-      OutgoingMessageEnvelope envelopeOut = new OutgoingMessageEnvelope(systemStream, i, "", message);
-      collector.send(envelopeOut);
-    }
-  }
-
-  /**
-   * Calculate the mapping from an output stream to the number of upstream tasks that will produce to the output stream
-   * @param inputToTasks input stream to its consumer tasks mapping
-   * @param ioGraph topology of the stream inputs and outputs
-   * @return mapping from output to upstream task count
-   */
-  static Map<SystemStream, Integer> calculateUpstreamTaskCounts(Multimap<SystemStream, String> inputToTasks,
-      IOGraph ioGraph) {
-    if (ioGraph == null) {
-      return Collections.EMPTY_MAP;
-    }
-    Map<SystemStream, Integer> outputTaskCount = new HashMap<>();
-    ioGraph.getNodes().forEach(node -> {
-        // for each input stream, find out the tasks that are consuming this input stream using the inputToTasks mapping,
-        // then count the number of tasks
-        int count = node.getInputs().stream().flatMap(spec -> inputToTasks.get(spec.toSystemStream()).stream())
-            .collect(Collectors.toSet()).size();
-        // put the count of input tasks to the result
-        outputTaskCount.put(node.getOutput().toSystemStream(), count);
-      });
-    return outputTaskCount;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java b/samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java
deleted file mode 100644
index 78a8741..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/EndOfStreamManager.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.Multimap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.message.EndOfStreamMessage;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This class handles the end-of-stream control message. It aggregates the end-of-stream state for each input ssps of
- * a task, and propagate the eos messages to downstream intermediate streams if needed.
- *
- * Internal use only.
- */
-public class EndOfStreamManager {
-  private static final Logger log = LoggerFactory.getLogger(EndOfStreamManager.class);
-
-  private final String taskName;
-  private final MessageCollector collector;
-  // end-of-stream state per ssp
-  private final Map<SystemStreamPartition, EndOfStreamState> eosStates;
-  private final StreamMetadataCache metadataCache;
-  // topology information. Set during init()
-  private final ControlMessageListenerTask listener;
-  // mapping from output stream to its upstream task count
-  private final Map<SystemStream, Integer> upstreamTaskCounts;
-
-  public EndOfStreamManager(String taskName,
-      ControlMessageListenerTask listener,
-      Multimap<SystemStream, String> inputToTasks,
-      Set<SystemStreamPartition> ssps,
-      StreamMetadataCache metadataCache,
-      MessageCollector collector) {
-    this.taskName = taskName;
-    this.listener = listener;
-    this.metadataCache = metadataCache;
-    this.collector = collector;
-    Map<SystemStreamPartition, EndOfStreamState> states = new HashMap<>();
-    ssps.forEach(ssp -> {
-        states.put(ssp, new EndOfStreamState());
-      });
-    this.eosStates = Collections.unmodifiableMap(states);
-    this.upstreamTaskCounts = ControlMessageUtils.calculateUpstreamTaskCounts(inputToTasks, listener.getIOGraph());
-  }
-
-  public void update(IncomingMessageEnvelope envelope, TaskCoordinator coordinator) {
-    EndOfStreamState state = eosStates.get(envelope.getSystemStreamPartition());
-    EndOfStreamMessage message = (EndOfStreamMessage) envelope.getMessage();
-    state.update(message.getTaskName(), message.getTaskCount());
-    log.info("Received end-of-stream from task " + message.getTaskName() + " in " + envelope.getSystemStreamPartition());
-
-    SystemStream systemStream = envelope.getSystemStreamPartition().getSystemStream();
-    if (isEndOfStream(systemStream)) {
-      log.info("End-of-stream of input " + systemStream + " for " + systemStream);
-      listener.getIOGraph().getNodesOfInput(systemStream).forEach(node -> {
-          // find the intermediate streams that need broadcast the eos messages
-          if (node.isOutputIntermediate()) {
-            // check all the input stream partitions assigned to the task are end-of-stream
-            boolean inputsEndOfStream = node.getInputs().stream().allMatch(spec -> isEndOfStream(spec.toSystemStream()));
-            if (inputsEndOfStream) {
-              // broadcast the end-of-stream message to the intermediate stream
-              SystemStream outputStream = node.getOutput().toSystemStream();
-              sendEndOfStream(outputStream, upstreamTaskCounts.get(outputStream));
-            }
-          }
-        });
-
-      boolean allEndOfStream = eosStates.values().stream().allMatch(EndOfStreamState::isEndOfStream);
-      if (allEndOfStream) {
-        // all inputs have been end-of-stream, shut down the task
-        log.info("All input streams have reached the end for task " + taskName);
-        coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
-      }
-    }
-  }
-
-  /**
-   * Return true if all partitions of the systemStream that are assigned to the current task have reached EndOfStream.
-   * @param systemStream stream
-   * @return whether the stream reaches to the end for this task
-   */
-  boolean isEndOfStream(SystemStream systemStream) {
-    return eosStates.entrySet().stream()
-        .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
-        .allMatch(entry -> entry.getValue().isEndOfStream());
-  }
-
-  /**
-   * Send the EndOfStream control messages to downstream
-   * @param systemStream downstream stream
-   */
-  void sendEndOfStream(SystemStream systemStream, int taskCount) {
-    log.info("Send end-of-stream messages with upstream task count {} to all partitions of {}", taskCount, systemStream);
-    final EndOfStreamMessage message = new EndOfStreamMessage(taskName, taskCount);
-    ControlMessageUtils.sendControlMessage(message, systemStream, metadataCache, collector);
-  }
-
-  /**
-   * This class keeps the internal state for a ssp to be end-of-stream.
-   */
-  final static class EndOfStreamState {
-    // set of upstream tasks
-    private final Set<String> tasks = new HashSet<>();
-    private int expectedTotal = Integer.MAX_VALUE;
-    private boolean isEndOfStream = false;
-
-    void update(String taskName, int taskCount) {
-      if (taskName != null) {
-        tasks.add(taskName);
-      }
-      expectedTotal = taskCount;
-      isEndOfStream = tasks.size() == expectedTotal;
-    }
-
-    boolean isEndOfStream() {
-      return isEndOfStream;
-    }
-  }
-
-  /**
-   * Build an end-of-stream envelope for an ssp of a source input.
-   *
-   * @param ssp The SSP that is at end-of-stream.
-   * @return an IncomingMessageEnvelope corresponding to end-of-stream for that SSP.
-   */
-  public static IncomingMessageEnvelope buildEndOfStreamEnvelope(SystemStreamPartition ssp) {
-    return new IncomingMessageEnvelope(ssp, IncomingMessageEnvelope.END_OF_STREAM_OFFSET, null, new EndOfStreamMessage(null, 0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/IOGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/IOGraph.java b/samza-core/src/main/java/org/apache/samza/control/IOGraph.java
deleted file mode 100644
index a30c13d..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/IOGraph.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OutputOperatorSpec;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * This class provides the topology of stream inputs to outputs.
- */
-public class IOGraph {
-
-  public static final class IONode {
-    private final Set<StreamSpec> inputs = new HashSet<>();
-    private final StreamSpec output;
-    private final boolean isOutputIntermediate;
-
-    IONode(StreamSpec output, boolean isOutputIntermediate) {
-      this.output = output;
-      this.isOutputIntermediate = isOutputIntermediate;
-    }
-
-    void addInput(StreamSpec input) {
-      inputs.add(input);
-    }
-
-    public Set<StreamSpec> getInputs() {
-      return Collections.unmodifiableSet(inputs);
-    }
-
-    public StreamSpec getOutput() {
-      return output;
-    }
-
-    public boolean isOutputIntermediate() {
-      return isOutputIntermediate;
-    }
-  }
-
-  final Collection<IONode> nodes;
-  final Multimap<SystemStream, IONode> inputToNodes;
-
-  public IOGraph(Collection<IONode> nodes) {
-    this.nodes = Collections.unmodifiableCollection(nodes);
-    this.inputToNodes = HashMultimap.create();
-    nodes.forEach(node -> {
-        node.getInputs().forEach(stream -> {
-            inputToNodes.put(new SystemStream(stream.getSystemName(), stream.getPhysicalName()), node);
-          });
-      });
-  }
-
-  public Collection<IONode> getNodes() {
-    return this.nodes;
-  }
-
-  public Collection<IONode> getNodesOfInput(SystemStream input) {
-    return inputToNodes.get(input);
-  }
-
-  public static IOGraph buildIOGraph(StreamGraphImpl streamGraph) {
-    Map<Integer, IONode> nodes = new HashMap<>();
-    streamGraph.getInputOperators().entrySet().stream()
-        .forEach(entry -> buildIONodes(entry.getKey(), entry.getValue(), nodes));
-    return new IOGraph(nodes.values());
-  }
-
-  /* package private */
-  static void buildIONodes(StreamSpec input, OperatorSpec opSpec, Map<Integer, IONode> ioGraph) {
-    if (opSpec instanceof OutputOperatorSpec) {
-      OutputOperatorSpec outputOpSpec = (OutputOperatorSpec) opSpec;
-      IONode node = ioGraph.get(opSpec.getOpId());
-      if (node == null) {
-        StreamSpec output = outputOpSpec.getOutputStream().getStreamSpec();
-        node = new IONode(output, outputOpSpec.getOpCode() == OperatorSpec.OpCode.PARTITION_BY);
-        ioGraph.put(opSpec.getOpId(), node);
-      }
-      node.addInput(input);
-    }
-
-    Collection<OperatorSpec> nextOperators = opSpec.getRegisteredOperatorSpecs();
-    nextOperators.forEach(spec -> buildIONodes(input, spec, ioGraph));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/Watermark.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/Watermark.java b/samza-core/src/main/java/org/apache/samza/control/Watermark.java
deleted file mode 100644
index a11e3b0..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/Watermark.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.system.SystemStream;
-
-
-/**
- * A watermark is a monotonically increasing value, which represents the point up to which the
- * system believes it has received all of the data before the watermark timestamp. Data that arrives
- * with a timestamp that is before the watermark is considered late.
- *
- * <p>This is the aggregate result from the WatermarkManager, which keeps track of the control message
- * {@link org.apache.samza.message.WatermarkMessage} and aggregate by returning the min of all watermark timestamp
- * in each partition.
- */
-@InterfaceStability.Unstable
-public interface Watermark {
-  /**
-   * Returns the timestamp of the watermark
-   * Note that if the task consumes more than one partitions of this stream, the watermark emitted is the min of
-   * watermarks across all partitions.
-   * @return timestamp
-   */
-  long getTimestamp();
-
-  /**
-   * Propagates the watermark to an intermediate stream
-   * @param systemStream intermediate stream
-   */
-  void propagate(SystemStream systemStream);
-
-  /**
-   * Create a copy of the watermark with the timestamp
-   * @param timestamp new timestamp
-   * @return new watermark
-   */
-  Watermark copyWithTimestamp(long timestamp);
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java b/samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java
deleted file mode 100644
index c4fdd88..0000000
--- a/samza-core/src/main/java/org/apache/samza/control/WatermarkManager.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.control;
-
-import com.google.common.collect.Multimap;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import org.apache.samza.message.WatermarkMessage;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This class manages watermarks. It aggregates the watermark control messages from the upstage tasks
- * for each SSP into an envelope of {@link Watermark}, and provide a dispatcher to propagate it to downstream.
- *
- * Internal use only.
- */
-public class WatermarkManager {
-  private static final Logger log = LoggerFactory.getLogger(WatermarkManager.class);
-  public static final long TIME_NOT_EXIST = -1;
-
-  private final String taskName;
-  private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
-  private final Map<SystemStream, Long> watermarkPerStream;
-  private final StreamMetadataCache metadataCache;
-  private final MessageCollector collector;
-  // mapping from output stream to its upstream task count
-  private final Map<SystemStream, Integer> upstreamTaskCounts;
-
-  public WatermarkManager(String taskName,
-      ControlMessageListenerTask listener,
-      Multimap<SystemStream, String> inputToTasks,
-      Set<SystemStreamPartition> ssps,
-      StreamMetadataCache metadataCache,
-      MessageCollector collector) {
-    this.taskName = taskName;
-    this.watermarkPerStream = new HashMap<>();
-    this.metadataCache = metadataCache;
-    this.collector = collector;
-    this.upstreamTaskCounts = ControlMessageUtils.calculateUpstreamTaskCounts(inputToTasks, listener.getIOGraph());
-
-    Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
-    ssps.forEach(ssp -> {
-        states.put(ssp, new WatermarkState());
-        watermarkPerStream.put(ssp.getSystemStream(), TIME_NOT_EXIST);
-      });
-    this.watermarkStates = Collections.unmodifiableMap(states);
-  }
-
-  /**
-   * Update the watermark based on the incoming watermark message. The message contains
-   * a timestamp and the upstream producer task. The aggregation result is the minimal value
-   * of all watermarks for the stream:
-   * <ul>
-   *   <li>Watermark(ssp) = min { Watermark(task) | task is upstream producer and the count equals total expected tasks } </li>
-   *   <li>Watermark(stream) = min { Watermark(ssp) | ssp is a partition of stream that assigns to this task } </li>
-   * </ul>
-   *
-   * @param envelope the envelope contains {@link WatermarkMessage}
-   * @return watermark envelope if there is a new aggregate watermark for the stream
-   */
-  public Watermark update(IncomingMessageEnvelope envelope) {
-    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-    WatermarkState state = watermarkStates.get(ssp);
-    WatermarkMessage message = (WatermarkMessage) envelope.getMessage();
-    state.update(message.getTimestamp(), message.getTaskName(), message.getTaskCount());
-
-    if (state.getWatermarkTime() != TIME_NOT_EXIST) {
-      long minTimestamp = watermarkStates.entrySet().stream()
-          .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()))
-          .map(entry -> entry.getValue().getWatermarkTime())
-          .min(Long::compare)
-          .get();
-      Long curWatermark = watermarkPerStream.get(ssp.getSystemStream());
-      if (curWatermark == null || curWatermark < minTimestamp) {
-        watermarkPerStream.put(ssp.getSystemStream(), minTimestamp);
-        return new WatermarkImpl(minTimestamp);
-      }
-    }
-
-    return null;
-  }
-
-  /* package private */
-  long getWatermarkTime(SystemStreamPartition ssp) {
-    return watermarkStates.get(ssp).getWatermarkTime();
-  }
-
-  /**
-   * Send the watermark message to all partitions of an intermediate stream
-   * @param timestamp watermark timestamp
-   * @param systemStream intermediate stream
-   */
-  void sendWatermark(long timestamp, SystemStream systemStream, int taskCount) {
-    log.info("Send end-of-stream messages to all partitions of " + systemStream);
-    final WatermarkMessage watermarkMessage = new WatermarkMessage(timestamp, taskName, taskCount);
-    ControlMessageUtils.sendControlMessage(watermarkMessage, systemStream, metadataCache, collector);
-  }
-
-  /**
-   * Per ssp state of the watermarks. This class keeps track of the latest watermark timestamp
-   * from each upstream producer tasks, and use the min to update the aggregated watermark time.
-   */
-  final static class WatermarkState {
-    private int expectedTotal = Integer.MAX_VALUE;
-    private final Map<String, Long> timestamps = new HashMap<>();
-    private long watermarkTime = TIME_NOT_EXIST;
-
-    void update(long timestamp, String taskName, int taskCount) {
-      if (taskName != null) {
-        timestamps.put(taskName, timestamp);
-      }
-      expectedTotal = taskCount;
-
-      if (timestamps.size() == expectedTotal) {
-        Optional<Long> min = timestamps.values().stream().min(Long::compare);
-        watermarkTime = min.orElse(timestamp);
-      }
-    }
-
-    long getWatermarkTime() {
-      return watermarkTime;
-    }
-  }
-
-  /**
-   * Implementation of the Watermark. It keeps a reference to the {@link WatermarkManager}
-   */
-  class WatermarkImpl implements Watermark {
-    private final long timestamp;
-
-    WatermarkImpl(long timestamp) {
-      this.timestamp = timestamp;
-    }
-
-    @Override
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public void propagate(SystemStream systemStream) {
-      sendWatermark(timestamp, systemStream, upstreamTaskCounts.get(systemStream));
-    }
-
-    @Override
-    public Watermark copyWithTimestamp(long time) {
-      return new WatermarkImpl(time);
-    }
-  }
-
-  /**
-   * Build a watermark control message envelope for an ssp of a source input.
-   * @param timestamp watermark time
-   * @param ssp {@link SystemStreamPartition} where the watermark coming from.
-   * @return envelope of the watermark control message
-   */
-  public static IncomingMessageEnvelope buildWatermarkEnvelope(long timestamp, SystemStreamPartition ssp) {
-    return new IncomingMessageEnvelope(ssp, null, "", new WatermarkMessage(timestamp, null, 0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java b/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
deleted file mode 100644
index 46bf559..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/ControlMessage.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-/**
- * The abstract class of all control messages, containing
- * the task that produces the control message, the total number of producer tasks,
- * and a version number.
- */
-public abstract class ControlMessage {
-  private final String taskName;
-  private final int taskCount;
-  private int version = 1;
-
-  public ControlMessage(String taskName, int taskCount) {
-    this.taskName = taskName;
-    this.taskCount = taskCount;
-  }
-
-  public String getTaskName() {
-    return taskName;
-  }
-
-  public int getTaskCount() {
-    return taskCount;
-  }
-
-  public void setVersion(int version) {
-    this.version = version;
-  }
-
-  public int getVersion() {
-    return version;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java b/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
deleted file mode 100644
index 91981a9..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/EndOfStreamMessage.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- *  The EndOfStreamMessage is a control message that is sent out to next stage
- *  once the task has consumed to the end of a bounded stream.
- */
-public class EndOfStreamMessage extends ControlMessage {
-
-  @JsonCreator
-  public EndOfStreamMessage(@JsonProperty("task-name") String taskName,
-                            @JsonProperty("task-count") int taskCount) {
-    super(taskName, taskCount);
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java b/samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java
deleted file mode 100644
index 25fbb14..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/IntermediateMessageType.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-/**
- * The type of the intermediate stream message. The enum will be encoded using its ordinal value and
- * put in the first byte of the serialization of intermediate message.
- * For more details, see {@link org.apache.samza.serializers.IntermediateMessageSerde}
- */
-public enum IntermediateMessageType {
-  USER_MESSAGE,
-  WATERMARK_MESSAGE,
-  END_OF_STREAM_MESSAGE;
-
-  /**
-   * Returns the {@link IntermediateMessageType} of a particular intermediate stream message.
-   * @param message an intermediate stream message
-   * @return type of the message
-   */
-  public static IntermediateMessageType of(Object message) {
-    if (message instanceof WatermarkMessage) {
-      return WATERMARK_MESSAGE;
-    } else if (message instanceof EndOfStreamMessage) {
-      return END_OF_STREAM_MESSAGE;
-    } else {
-      return USER_MESSAGE;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/MessageType.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/MessageType.java b/samza-core/src/main/java/org/apache/samza/message/MessageType.java
deleted file mode 100644
index b1199b6..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/MessageType.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-/**
- * The type of the intermediate stream message. The enum will be encoded using its ordinal value and
- * put in the first byte of the serialization of intermediate message.
- * For more details, see {@link org.apache.samza.serializers.IntermediateMessageSerde}
- */
-public enum MessageType {
-  USER_MESSAGE,
-  WATERMARK,
-  END_OF_STREAM;
-
-  /**
-   * Returns the {@link MessageType} of a particular intermediate stream message.
-   * @param message an intermediate stream message
-   * @return type of the message
-   */
-  public static MessageType of(Object message) {
-    if (message instanceof WatermarkMessage) {
-      return WATERMARK;
-    } else if (message instanceof EndOfStreamMessage) {
-      return END_OF_STREAM;
-    } else {
-      return USER_MESSAGE;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java b/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
deleted file mode 100644
index aa25742..0000000
--- a/samza-core/src/main/java/org/apache/samza/message/WatermarkMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.message;
-
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-
-/**
- *  The WatermarkMessage is a control message that is sent out to next stage
- *  with a watermark timestamp and the task that produces the watermark.
- */
-public class WatermarkMessage extends ControlMessage {
-  private final long timestamp;
-
-  @JsonCreator
-  public WatermarkMessage(@JsonProperty("timestamp") long timestamp,
-                          @JsonProperty("task-name") String taskName,
-                          @JsonProperty("task-count") int taskCount) {
-    super(taskName, taskCount);
-    this.timestamp = timestamp;
-  }
-
-  public long getTimestamp() {
-    return timestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 2aec49f..2c2eb56 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -24,7 +24,6 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.control.IOGraph;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 
@@ -206,8 +205,4 @@ public class StreamGraphImpl implements StreamGraph {
 
     return windowOrJoinSpecs.size() != 0;
   }
-
-  public IOGraph toIOGraph() {
-    return IOGraph.buildIOGraph(this);
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java b/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
new file mode 100644
index 0000000..3bdc361
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/ControlMessageSender.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.system.ControlMessage;
+import org.apache.samza.system.MessageType;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.task.MessageCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper that broadcasts a {@link ControlMessage} to an intermediate stream by sending one copy of the
+ * message to each of the stream's partitions.
+ */
+class ControlMessageSender {
+  private static final Logger LOG = LoggerFactory.getLogger(ControlMessageSender.class);
+
+  private final StreamMetadataCache metadataCache;
+
+  ControlMessageSender(StreamMetadataCache metadataCache) {
+    this.metadataCache = metadataCache;
+  }
+
+  /**
+   * Sends {@code message} once to every partition of {@code systemStream}.
+   * @param message control message to broadcast
+   * @param systemStream destination stream; its partition count is looked up from the metadata cache
+   * @param collector collector used to emit the envelopes
+   */
+  void send(ControlMessage message, SystemStream systemStream, MessageCollector collector) {
+    SystemStreamMetadata metadata = metadataCache.getSystemStreamMetadata(systemStream, true);
+    int partitionCount = metadata.getSystemStreamPartitionMetadata().size();
+    // parameterized logging: arguments are only formatted when INFO is enabled
+    LOG.info("Broadcast {} message from task {} to {} with {} partitions",
+        MessageType.of(message).name(), message.getTaskName(), systemStream, partitionCount);
+
+    for (int partition = 0; partition < partitionCount; partition++) {
+      // the partition index is passed as the partition key of the envelope
+      // NOTE(review): relies on the system producer routing key i to partition i — confirm per system
+      collector.send(new OutgoingMessageEnvelope(systemStream, partition, null, message));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/EndOfStreamStates.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/EndOfStreamStates.java b/samza-core/src/main/java/org/apache/samza/operators/impl/EndOfStreamStates.java
new file mode 100644
index 0000000..a69b234
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/EndOfStreamStates.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.system.EndOfStreamMessage;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Tracks the end-of-stream state of the streams consumed by a task. For every system stream partition
+ * (ssp) assigned to the task it records which upstream tasks have sent an end-of-stream message; an ssp
+ * is marked end-of-stream once messages from the expected number of upstream tasks have been received.
+ * A stream is end-of-stream when all of its partitions assigned to the task are end-of-stream.
+ *
+ * This class is thread-safe: the ssp-to-state map is unmodifiable after construction, and each
+ * per-ssp state synchronizes its updates and publishes its flag through a volatile field.
+ */
+class EndOfStreamStates {
+
+  private static final class EndOfStreamState {
+    // names of the upstream tasks that have sent end-of-stream for this ssp
+    private final Set<String> tasks = new HashSet<>();
+    // number of upstream tasks expected to send end-of-stream for this ssp
+    private final int expectedTotal;
+    // volatile so isEndOfStream() sees the latest value without taking the update() lock
+    private volatile boolean isEndOfStream = false;
+
+    EndOfStreamState(int expectedTotal) {
+      this.expectedTotal = expectedTotal;
+    }
+
+    synchronized void update(String taskName) {
+      // a null task name records no producer but still re-evaluates the state, so with an expected
+      // total of 0 and no recorded tasks the ssp becomes end-of-stream
+      if (taskName != null) {
+        tasks.add(taskName);
+      }
+      isEndOfStream = tasks.size() == expectedTotal;
+    }
+
+    boolean isEndOfStream() {
+      return isEndOfStream;
+    }
+  }
+
+  private final Map<SystemStreamPartition, EndOfStreamState> eosStates;
+
+  /**
+   * Constructs the end-of-stream states for a task.
+   * @param ssps all the ssps assigned to this task
+   * @param producerTaskCounts mapping from a stream to the number of upstream tasks that produce to it;
+   *                           streams missing from the map are treated as having 0 producers
+   */
+  EndOfStreamStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) {
+    Map<SystemStreamPartition, EndOfStreamState> states = new HashMap<>();
+    for (SystemStreamPartition ssp : ssps) {
+      int expectedProducers = producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0);
+      states.put(ssp, new EndOfStreamState(expectedProducers));
+    }
+    this.eosStates = Collections.unmodifiableMap(states);
+  }
+
+  /**
+   * Updates the state upon receiving an end-of-stream message.
+   * @param eos message of {@link EndOfStreamMessage}
+   * @param ssp system stream partition the message was received on
+   * @throws IllegalArgumentException if {@code ssp} is not one of the ssps this instance was built with
+   */
+  void update(EndOfStreamMessage eos, SystemStreamPartition ssp) {
+    EndOfStreamState state = eosStates.get(ssp);
+    if (state == null) {
+      // fail with a diagnosable message instead of a bare NullPointerException
+      throw new IllegalArgumentException(
+          "Received end-of-stream message for " + ssp + " which is not assigned to this task");
+    }
+    state.update(eos.getTaskName());
+  }
+
+  /**
+   * @param systemStream stream to check
+   * @return true if every ssp of {@code systemStream} assigned to this task is end-of-stream; also true
+   *         (vacuously) when none of the task's ssps belong to {@code systemStream}
+   */
+  boolean isEndOfStream(SystemStream systemStream) {
+    return eosStates.entrySet().stream()
+        .filter(entry -> entry.getKey().getSystemStream().equals(systemStream))
+        .allMatch(entry -> entry.getValue().isEndOfStream());
+  }
+
+  /**
+   * @return true if every ssp assigned to this task is end-of-stream
+   */
+  boolean allEndOfStream() {
+    return eosStates.values().stream().allMatch(EndOfStreamState::isEndOfStream);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/2819cbc7/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 8f51c5f..eefd4eb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -24,22 +24,29 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.control.Watermark;
-import org.apache.samza.control.WatermarkManager;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.WatermarkMessage;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.util.HighResolutionClock;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract base class for all stream operator implementations.
  */
 public abstract class OperatorImpl<M, RM> {
+  private static final Logger LOG = LoggerFactory.getLogger(OperatorImpl.class);
   private static final String METRICS_GROUP = OperatorImpl.class.getName();
 
   private boolean initialized;
@@ -48,11 +55,18 @@ public abstract class OperatorImpl<M, RM> {
   private Counter numMessage;
   private Timer handleMessageNs;
   private Timer handleTimerNs;
-  private long inputWatermarkTime = WatermarkManager.TIME_NOT_EXIST;
-  private long outputWatermarkTime = WatermarkManager.TIME_NOT_EXIST;
+  private long inputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
+  private long outputWatermark = WatermarkStates.WATERMARK_NOT_EXIST;
+  private TaskName taskName;
 
   Set<OperatorImpl<RM, ?>> registeredOperators;
   Set<OperatorImpl<?, M>> prevOperators;
+  Set<SystemStream> inputStreams;
+
+  // end-of-stream states
+  private EndOfStreamStates eosStates;
+  // watermark states
+  private WatermarkStates watermarkStates;
 
   /**
    * Initialize this {@link OperatorImpl} and its user-defined functions.
@@ -74,10 +88,16 @@ public abstract class OperatorImpl<M, RM> {
     this.highResClock = createHighResClock(config);
     registeredOperators = new HashSet<>();
     prevOperators = new HashSet<>();
+    inputStreams = new HashSet<>();
     MetricsRegistry metricsRegistry = context.getMetricsRegistry();
     this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages");
     this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns");
     this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns");
+    this.taskName = context.getTaskName();
+
+    TaskContextImpl taskContext = (TaskContextImpl) context;
+    this.eosStates = (EndOfStreamStates) taskContext.fetchObject(EndOfStreamStates.class.getName());
+    this.watermarkStates = (WatermarkStates) taskContext.fetchObject(WatermarkStates.class.getName());
 
     handleInit(config, context);
 
@@ -111,6 +131,10 @@ public abstract class OperatorImpl<M, RM> {
     this.prevOperators.add(prevOperator);
   }
 
+  /**
+   * Records {@code input} as one of the system streams feeding this operator. End-of-stream is only
+   * handled and forwarded by this operator once every registered input stream has reached its end.
+   * @param input an input stream of this operator
+   */
+  void registerInputStream(SystemStream input) {
+    inputStreams.add(input);
+  }
+
   /**
    * Handle the incoming {@code message} for this {@link OperatorImpl} and propagate results to registered operators.
    * <p>
@@ -128,6 +152,13 @@ public abstract class OperatorImpl<M, RM> {
     this.handleMessageNs.update(endNs - startNs);
 
     results.forEach(rm -> this.registeredOperators.forEach(op -> op.onMessage(rm, collector, coordinator)));
+
+    WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
+    if (watermarkFn != null) {
+      // check whether there is new watermark emitted from the user function
+      Long outputWm = watermarkFn.getOutputWatermark();
+      propagateWatermark(outputWm, collector, coordinator);
+    }
   }
 
   /**
@@ -174,68 +205,149 @@ public abstract class OperatorImpl<M, RM> {
   }
 
   /**
-   * Populate the watermarks based on the following equations:
-   *
-   * <ul>
-   *   <li>InputWatermark(op) = min { OutputWatermark(op') | op1 is upstream of op}</li>
-   *   <li>OutputWatermark(op) = min { InputWatermark(op), OldestWorkTime(op) }</li>
-   * </ul>
-   *
-   * @param watermark incoming watermark
+   * Aggregates {@link EndOfStreamMessage}s from each ssp of a stream. When all partitions of the stream
+   * assigned to this task have reached the end, onEndOfStream() is invoked; when, in addition, every
+   * input of the task has reached the end, shutdown of the current task is requested.
+   * @param eos {@link EndOfStreamMessage} object
+   * @param ssp system stream partition the message was received on
    * @param collector message collector
    * @param coordinator task coordinator
    */
-  public final void onWatermark(Watermark watermark,
-      MessageCollector collector,
+  public final void aggregateEndOfStream(EndOfStreamMessage eos, SystemStreamPartition ssp, MessageCollector collector,
       TaskCoordinator coordinator) {
+    LOG.info("Received end-of-stream message from task {} in {}", eos.getTaskName(), ssp);
+    eosStates.update(eos, ssp);
+
+    SystemStream stream = ssp.getSystemStream();
+    if (eosStates.isEndOfStream(stream)) {
+      // NOTE(review): nothing guards against re-entering this branch for an end-of-stream message that
+      // arrives after the stream has already ended (e.g. a duplicate); onEndOfStream() and the shutdown
+      // request would then run again — confirm that is harmless.
+      LOG.info("Input {} reaches the end for task {}", stream.toString(), taskName.getTaskName());
+      onEndOfStream(collector, coordinator);
+
+      if (eosStates.allEndOfStream()) {
+        // all inputs have been end-of-stream, shut down the task
+        LOG.info("All input streams have reached the end for task {}", taskName.getTaskName());
+        coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+      }
+    }
+  }
+
+  /**
+   * Reacts to an input stream of the task reaching its end. Only once every stream registered through
+   * registerInputStream() has ended does this operator run its handleEndOfStream() hook and forward the
+   * end-of-stream signal to each registered downstream operator; otherwise it returns without effect.
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  private final void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+    boolean allInputsEnded = inputStreams.stream().allMatch(input -> eosStates.isEndOfStream(input));
+    if (!allInputsEnded) {
+      return;
+    }
+    handleEndOfStream(collector, coordinator);
+    for (OperatorImpl<RM, ?> downstream : registeredOperators) {
+      downstream.onEndOfStream(collector, coordinator);
+    }
+  }
+
+  /**
+   * Hook invoked once all input streams of this operator have reached the end.
+   * Subclasses override it to handle end-of-stream. The default no-op implementation suits in-memory
+   * operators; output operators need to override it to actually propagate end-of-stream over the wire.
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+    //Do nothing by default
+  }
+
+  /**
+   * Folds a {@link WatermarkMessage} received on {@code ssp} into the watermark state of its stream and,
+   * unless that state reports {@link WatermarkStates#WATERMARK_NOT_EXIST}, hands the stream's current
+   * watermark to onWatermark().
+   * @param watermarkMessage a {@link WatermarkMessage} object
+   * @param ssp {@link SystemStreamPartition} that the message is coming from.
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  public final void aggregateWatermark(WatermarkMessage watermarkMessage, SystemStreamPartition ssp,
+      MessageCollector collector, TaskCoordinator coordinator) {
+    LOG.debug("Received watermark {} from {}", watermarkMessage.getTimestamp(), ssp);
+    watermarkStates.update(watermarkMessage, ssp);
+
+    SystemStream systemStream = ssp.getSystemStream();
+    long streamWatermark = watermarkStates.getWatermark(systemStream);
+    if (streamWatermark == WatermarkStates.WATERMARK_NOT_EXIST) {
+      // the watermark state has nothing for this stream yet; nothing to forward
+      return;
+    }
+    LOG.debug("Got watermark {} from stream {}", streamWatermark, systemStream);
+    onWatermark(streamWatermark, collector, coordinator);
+  }
+
+  /**
+   * Handles a watermark arriving from upstream. For an operator with no previous operators the incoming
+   * {@code watermark} is the candidate input watermark; otherwise the candidate is the minimum output
+   * watermark over all previous operators, and the argument itself is not used. If the candidate is
+   * larger than the current input watermark, the input watermark advances and is handed to the operator
+   * spec's {@link WatermarkFunction} when one is set, or to handleWatermark() otherwise; the resulting
+   * output watermark is then passed to propagateWatermark().
+   * @param watermark incoming watermark; only used directly when this operator has no previous operators
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  private final void onWatermark(long watermark, MessageCollector collector, TaskCoordinator coordinator) {
     final long inputWatermarkMin;
     if (prevOperators.isEmpty()) {
       // for input operator, use the watermark time coming from the source input
-      inputWatermarkMin = watermark.getTimestamp();
+      inputWatermarkMin = watermark;
     } else {
-      // InputWatermark(op) = min { OutputWatermark(op') | op1 is upstream of op}
-      inputWatermarkMin = prevOperators.stream().map(op -> op.getOutputWatermarkTime()).min(Long::compare).get();
+      // InputWatermark(op) = min { OutputWatermark(op') | op' is upstream of op}
+      // prevOperators is non-empty in this branch, so get() on the min is safe
+      inputWatermarkMin = prevOperators.stream().map(op -> op.getOutputWatermark()).min(Long::compare).get();
     }
 
-    if (inputWatermarkTime < inputWatermarkMin) {
+    if (inputWatermark < inputWatermarkMin) {
       // advance the watermark time of this operator
-      inputWatermarkTime = inputWatermarkMin;
-      Watermark inputWatermark = watermark.copyWithTimestamp(inputWatermarkTime);
-      long oldestWorkTime = handleWatermark(inputWatermark, collector, coordinator);
-
-      // OutputWatermark(op) = min { InputWatermark(op), OldestWorkTime(op) }
-      long outputWatermarkMin = Math.min(inputWatermarkTime, oldestWorkTime);
-      if (outputWatermarkTime < outputWatermarkMin) {
-        // populate the watermark to downstream
-        outputWatermarkTime = outputWatermarkMin;
-        Watermark outputWatermark = watermark.copyWithTimestamp(outputWatermarkTime);
+      inputWatermark = inputWatermarkMin;
+      LOG.trace("Advance input watermark to {} in operator {}", inputWatermark, getOperatorName());
+
+      final Long outputWm;
+      WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
+      if (watermarkFn != null) {
+        // user-overridden watermark handling: the function consumes the input watermark and reports its
+        // own output watermark (a null result is ignored by propagateWatermark())
+        watermarkFn.processWatermark(inputWatermark);
+        outputWm = watermarkFn.getOutputWatermark();
+      } else {
+        // use samza-provided watermark handling
+        // default is to propagate the input watermark
+        outputWm = handleWatermark(inputWatermark, collector, coordinator);
+      }
+
+      propagateWatermark(outputWm, collector, coordinator);
+    }
+  }
+
+  /**
+   * Advances this operator's output watermark to {@code outputWm} and forwards it to every registered
+   * downstream operator, but only when it is larger than the current output watermark. A {@code null}
+   * value means "no new output watermark" and is ignored. An equal value is silently ignored. A smaller
+   * value is logged and dropped, so the output watermark never moves backwards. Called both after an
+   * input watermark advances and after each message when the operator spec has a WatermarkFunction.
+   * @param outputWm candidate output watermark, or null
+   * @param collector message collector
+   * @param coordinator task coordinator
+   */
+  private void propagateWatermark(Long outputWm, MessageCollector collector, TaskCoordinator coordinator) {
+    if (outputWm != null) {
+      if (outputWatermark < outputWm) {
+        // advance the watermark
+        outputWatermark = outputWm;
+        LOG.debug("Advance output watermark to {} in operator {}", outputWatermark, getOperatorName());
         this.registeredOperators.forEach(op -> op.onWatermark(outputWatermark, collector, coordinator));
+      } else if (outputWatermark > outputWm) {
+        LOG.warn("Ignore watermark {} that is smaller than the previous watermark {}.", outputWm, outputWatermark);
       }
     }
   }
 
   /**
-   * Returns the oldest time of the envelops that haven't been processed by this operator
-   * Default implementation of handling watermark, which returns the input watermark time
-   * @param inputWatermark input watermark
+   * Handles the input watermark and returns the output watermark. Called from onWatermark() only when
+   * the operator spec does not provide a {@link WatermarkFunction}.
+   * In-memory operators can override this to fire event-time triggers. Output operators need to override
+   * it so they can propagate watermarks over the wire. By default it simply returns the input watermark.
+   * @param inputWatermark input watermark
    * @param collector message collector
    * @param coordinator task coordinator
-   * @return time of oldest processing envelope
+   * @return output watermark, or null if the output watermark should not be updated.
    */
-  protected long handleWatermark(Watermark inputWatermark,
-      MessageCollector collector,
-      TaskCoordinator coordinator) {
-    return inputWatermark.getTimestamp();
+  protected Long handleWatermark(long inputWatermark, MessageCollector collector, TaskCoordinator coordinator) {
+    // Default is no handling. Simply pass on the input watermark as output.
+    return inputWatermark;
   }
 
-  /* package private */
-  long getInputWatermarkTime() {
-    return this.inputWatermarkTime;
+  /* package private for testing */
+  final long getInputWatermark() {
+    return this.inputWatermark;
   }
 
-  /* package private */
-  long getOutputWatermarkTime() {
-    return this.outputWatermarkTime;
+  /* package private: downstream operators read it in onWatermark() to compute their input watermark;
+   * also used for testing */
+  final long getOutputWatermark() {
+    return this.outputWatermark;
   }
 
   public void close() {