You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/01 00:51:52 UTC

[incubator-nemo] branch master updated: [NEMO-232] Implement InputWatermarkManager (#137)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7858051  [NEMO-232] Implement InputWatermarkManager (#137)
7858051 is described below

commit 78580514afd4fe6086ebcc4945a519844bf04962
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Thu Nov 1 09:51:48 2018 +0900

    [NEMO-232] Implement InputWatermarkManager (#137)
    
    JIRA: [NEMO-232: Implement InputWatermarkManager](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-232)
    
    **Major changes:**
    - Tracks the minimum watermark among multiple input streams by creating `InputWatermarkManager`
    - Emits watermarks from `OutputCollector` to `InputWatermarkManager`. Then `InputWatermarkManager` emits the minimum input watermarks to next operators
    
    **Minor changes to note:**
    - Create `NextIntraTaskOperatorInfo` to wrap edge index, OperatorVertex, and InputWatermarkManager.
    
    **Tests for the changes:**
    - Create `InputWatermarkManagerTest`
    - Add `testMultipleIncomingEdges` to `TaskExecutorTest`
    
    **Other comments:**
    -
    
    Closes #137
---
 pom.xml                                            |  16 +-
 .../runtime/executor/data/PipeManagerWorker.java   |   8 +-
 .../datatransfer/DataFetcherOutputCollector.java   |   1 +
 .../datatransfer/InputWatermarkManager.java        |  46 ++++
 .../datatransfer/MultiInputWatermarkManager.java   |  90 +++++++
 .../datatransfer/NextIntraTaskOperatorInfo.java    |  59 +++++
 .../OperatorVertexOutputCollector.java             |  35 ++-
 .../datatransfer/SingleInputWatermarkManager.java  |  46 ++++
 .../nemo/runtime/executor/task/TaskExecutor.java   |  73 +++++-
 .../datatransfer/InputWatermarkManagerTest.java    |  88 +++++++
 .../runtime/executor/task/TaskExecutorTest.java    | 274 +++++++++++++++------
 11 files changed, 623 insertions(+), 113 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1675e9c..3a81df7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,17 +235,17 @@ under the License.
                 <artifactId>maven-javadoc-plugin</artifactId>
                 <version>3.0.0</version>
                 <configuration>
-                    <excludePackageNames>*.org.apache.nemo.runtime.common.comm</excludePackageNames>
-                    <outputDirectory>docs/apidocs</outputDirectory>
-                    <reportOutputDirectory>docs/apidocs</reportOutputDirectory>
+                  <excludePackageNames>*.org.apache.nemo.runtime.common.comm</excludePackageNames>
+                  <outputDirectory>docs/apidocs</outputDirectory>
+                  <reportOutputDirectory>docs/apidocs</reportOutputDirectory>
                 </configuration>
                 <executions>
                     <execution>
-                        <id>aggregate</id>
-                        <goals>
-                            <goal>aggregate</goal>
-                        </goals>
-                        <phase>site</phase>
+                      <id>aggregate</id>
+                      <goals>
+                          <goal>aggregate</goal>
+                      </goals>
+                      <phase>site</phase>
                     </execution>
                     <execution>
                       <id>test-javadoc</id>
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
index ffbbe56..a433f3a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
@@ -141,8 +141,8 @@ public final class PipeManagerWorker {
   /**
    * (SYNCHRONIZATION) Called by task threads.
    *
-   * @param runtimeEdge
-   * @param srcTaskIndex
+   * @param runtimeEdge runtime edge
+   * @param srcTaskIndex source task index
    * @return output contexts.
    */
   public List<ByteOutputContext> getOutputContexts(final RuntimeEdge runtimeEdge,
@@ -163,8 +163,8 @@ public final class PipeManagerWorker {
   /**
    * (SYNCHRONIZATION) Called by network threads.
    *
-   * @param outputContext
-   * @throws InvalidProtocolBufferException
+   * @param outputContext output context
+   * @throws InvalidProtocolBufferException protobuf exception
    */
   public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
     final ControlMessage.PipeTransferContextDescriptor descriptor =
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 3f1bc90..56c7540 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -34,6 +34,7 @@ public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
 
   /**
    * It forwards output to the next operator.
+   * @param nextOperatorVertex next operator to emit data and watermark
    */
   public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
     this.nextOperatorVertex = nextOperatorVertex;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
new file mode 100644
index 0000000..66fb7aa
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.punctuation.Watermark;
+
+
+/**
+ * An interface for tracking input watermarks among multiple input streams.
+ * --edge 1--&gt;
+ * --edge 2--&gt;  watermarkManager --(emitWatermark)--&gt; nextOperator
+ * --edge 3--&gt;
+ */
+public interface InputWatermarkManager {
+
+  /**
+   * This tracks the minimum input watermark among multiple input streams.
+   * Ex)
+   * -- input stream1 (edge 1):  ---------- ts: 3 ------------------ts: 6
+   *                                                                 ^^^
+   *                                                              emit ts: 4 (edge 2) watermark at this time
+   * -- input stream2 (edge 2):  ----------------- ts: 4------
+   *                                                 ^^^
+   *                                             emit ts: 3 (edge 1) watermark at this time
+   * -- input stream3 (edge 3):  ------- ts: 5 ---------------
+   * @param edgeIndex incoming edge index
+   * @param watermark watermark emitted from the edge
+   */
+  void trackAndEmitWatermarks(int edgeIndex, Watermark watermark);
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
new file mode 100644
index 0000000..91c7c55
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This tracks the minimum input watermark among multiple input streams.
+ */
+public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  private final List<Watermark> watermarks;
+  private final OperatorVertex nextOperator;
+  private int minWatermarkIndex;
+  public MultiInputWatermarkManager(final int numEdges,
+                                    final OperatorVertex nextOperator) {
+    super();
+    this.watermarks = new ArrayList<>(numEdges);
+    this.nextOperator = nextOperator;
+    this.minWatermarkIndex = 0;
+    // We initialize watermarks as min value because
+    // we should not emit watermark until all edges emit watermarks.
+    for (int i = 0; i < numEdges; i++) {
+      watermarks.add(new Watermark(Long.MIN_VALUE));
+    }
+  }
+
+  private int findNextMinWatermarkIndex() {
+    int index = -1;
+    long timestamp = Long.MAX_VALUE;
+    for (int i = 0; i < watermarks.size(); i++) {
+      if (watermarks.get(i).getTimestamp() < timestamp) {
+        index = i;
+        timestamp = watermarks.get(i).getTimestamp();
+      }
+    }
+    return index;
+  }
+
+  /**
+   * Records the watermark of one edge and emits the minimum across all edges when it advances.
+   * @param edgeIndex incoming edge index
+   * @param watermark watermark emitted from the edge
+   */
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
+    if (edgeIndex == minWatermarkIndex) {
+      // The edge holding the current minimum reported: store it, then recompute the minimum.
+      final Watermark prevMinWatermark = watermarks.get(minWatermarkIndex);
+      watermarks.set(minWatermarkIndex, watermark);
+      minWatermarkIndex = findNextMinWatermarkIndex();
+      final Watermark minWatermark = watermarks.get(minWatermarkIndex);
+      if (minWatermark.getTimestamp() < prevMinWatermark.getTimestamp()) {
+        throw new IllegalStateException(
+          "The current min watermark is behind prev min: " + minWatermark + ", " + prevMinWatermark);
+      }
+      if (minWatermark.getTimestamp() > prevMinWatermark.getTimestamp()) {
+        // Watermark timestamp progress! Emit the min watermark.
+        nextOperator.getTransform().onWatermark(minWatermark);
+      }
+    } else {
+      // A watermark on a non-minimum edge must not regress: watermarks are monotonically increasing.
+      if (watermarks.get(edgeIndex).getTimestamp() > watermark.getTimestamp()) {
+        throw new IllegalStateException(
+          "The recent watermark timestamp cannot be less than the previous one "
+            + "because watermark is monotonically increasing.");
+      }
+      watermarks.set(edgeIndex, watermark);
+    }
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java
new file mode 100644
index 0000000..1525261
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NextIntraTaskOperatorInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+
+/**
+ * Contains information for next operator:
+ * -- edgeIndex: the index of edge to next operator.
+ * -- nextOperator: next operator vertex
+ * -- watermarkManager: next operator's watermark manager
+ *
+ * ex)
+ * --edge (index 0)--&gt;
+ * --edge (index 1)--&gt;  watermarkManager --&gt; nextOperator
+ * --edge (index 2)--&gt;
+ */
+public final class NextIntraTaskOperatorInfo {
+
+  private final int edgeIndex;
+  private final OperatorVertex nextOperator;
+  private final InputWatermarkManager watermarkManager;
+
+  public NextIntraTaskOperatorInfo(final int edgeIndex,
+                                   final OperatorVertex nextOperator,
+                                   final InputWatermarkManager watermarkManager) {
+    this.edgeIndex = edgeIndex;
+    this.nextOperator = nextOperator;
+    this.watermarkManager = watermarkManager;
+  }
+
+  public int getEdgeIndex() {
+    return edgeIndex;
+  }
+
+  public OperatorVertex getNextOperator() {
+    return nextOperator;
+  }
+
+  public InputWatermarkManager getWatermarkManager() {
+    return watermarkManager;
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 598cc35..3637780 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -41,8 +41,8 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
   private static final Logger LOG = LoggerFactory.getLogger(OperatorVertexOutputCollector.class.getName());
 
   private final IRVertex irVertex;
-  private final List<OperatorVertex> internalMainOutputs;
-  private final Map<String, List<OperatorVertex>> internalAdditionalOutputs;
+  private final List<NextIntraTaskOperatorInfo> internalMainOutputs;
+  private final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputs;
   private final List<OutputWriter> externalMainOutputs;
   private final Map<String, List<OutputWriter>> externalAdditionalOutputs;
 
@@ -54,11 +54,12 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
    * @param externalMainOutputs external main outputs
    * @param externalAdditionalOutputs external additional outputs
    */
-  public OperatorVertexOutputCollector(final IRVertex irVertex,
-                                       final List<OperatorVertex> internalMainOutputs,
-                                       final Map<String, List<OperatorVertex>> internalAdditionalOutputs,
-                                       final List<OutputWriter> externalMainOutputs,
-                                       final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
+  public OperatorVertexOutputCollector(
+    final IRVertex irVertex,
+    final List<NextIntraTaskOperatorInfo> internalMainOutputs,
+    final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputs,
+    final List<OutputWriter> externalMainOutputs,
+    final Map<String, List<OutputWriter>> externalAdditionalOutputs) {
     this.irVertex = irVertex;
     this.internalMainOutputs = internalMainOutputs;
     this.internalAdditionalOutputs = internalAdditionalOutputs;
@@ -76,8 +77,8 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
 
   @Override
   public void emit(final O output) {
-    for (final OperatorVertex internalVertex : internalMainOutputs) {
-      emit(internalVertex, output);
+    for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
+      emit(internalVertex.getNextOperator(), output);
     }
 
     for (final OutputWriter externalWriter : externalMainOutputs) {
@@ -89,8 +90,8 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
   public <T> void emit(final String dstVertexId, final T output) {
 
     if (internalAdditionalOutputs.containsKey(dstVertexId)) {
-      for (final OperatorVertex internalVertex : internalAdditionalOutputs.get(dstVertexId)) {
-        emit(internalVertex, (O) output);
+      for (final NextIntraTaskOperatorInfo internalVertex : internalAdditionalOutputs.get(dstVertexId)) {
+        emit(internalVertex.getNextOperator(), (O) output);
       }
     }
 
@@ -104,15 +105,13 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
   @Override
   public void emitWatermark(final Watermark watermark) {
     // Emit watermarks to internal vertices
-    // TODO #232: Implement InputWatermarkManager
-    // TODO #232: We should emit the minimum watermark among multiple input streams of Transform.
-    for (final OperatorVertex internalVertex : internalMainOutputs) {
-      internalVertex.getTransform().onWatermark(watermark);
+    for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
+      internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(), watermark);
     }
 
-    for (final List<OperatorVertex> internalVertices : internalAdditionalOutputs.values()) {
-      for (final OperatorVertex internalVertex : internalVertices) {
-        internalVertex.getTransform().onWatermark(watermark);
+    for (final List<NextIntraTaskOperatorInfo> internalVertices : internalAdditionalOutputs.values()) {
+      for (final NextIntraTaskOperatorInfo internalVertex : internalVertices) {
+        internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(), watermark);
       }
     }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
new file mode 100644
index 0000000..204bf22
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Watermark;
+
+
+/**
+ * This is a special implementation for single input data stream for optimization.
+ */
+public final class SingleInputWatermarkManager implements InputWatermarkManager {
+
+  private final OperatorVertex nextOperator;
+
+  public SingleInputWatermarkManager(final OperatorVertex nextOperator) {
+    this.nextOperator = nextOperator;
+  }
+
+  /**
+   * This just forwards watermarks to the next operator because it has one data stream.
+   * @param edgeIndex edge index
+   * @param watermark watermark
+   */
+  @Override
+  public void trackAndEmitWatermarks(final int edgeIndex,
+                                     final Watermark watermark) {
+    nextOperator.getTransform().onWatermark(watermark);
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 1541792..8c92443 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -28,6 +28,7 @@ import org.apache.nemo.common.ir.edge.executionproperty.BroadcastVariableIdPrope
 import org.apache.nemo.common.ir.vertex.*;
 import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.runtime.executor.datatransfer.MultiInputWatermarkManager;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -154,6 +155,37 @@ public final class TaskExecutor {
     // Traverse in a reverse-topological order to ensure that each visited vertex's children vertices exist.
     final List<IRVertex> reverseTopologicallySorted = Lists.reverse(irVertexDag.getTopologicalSort());
 
+    // Build a map for edge as a key and edge index as a value
+    // This variable is used for creating NextIntraTaskOperatorInfo
+    // in getInternalMainOutputs and getInternalAdditionalOutputMap.
+    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap = new HashMap<>();
+    reverseTopologicallySorted.forEach(childVertex -> {
+      final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+      for (int edgeIndex = 0; edgeIndex < edges.size(); edgeIndex++) {
+        final RuntimeEdge<IRVertex> edge = edges.get(edgeIndex);
+        edgeIndexMap.putIfAbsent(edge, edgeIndex);
+      }
+    });
+
+    // Build a map for InputWatermarkManager for each operator vertex
+    // This variable is used for creating NextIntraTaskOperatorInfo
+    // in getInternalMainOutputs and getInternalAdditionalOutputMap.
+    final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap = new HashMap<>();
+    reverseTopologicallySorted.forEach(childVertex -> {
+
+      if (childVertex instanceof OperatorVertex) {
+        final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+        if (edges.size() == 1) {
+          operatorWatermarkManagerMap.putIfAbsent(childVertex,
+            new SingleInputWatermarkManager((OperatorVertex) childVertex));
+        } else {
+          operatorWatermarkManagerMap.putIfAbsent(childVertex,
+            new MultiInputWatermarkManager(edges.size(), (OperatorVertex) childVertex));
+        }
+      }
+
+    });
+
     // Create a harness for each vertex
     final List<DataFetcher> nonBroadcastDataFetcherList = new ArrayList<>();
     final Map<String, VertexHarness> vertexIdToHarness = new HashMap<>();
@@ -165,13 +197,14 @@ public final class TaskExecutor {
       }
 
       // Additional outputs
-      final Map<String, List<OperatorVertex>> internalAdditionalOutputMap =
-        getInternalAdditionalOutputMap(irVertex, irVertexDag);
+      final Map<String, List<NextIntraTaskOperatorInfo>> internalAdditionalOutputMap =
+        getInternalAdditionalOutputMap(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
       final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
         getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
 
       // Main outputs
-      final List<OperatorVertex> internalMainOutputs = getInternalMainOutputs(irVertex, irVertexDag);
+      final List<NextIntraTaskOperatorInfo> internalMainOutputs =
+        getInternalMainOutputs(irVertex, irVertexDag, edgeIndexMap, operatorWatermarkManagerMap);
       final List<OutputWriter> externalMainOutputs =
         getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
 
@@ -492,17 +525,25 @@ public final class TaskExecutor {
     return map;
   }
 
-  private Map<String, List<OperatorVertex>> getInternalAdditionalOutputMap(
+  // TODO #253: Refactor getInternal(Main/Additional)OutputMap
+  private Map<String, List<NextIntraTaskOperatorInfo>> getInternalAdditionalOutputMap(
     final IRVertex irVertex,
-    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+    final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
     // Add all intra-task additional tags to additional output map.
-    final Map<String, List<OperatorVertex>> map = new HashMap<>();
+    final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
 
     irVertexDag.getOutgoingEdgesOf(irVertex.getId())
       .stream()
       .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
-      .map(edge ->
-        Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(), (OperatorVertex) edge.getDst()))
+      .map(edge -> {
+          final String outputTag = edge.getPropertyValue(AdditionalOutputTagProperty.class).get();
+          final int index = edgeIndexMap.get(edge);
+          final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
+          final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator);
+          return Pair.of(outputTag, new NextIntraTaskOperatorInfo(index, nextOperator, inputWatermarkManager));
+        })
       .forEach(pair -> {
         map.putIfAbsent(pair.left(), new ArrayList<>());
         map.get(pair.left()).add(pair.right());
@@ -511,12 +552,22 @@ public final class TaskExecutor {
     return map;
   }
 
-  private List<OperatorVertex> getInternalMainOutputs(final IRVertex irVertex,
-                                                      final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+  // TODO #253: Refactor getInternal(Main/Additional)OutputMap
+  private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(
+    final IRVertex irVertex,
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+    final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
+
     return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
       .stream()
       .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
-      .map(edge -> (OperatorVertex) edge.getDst())
+      .map(edge -> {
+        final int index = edgeIndexMap.get(edge);
+        final OperatorVertex nextOperator = (OperatorVertex) edge.getDst();
+        final InputWatermarkManager inputWatermarkManager = operatorWatermarkManagerMap.get(nextOperator);
+        return new NextIntraTaskOperatorInfo(index, nextOperator, inputWatermarkManager);
+      })
       .collect(Collectors.toList());
   }
 
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
new file mode 100644
index 0000000..9303da8
--- /dev/null
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Watermark;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import java.util.LinkedList;
+import java.util.List;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.junit.Assert.assertEquals;
+
+public final class InputWatermarkManagerTest {
+
+  @Test
+  public void test() {
+    final List<Watermark> emittedWatermarks = new LinkedList<>();
+    final Transform transform = mock(Transform.class);
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+        final Watermark watermark = invocationOnMock.getArgument(0);
+        emittedWatermarks.add(watermark);
+        return null;
+      }
+    }).when(transform).onWatermark(any(Watermark.class));
+
+    final OperatorVertex operatorVertex = new OperatorVertex(transform);
+    final InputWatermarkManager watermarkManager =
+      new MultiInputWatermarkManager(3, operatorVertex);
+
+    //edge1: 10 s
+    //edge2: 5 s
+    //edge3: 8 s
+    //current min watermark: 5 s
+    watermarkManager.trackAndEmitWatermarks(0, new Watermark(10));
+    assertEquals(0, emittedWatermarks.size());
+    watermarkManager.trackAndEmitWatermarks(1, new Watermark(5));
+    assertEquals(0, emittedWatermarks.size());
+    watermarkManager.trackAndEmitWatermarks(2, new Watermark(8));
+    assertEquals(5, emittedWatermarks.get(0).getTimestamp());
+    emittedWatermarks.clear();
+
+    //edge1: 13
+    //edge2: 9
+    //edge3: 8
+    //current min watermark: 8
+    watermarkManager.trackAndEmitWatermarks(0, new Watermark(13));
+    assertEquals(0, emittedWatermarks.size());
+    watermarkManager.trackAndEmitWatermarks(1, new Watermark(9));
+    assertEquals(8, emittedWatermarks.get(0).getTimestamp());
+    emittedWatermarks.clear();
+
+    //edge1: 13
+    //edge2: 15
+    //edge3: 8
+    //current min watermark: 8
+    watermarkManager.trackAndEmitWatermarks(1, new Watermark(15));
+    assertEquals(0, emittedWatermarks.size());
+
+    //edge1: 13
+    //edge2: 15
+    //edge3: 17
+    //current min watermark: 13
+    watermarkManager.trackAndEmitWatermarks(2, new Watermark(17));
+    assertEquals(13, emittedWatermarks.get(0).getTimestamp());
+  }
+}
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 8233663..e05bbfb 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -61,7 +61,10 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.*;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -192,78 +195,11 @@ public final class TaskExecutorTest {
    */
   @Test()
   public void testUnboundedSourceVertexDataFetching() throws Exception {
-    final IRVertex sourceIRVertex = new SourceVertex() {
-      @Override
-      public IRVertex getClone() {
-        return this;
-      }
-
-      @Override
-      public boolean isBounded() {
-        return false;
-      }
-
-      @Override
-      public List<Readable> getReadables(int desiredNumOfSplits) throws Exception {
-        return null;
-      }
-
-      @Override
-      public void clearInternalStates() {
-
-      }
-    };
-
-    final long watermark = 1234567L;
-
-    final Readable readable = new Readable() {
-      int pointer = 0;
-      final int middle = elements.size() / 2;
-      final int end = elements.size();
-      boolean watermarkEmitted = false;
-
-      @Override
-      public void prepare() {
-
-      }
-
-      // This emulates unbounded source that throws NoSuchElementException
-      // It reads current data until middle point and  throws NoSuchElementException at the middle point.
-      // It resumes the data reading after emitting a watermark, and finishes at the end of the data.
-      @Override
-      public Object readCurrent() throws NoSuchElementException {
-        if (pointer == middle && !watermarkEmitted) {
-          throw new NoSuchElementException();
-        }
-
-        return elements.get(pointer);
-      }
-
-      @Override
-      public void advance() throws IOException {
-        pointer += 1;
-      }
-
-      @Override
-      public long readWatermark() {
-        watermarkEmitted = true;
-        return watermark;
-      }
-
-      @Override
-      public boolean isFinished() {
-        return pointer == end;
-      }
-
-      @Override
-      public List<String> getLocations() throws Exception {
-        return null;
-      }
-
-      @Override
-      public void close() throws IOException {
-      }
-    };
+    final IRVertex sourceIRVertex = new TestUnboundedSourceVertex();
+    final Long watermark = 1234567L;
+    final BlockingQueue<Long> watermarkQueue = new LinkedBlockingQueue<>();
+    watermarkQueue.add(watermark);
+    final Readable readable = new TestUnboundedSourceReadable(watermarkQueue, 1);
 
     final Map<String, Readable> vertexIdToReadable = new HashMap<>();
     vertexIdToReadable.put(sourceIRVertex.getId(), readable);
@@ -330,6 +266,108 @@ public final class TaskExecutorTest {
     assertTrue(checkEqualElements(elements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
   }
 
+  private void waitUntilWatermarkEmitted(final Queue<Long> watermarkQueue) {
+    while (!watermarkQueue.isEmpty()) {
+      try {
+        Thread.sleep(10);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  /**
+   * The DAG of the task to test will look like:
+   * source1 -> vertex1 -> vertex2
+   * source2 -> vertex3 ->
+   *
+   * The vertex2 has two incoming edges (from vertex1 and vertex3)
+   * and we test if TaskExecutor handles data and watermarks correctly in this situation.
+   *
+   * source1 emits watermarks:     500 (ts)  600 (ts)   1400 (ts)  1800 (ts)        2500 (ts)
+   * source2 emits watermarks:  1000(ts)                                     2200 (ts)
+   *
+   * The vertex2 should receive and emit watermarks 500, 600, 1000, 1800, and 2200
+   */
+  @Test()
+  public void testMultipleIncomingEdges() throws Exception {
+    final List<Watermark> emittedWatermarks = new ArrayList<>();
+    final IRVertex operatorIRVertex1 = new OperatorVertex(new RelayTransform());
+    final IRVertex operatorIRVertex2 = new OperatorVertex(new RelayTransformNoWatermarkEmit(emittedWatermarks));
+    final IRVertex operatorIRVertex3 = new OperatorVertex(new RelayTransform());
+
+    final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex();
+    final IRVertex sourceIRVertex2 = new TestUnboundedSourceVertex();
+
+    final Queue<Long> watermarks1 = new ConcurrentLinkedQueue<>();
+    watermarks1.add(500L);
+    final Queue<Long> watermarks2 = new ConcurrentLinkedQueue<>();
+    watermarks2.add(1000L);
+    final Readable readable1 = new TestUnboundedSourceReadable(watermarks1, 5);
+    final Readable readable2 = new TestUnboundedSourceReadable(watermarks2, 2);
+
+    final Map<String, Readable> vertexIdToReadable = new HashMap<>();
+    vertexIdToReadable.put(sourceIRVertex1.getId(), readable1);
+    vertexIdToReadable.put(sourceIRVertex2.getId(), readable2);
+
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
+      new DAGBuilder<IRVertex, RuntimeEdge<IRVertex>>()
+        .addVertex(sourceIRVertex1)
+        .addVertex(sourceIRVertex2)
+        .addVertex(operatorIRVertex1)
+        .addVertex(operatorIRVertex2)
+        .addVertex(operatorIRVertex3)
+        .connectVertices(createEdge(sourceIRVertex1, operatorIRVertex1, "edge1"))
+        .connectVertices(createEdge(operatorIRVertex1, operatorIRVertex2, "edge2"))
+        .connectVertices(createEdge(sourceIRVertex2, operatorIRVertex3, "edge3"))
+        .connectVertices(createEdge(operatorIRVertex3, operatorIRVertex2, "edge4"))
+        .buildWithoutSourceSinkCheck();
+
+    final StageEdge taskOutEdge = mockStageEdgeFrom(operatorIRVertex2);
+    final Task task =
+      new Task(
+        "testSourceVertexDataFetching",
+        generateTaskId(),
+        TASK_EXECUTION_PROPERTY_MAP,
+        new byte[0],
+        Collections.emptyList(),
+        Collections.singletonList(taskOutEdge),
+        vertexIdToReadable);
+
+    // Start a thread that feeds watermarks to the sources step by step, then execute the task.
+    final Thread watermarkEmitThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        waitUntilWatermarkEmitted(watermarks2);
+        watermarks1.add(600L);
+        watermarks1.add(1400L);
+        watermarks1.add(1800L);
+        waitUntilWatermarkEmitted(watermarks1);
+        watermarks2.add(2200L);
+        waitUntilWatermarkEmitted(watermarks2);
+        watermarks1.add(2500L);
+        waitUntilWatermarkEmitted(watermarks1);
+      }
+    });
+
+    watermarkEmitThread.start();
+    final TaskExecutor taskExecutor = getTaskExecutor(task, taskDag);
+    taskExecutor.execute();
+
+    watermarkEmitThread.join();
+
+    // Check whether the watermark is emitted
+    assertEquals(Arrays.asList(
+      new Watermark(500), new Watermark(600), new Watermark(1000),
+      new Watermark(1800), new Watermark(2200)), emittedWatermarks);
+
+    // Check the output.
+    final List<Integer> doubledElements = new ArrayList<>(elements.size()*2);
+    doubledElements.addAll(elements);
+    doubledElements.addAll(elements);
+    assertTrue(checkEqualElements(doubledElements, runtimeEdgeToOutputData.get(taskOutEdge.getId())));
+  }
+
   /**
    * The DAG of the task to test will looks like:
    * parent task -> task (vertex 1 -> task 2) -> child task
@@ -601,6 +639,98 @@ public final class TaskExecutorTest {
     }
   }
 
+  /**
+   * Source vertex for unbounded source test.
+   */
+  private final class TestUnboundedSourceVertex extends SourceVertex {
+
+    @Override
+    public boolean isBounded() {
+      return false;
+    }
+
+    @Override
+    public List<Readable> getReadables(int desiredNumOfSplits) throws Exception {
+      return null;
+    }
+
+    @Override
+    public void clearInternalStates() {
+
+    }
+
+    @Override
+    public IRVertex getClone() {
+      return null;
+    }
+  }
+
+
+  // This emulates an unbounded source that throws NoSuchElementException.
+  // It reads data up to the middle point and throws NoSuchElementException there
+  // until the expected number of watermarks is emitted, then resumes reading until the end of the data.
+  private final class TestUnboundedSourceReadable implements Readable {
+    int pointer = 0;
+    final int middle = elements.size() / 2;
+    final int end = elements.size();
+    final Queue<Long> watermarks;
+    int numEmittedWatermarks = 0;
+    final int expectedNumWatermarks;
+    long currWatermark = -1;
+
+    public TestUnboundedSourceReadable(final Queue<Long> watermarks,
+                                       final int expectedNumWatermarks) {
+      this.watermarks = watermarks;
+      this.expectedNumWatermarks = expectedNumWatermarks;
+    }
+
+    @Override
+    public void prepare() {
+
+    }
+
+    @Override
+    public Object readCurrent() throws NoSuchElementException {
+      if (pointer == middle && numEmittedWatermarks < expectedNumWatermarks) {
+        throw new NoSuchElementException();
+      }
+      return elements.get(pointer);
+    }
+
+    @Override
+    public void advance() throws IOException {
+      pointer += 1;
+    }
+
+    @Override
+    public long readWatermark() {
+      if (numEmittedWatermarks >= expectedNumWatermarks) {
+        return Long.MAX_VALUE;
+      }
+
+      final Long watermark = watermarks.poll();
+      if (watermark == null) {
+        return currWatermark;
+      }
+      currWatermark = watermark;
+      numEmittedWatermarks += 1;
+      return watermark;
+    }
+
+    @Override
+    public boolean isFinished() {
+      return pointer == end;
+    }
+
+    @Override
+    public List<String> getLocations() throws Exception {
+      return null;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+  }
 
   /**
    * Simple identity function for testing.