You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/11/13 04:30:42 UTC

[incubator-nemo] branch master updated: [NEMO-245, 247] Handle watermark in OutputWriter and Implement unbounded word count example (#153)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new dc3519f  [NEMO-245,247] Handle watermark in OutputWriter and Implement unbounded word count example (#153)
dc3519f is described below

commit dc3519f55a7668cefc6b19a3606bf47f81bda9f3
Author: Taegeon Um <ta...@gmail.com>
AuthorDate: Tue Nov 13 13:30:36 2018 +0900

    [NEMO-245,247] Handle watermark in OutputWriter and Implement unbounded word count example (#153)
    
    JIRA: [NEMO-245: Handle watermark in OutputWriter](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-245)
    
    [NEMO-247: UnboundedSource WindowedWordCount ITCase](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-247)
    
    **Major changes:**
    - Add `NemoEventDecoderFactory`/`NemoEventEncoderFactory` to decode/encode watermarks
    - Handle inter-task watermarks at `MultiThreadParentTaskDataFetcher`
    - Modify `OutputWriter` interface to emit watermark (`emitWatermark`)
    - Refactor `WindowedWordCountExample` to run this example with unbounded source
    
    
    **Tests for the changes:**
    - Unbounded windowed word count ITCase (currently disabled because the test cannot force-close a Nemo job)
---
 .../nemo/common/coder/BytesDecoderFactory.java     |  6 +-
 .../nemo/common/coder/BytesEncoderFactory.java     |  4 +
 .../ir/edge/executionproperty/DecoderProperty.java |  1 +
 .../ir/edge/executionproperty/EncoderProperty.java |  1 +
 .../common/ir/vertex/transform/RelayTransform.java |  3 +
 .../frontend/beam/coder/BeamDecoderFactory.java    |  5 ++
 .../frontend/beam/coder/BeamEncoderFactory.java    |  3 +
 .../beam/transform/AbstractDoFnTransform.java      |  6 +-
 .../frontend/beam/transform/DoFnTransform.java     |  3 +
 .../reshaping/LargeShuffleRelayReshapingPass.java  |  6 +-
 .../GroupByKeyAndWindowDoFnTransformTest.java      |  2 -
 .../nemo/examples/beam/WindowedWordCount.java      | 99 ++++++++++++++--------
 .../examples/beam/WindowedWordCountITCase.java     | 32 ++++++-
 .../org/apache/nemo/runtime/executor/Executor.java | 49 +++++++++--
 .../executor/bytetransfer/ByteOutputContext.java   |  3 +
 .../nemo/runtime/executor/data/DataUtil.java       |  2 +
 .../data/partition/SerializedPartition.java        |  1 +
 .../executor/datatransfer/BlockOutputWriter.java   | 10 +++
 .../datatransfer/DataFetcherOutputCollector.java   | 10 ++-
 .../datatransfer/InputWatermarkManager.java        |  2 +
 .../datatransfer/MultiInputWatermarkManager.java   | 23 +++--
 .../datatransfer/NemoEventDecoderFactory.java      | 93 ++++++++++++++++++++
 .../datatransfer/NemoEventEncoderFactory.java      | 72 ++++++++++++++++
 .../OperatorVertexOutputCollector.java             | 18 +++-
 ...lector.java => OperatorWatermarkCollector.java} | 30 +++----
 .../executor/datatransfer/OutputWriter.java        |  7 ++
 .../executor/datatransfer/PipeOutputWriter.java    | 52 ++++++++----
 .../datatransfer/SingleInputWatermarkManager.java  | 10 +--
 ...termarkManager.java => WatermarkWithIndex.java} | 36 +++++---
 .../task/MultiThreadParentTaskDataFetcher.java     | 52 +++++++++++-
 .../executor/task/SourceVertexDataFetcher.java     |  4 +
 .../nemo/runtime/executor/task/TaskExecutor.java   | 72 +++++++++++-----
 .../datatransfer/InputWatermarkManagerTest.java    |  2 +-
 .../master/resource/ExecutorRepresenter.java       | 26 +++---
 34 files changed, 597 insertions(+), 148 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
index 279f7c5..1bb8185 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesDecoderFactory.java
@@ -19,7 +19,10 @@
 package org.apache.nemo.common.coder;
 
 import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 
@@ -27,6 +30,7 @@ import java.io.InputStream;
  * A {@link DecoderFactory} which is used for an array of bytes.
  */
 public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
+  private static final Logger LOG = LoggerFactory.getLogger(BytesDecoderFactory.class.getName());
 
   private static final BytesDecoderFactory BYTES_DECODER_FACTORY = new BytesDecoderFactory();
 
@@ -84,7 +88,7 @@ public final class BytesDecoderFactory implements DecoderFactory<byte[]> {
           returnedArray = true;
           return new byte[0];
         } else {
-          throw new IOException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
+          throw new EOFException("EoF (empty partition)!"); // TODO #120: use EOF exception instead of IOException.
         }
       }
       final byte[] resultBytes = new byte[lengthToRead]; // Read the size of this byte array.
diff --git a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
index 140f5ea..3a5af26 100644
--- a/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
+++ b/common/src/main/java/org/apache/nemo/common/coder/BytesEncoderFactory.java
@@ -18,12 +18,16 @@
  */
 package org.apache.nemo.common.coder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.*;
 
 /**
  * A {@link EncoderFactory} which is used for an array of bytes.
  */
 public final class BytesEncoderFactory implements EncoderFactory<byte[]> {
+  private static final Logger LOG = LoggerFactory.getLogger(BytesEncoderFactory.class.getName());
 
   private static final BytesEncoderFactory BYTES_ENCODER_FACTORY = new BytesEncoderFactory();
 
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
index bf6e908..32406c9 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DecoderProperty.java
@@ -23,6 +23,7 @@ import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * Decoder ExecutionProperty.
+ * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
  */
 public final class DecoderProperty extends EdgeExecutionProperty<DecoderFactory> {
   /**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
index cf931fc..8e6385d 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/EncoderProperty.java
@@ -23,6 +23,7 @@ import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
  * EncoderFactory ExecutionProperty.
+ * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
  */
 public final class EncoderProperty extends EdgeExecutionProperty<EncoderFactory> {
   /**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
index b0dbe54..cd713d3 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/RelayTransform.java
@@ -20,6 +20,8 @@ package org.apache.nemo.common.ir.vertex.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link Transform} relays input data from upstream vertex to downstream vertex promptly.
@@ -28,6 +30,7 @@ import org.apache.nemo.common.punctuation.Watermark;
  */
 public final class RelayTransform<T> implements Transform<T, T> {
   private OutputCollector<T> outputCollector;
+  private static final Logger LOG = LoggerFactory.getLogger(RelayTransform.class.getName());
 
   /**
    * Default constructor.
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
index 4cfefe8..c1ff6f0 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
@@ -119,6 +119,11 @@ public final class BeamDecoderFactory<T> implements DecoderFactory<T> {
     public T2 decode() throws IOException {
       return decodeInternal();
     }
+
+    @Override
+    public String toString() {
+      return "BeamDecoder: {" + beamCoder.toString() + "}";
+    }
   }
 
   /**
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
index e46f5b0..090c24b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/coder/BeamEncoderFactory.java
@@ -22,6 +22,8 @@ import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -31,6 +33,7 @@ import java.io.OutputStream;
  * @param <T> the type of element to encode.
  */
 public final class BeamEncoderFactory<T> implements EncoderFactory<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamEncoderFactory.class.getName());
 
   private final Coder<T> beamCoder;
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 6a8f8d4..dd5ca35 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -65,9 +65,9 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
   private transient DoFnRunners.OutputManager outputManager;
 
-  // For bundle
-  // we consider count and time millis for start/finish bundle
-  // if # of processed elements > bundleSize
+  // Variables for bundle.
+  // We consider count and time millis for start/finish bundle.
+  // If # of processed elements > bundleSize
   // or elapsed time > bundleMillis, we finish the current bundle and start a new one
   private transient long bundleSize;
   private transient long bundleMillis;
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 18368c6..9f7a4e0 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -27,6 +27,8 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.List;
@@ -39,6 +41,7 @@ import java.util.Map;
  * @param <OutputT> output type.
  */
 public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<InputT, InputT, OutputT> {
+  private static final Logger LOG = LoggerFactory.getLogger(DoFnTransform.class.getName());
 
   /**
    * DoFnTransform Constructor.
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
index 8499225..c34615e 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
@@ -53,11 +53,11 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
       // We care about OperatorVertices that have any incoming edge that
       // has Shuffle as data communication pattern.
       if (v instanceof OperatorVertex && dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge ->
-              CommunicationPatternProperty.Value.Shuffle
+        CommunicationPatternProperty.Value.Shuffle
           .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))) {
         dag.getIncomingEdgesOf(v).forEach(edge -> {
           if (CommunicationPatternProperty.Value.Shuffle
-                .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
+            .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
             // Insert a merger vertex having transform that write received data immediately
             // before the vertex receiving shuffled data.
             final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
@@ -67,7 +67,7 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
               new IREdge(CommunicationPatternProperty.Value.Shuffle, edge.getSrc(), iFileMergerVertex);
             edge.copyExecutionPropertiesTo(newEdgeToMerger);
             final IREdge newEdgeFromMerger = new IREdge(CommunicationPatternProperty.Value.OneToOne,
-                iFileMergerVertex, v);
+              iFileMergerVertex, v);
             newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
             newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
             builder.connectVertices(newEdgeToMerger);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
index e3fa23e..f9a44ec 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransformTest.java
@@ -26,11 +26,9 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
-import org.apache.reef.io.Tuple;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
index 5353763..d7f8c85 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.examples.beam;
 
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.*;
@@ -26,6 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
 import org.joda.time.Duration;
@@ -41,19 +43,65 @@ public final class WindowedWordCount {
   private WindowedWordCount() {
   }
 
+  public static final String INPUT_TYPE_BOUNDED = "bounded";
+  public static final String INPUT_TYPE_UNBOUNDED = "unbounded";
+  private static final String SPLITTER = "!";
+
+
+  private static PCollection<KV<String, Long>> getSource(
+    final Pipeline p,
+    final String[] args) {
+
+    final String inputType = args[2];
+    if (inputType.compareTo(INPUT_TYPE_BOUNDED) == 0) {
+      final String inputFilePath = args[3];
+      return GenericSourceSink.read(p, inputFilePath)
+        .apply(ParDo.of(new DoFn<String, String>() {
+          @ProcessElement
+          public void processElement(@Element final String elem,
+                                     final OutputReceiver<String> out) {
+            final String[] splitt = elem.split(SPLITTER);
+            out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
+          }
+        }))
+        .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String, KV<String, Long>>() {
+          @Override
+          public KV<String, Long> apply(final String line) {
+            final String[] words = line.split(" +");
+            final String documentId = words[0] + "#" + words[1];
+            final Long count = Long.parseLong(words[2]);
+            return KV.of(documentId, count);
+          }
+        }));
+    } else if (inputType.compareTo(INPUT_TYPE_UNBOUNDED) == 0) {
+      // unbounded
+      return p.apply(GenerateSequence
+        .from(1)
+        .withRate(2, Duration.standardSeconds(1))
+        .withTimestampFn(num -> new Instant(num * 500)))
+        .apply(MapElements.via(new SimpleFunction<Long, KV<String, Long>>() {
+          @Override
+          public KV<String, Long> apply(final Long val) {
+            return KV.of(String.valueOf(val % 2), 1L);
+          }
+        }));
+    } else {
+      throw new RuntimeException("Unsupported input type: " + inputType);
+    }
+  }
   /**
    * Main function for the MR BEAM program.
    * @param args arguments.
    */
   public static void main(final String[] args) {
-    final String inputFilePath = args[0];
-    final String outputFilePath = args[1];
-    final String windowType = args[2];
-    final Window<String> windowFn;
+    final String outputFilePath = args[0];
+    final String windowType = args[1];
+
+    final Window<KV<String, Long>> windowFn;
     if (windowType.equals("fixed")) {
-      windowFn = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)));
+      windowFn = Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardSeconds(5)));
     } else {
-      windowFn = Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10))
+      windowFn = Window.<KV<String, Long>>into(SlidingWindows.of(Duration.standardSeconds(10))
         .every(Duration.standardSeconds(5)));
     }
 
@@ -62,33 +110,18 @@ public final class WindowedWordCount {
     options.setJobName("WindowedWordCount");
 
     final Pipeline p = Pipeline.create(options);
-    GenericSourceSink.read(p, inputFilePath)
-        .apply(ParDo.of(new DoFn<String, String>() {
-            @ProcessElement
-            public void processElement(@Element final String elem,
-                                       final OutputReceiver<String> out) {
-              final String[] splitt = elem.split("!");
-              out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
-            }
-        }))
-        .apply(windowFn)
-        .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String, KV<String, Long>>() {
-          @Override
-          public KV<String, Long> apply(final String line) {
-            final String[] words = line.split(" +");
-            final String documentId = words[0] + "#" + words[1];
-            final Long count = Long.parseLong(words[2]);
-            return KV.of(documentId, count);
-          }
-        }))
-        .apply(Sum.longsPerKey())
-        .apply(MapElements.<KV<String, Long>, String>via(new SimpleFunction<KV<String, Long>, String>() {
-          @Override
-          public String apply(final KV<String, Long> kv) {
-            return kv.getKey() + ": " + kv.getValue();
-          }
-        }))
-        .apply(new WriteOneFilePerWindow(outputFilePath, null));
+
+    getSource(p, args)
+      .apply(windowFn)
+      .apply(Sum.longsPerKey())
+      .apply(MapElements.<KV<String, Long>, String>via(new SimpleFunction<KV<String, Long>, String>() {
+        @Override
+        public String apply(final KV<String, Long> kv) {
+          return kv.getKey() + ": " + kv.getValue();
+        }
+      }))
+      .apply(new WriteOneFilePerWindow(outputFilePath, 1));
+
     p.run();
   }
 }
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index 55ed19d..c0134aa 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -28,6 +28,9 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import static org.apache.nemo.examples.beam.WindowedWordCount.INPUT_TYPE_BOUNDED;
+import static org.apache.nemo.examples.beam.WindowedWordCount.INPUT_TYPE_UNBOUNDED;
+
 /**
  * Test Windowed word count program with JobLauncher.
  */
@@ -51,7 +54,7 @@ public final class WindowedWordCountITCase {
   public void testBatchFixedWindow() throws Exception {
     builder = new ArgBuilder()
       .addUserMain(WindowedWordCount.class.getCanonicalName())
-      .addUserArgs(inputFilePath, outputFilePath, "fixed");
+      .addUserArgs(outputFilePath, "fixed", INPUT_TYPE_BOUNDED, inputFilePath);
 
     JobLauncher.main(builder
         .addResourceJson(executorResourceFileName)
@@ -71,7 +74,7 @@ public final class WindowedWordCountITCase {
   public void testBatchSlidingWindow() throws Exception {
     builder = new ArgBuilder()
       .addUserMain(WindowedWordCount.class.getCanonicalName())
-      .addUserArgs(inputFilePath, outputFilePath, "sliding");
+      .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_BOUNDED, inputFilePath);
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
@@ -91,7 +94,7 @@ public final class WindowedWordCountITCase {
     builder = new ArgBuilder()
       .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
       .addUserMain(WindowedWordCount.class.getCanonicalName())
-      .addUserArgs(inputFilePath, outputFilePath, "fixed");
+      .addUserArgs(outputFilePath, "fixed", INPUT_TYPE_BOUNDED, inputFilePath);
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
@@ -112,7 +115,7 @@ public final class WindowedWordCountITCase {
     builder = new ArgBuilder()
       .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
       .addUserMain(WindowedWordCount.class.getCanonicalName())
-      .addUserArgs(inputFilePath, outputFilePath, "sliding");
+      .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_BOUNDED, inputFilePath);
 
     JobLauncher.main(builder
       .addResourceJson(executorResourceFileName)
@@ -126,4 +129,25 @@ public final class WindowedWordCountITCase {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
   }
+
+
+  // TODO #271: We currently disable this test because we cannot force close Nemo
+  //@Test (timeout = TIMEOUT)
+  public void testUnboundedSlidingWindow() throws Exception {
+    builder = new ArgBuilder()
+      .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+      .addUserMain(WindowedWordCount.class.getCanonicalName())
+      .addUserArgs(outputFilePath, "sliding", INPUT_TYPE_UNBOUNDED);
+
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+      .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName);
+    } finally {
+    }
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index b218bf8..f7aa184 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -19,6 +19,10 @@
 package org.apache.nemo.runtime.executor;
 
 import com.google.protobuf.ByteString;
+import org.apache.nemo.common.coder.BytesDecoderFactory;
+import org.apache.nemo.common.coder.BytesEncoderFactory;
+import org.apache.nemo.common.coder.DecoderFactory;
+import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DecompressionProperty;
@@ -39,6 +43,8 @@ import org.apache.nemo.runtime.common.plan.Task;
 import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
 import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
+import org.apache.nemo.runtime.executor.datatransfer.NemoEventDecoderFactory;
+import org.apache.nemo.runtime.executor.datatransfer.NemoEventEncoderFactory;
 import org.apache.nemo.runtime.executor.task.TaskExecutor;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -114,6 +120,7 @@ public final class Executor {
    * @param task to launch.
    */
   private void launchTask(final Task task) {
+    LOG.info("Launch task: {}", task);
     try {
       final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
           SerializationUtils.deserialize(task.getSerializedIRDag());
@@ -121,19 +128,19 @@ public final class Executor {
           new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
 
       task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
-          e.getPropertyValue(EncoderProperty.class).get(),
-          e.getPropertyValue(DecoderProperty.class).get(),
+          getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+          getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
           e.getPropertyValue(CompressionProperty.class).orElse(null),
           e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
-          e.getPropertyValue(EncoderProperty.class).get(),
-          e.getPropertyValue(DecoderProperty.class).get(),
+          getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+          getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
           e.getPropertyValue(CompressionProperty.class).orElse(null),
           e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       irDag.getVertices().forEach(v -> {
         irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
-            e.getPropertyValue(EncoderProperty.class).get(),
-            e.getPropertyValue(DecoderProperty.class).get(),
+            getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
+            getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
             e.getPropertyValue(CompressionProperty.class).orElse(null),
             e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       });
@@ -155,6 +162,36 @@ public final class Executor {
     }
   }
 
+  /**
+   * This wraps the encoder with NemoEventEncoder.
+   * If the encoder is BytesEncoderFactory, we do not wrap the encoder.
+   * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
+   * @param encoderFactory encoder factory
+   * @return wrapped encoder
+   */
+  private EncoderFactory getEncoderFactory(final EncoderFactory encoderFactory) {
+    if (encoderFactory instanceof BytesEncoderFactory) {
+      return encoderFactory;
+    } else {
+      return new NemoEventEncoderFactory(encoderFactory);
+    }
+  }
+
+  /**
+   * This wraps the decoder with NemoEventDecoder.
+   * If the decoder is BytesDecoderFactory, we do not wrap the decoder.
+   * TODO #276: Add NoCoder property value in Encoder/DecoderProperty
+   * @param decoderFactory decoder factory
+   * @return wrapped decoder
+   */
+  private DecoderFactory getDecoderFactory(final DecoderFactory decoderFactory) {
+    if (decoderFactory instanceof BytesDecoderFactory) {
+      return decoderFactory;
+    } else {
+      return new NemoEventDecoderFactory(decoderFactory);
+    }
+  }
+
   public void terminate() {
     try {
       metricMessageSender.close();
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
index 51cd7e1..315760c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteOutputContext.java
@@ -22,6 +22,8 @@ import org.apache.nemo.runtime.executor.data.FileArea;
 import org.apache.nemo.runtime.executor.data.partition.SerializedPartition;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -37,6 +39,7 @@ import java.nio.file.Paths;
  * although the execution order may not be linearized if they were called from different threads.</p>
  */
 public final class ByteOutputContext extends ByteTransferContext implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(ByteOutputContext.class.getName());
 
   private final Channel channel;
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
index 9cb7b23..80e83df 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
@@ -144,6 +144,8 @@ public final class DataUtil {
     final List<NonSerializedPartition<K>> nonSerializedPartitions = new ArrayList<>();
     for (final SerializedPartition<K> partitionToConvert : partitionsToConvert) {
       final K key = partitionToConvert.getKey();
+
+
       try (final ByteArrayInputStream byteArrayInputStream =
                new ByteArrayInputStream(partitionToConvert.getData())) {
         final NonSerializedPartition<K> deserializePartition = deserializePartition(
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
index 952fbf4..71541c2 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -116,6 +116,7 @@ public final class SerializedPartition<K> implements Partition<byte[], K> {
       // inner buffer directly, which can be an unfinished(not flushed) buffer.
       wrappedStream.close();
       this.serializedData = bytesOutputStream.getBufDirectly();
+
       this.length = bytesOutputStream.getCount();
       this.committed = true;
     }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index 2391533..4b85087 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -21,11 +21,14 @@ package org.apache.nemo.runtime.executor.datatransfer;
 import org.apache.nemo.common.ir.edge.executionproperty.*;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.nemo.runtime.executor.data.block.Block;
 import org.apache.nemo.runtime.executor.data.partitioner.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Optional;
@@ -34,6 +37,8 @@ import java.util.Optional;
  * Represents the output data transfer from a task.
  */
 public final class BlockOutputWriter implements OutputWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(BlockOutputWriter.class.getName());
+
   private final RuntimeEdge<?> runtimeEdge;
   private final IRVertex dstIrVertex;
   private final Partitioner partitioner;
@@ -88,6 +93,11 @@ public final class BlockOutputWriter implements OutputWriter {
     } // If else, does not need to write because the data is duplicated.
   }
 
+  @Override
+  public void writeWatermark(final Watermark watermark) {
+    // No-op: this writer does not forward watermarks, so the given watermark is discarded.
+  }
+
   /**
    * Notifies that all writes for a block is end.
    * Further write about a committed block will throw an exception.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 56c7540..d50ad82 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -31,13 +31,19 @@ import org.slf4j.LoggerFactory;
 public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
   private static final Logger LOG = LoggerFactory.getLogger(DataFetcherOutputCollector.class.getName());
   private final OperatorVertex nextOperatorVertex;
+  private final int edgeIndex;
+  private final InputWatermarkManager watermarkManager;
 
   /**
    * It forwards output to the next operator.
    * @param nextOperatorVertex next operator to emit data and watermark
    */
-  public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
+  public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex,
+                                    final int edgeIndex,
+                                    final InputWatermarkManager watermarkManager) {
     this.nextOperatorVertex = nextOperatorVertex;
+    this.edgeIndex = edgeIndex;
+    this.watermarkManager = watermarkManager;
   }
 
   @Override
@@ -47,7 +53,7 @@ public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
 
   @Override
   public void emitWatermark(final Watermark watermark) {
-    nextOperatorVertex.getTransform().onWatermark(watermark);
+    watermarkManager.trackAndEmitWatermarks(edgeIndex, watermark);
   }
 
   @Override
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
index 66fb7aa..adbb659 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManager.java
@@ -31,6 +31,8 @@ public interface InputWatermarkManager {
 
   /**
    * This tracks the minimum input watermark among multiple input streams.
+   * This method is not thread-safe, so the caller should synchronize it
+   * if multiple threads access this method concurrently.
    * Ex)
    * -- input stream1 (edge 1):  ---------- ts: 3 ------------------ts: 6
    *                                                                 ^^^
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
index 91c7c55..613eccc 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/MultiInputWatermarkManager.java
@@ -18,8 +18,10 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -28,14 +30,16 @@ import java.util.List;
  * This tracks the minimum input watermark among multiple input streams.
  */
 public final class MultiInputWatermarkManager implements InputWatermarkManager {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiInputWatermarkManager.class.getName());
+
   private final List<Watermark> watermarks;
-  private final OperatorVertex nextOperator;
+  private final OutputCollector<?> watermarkCollector;
   private int minWatermarkIndex;
   public MultiInputWatermarkManager(final int numEdges,
-                                    final OperatorVertex nextOperator) {
+                                    final OutputCollector<?> watermarkCollector) {
     super();
     this.watermarks = new ArrayList<>(numEdges);
-    this.nextOperator = nextOperator;
+    this.watermarkCollector = watermarkCollector;
     this.minWatermarkIndex = 0;
     // We initialize watermarks as min value because
     // we should not emit watermark until all edges emit watermarks.
@@ -58,6 +62,12 @@ public final class MultiInputWatermarkManager implements InputWatermarkManager {
 
   @Override
   public void trackAndEmitWatermarks(final int edgeIndex, final Watermark watermark) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Track watermark {} emitted from edge {}:, {}", watermark.getTimestamp(), edgeIndex,
+        watermarks.toString());
+    }
+
     if (edgeIndex == minWatermarkIndex) {
       // update min watermark
       final Watermark prevMinWatermark = watermarks.get(minWatermarkIndex);
@@ -74,7 +84,10 @@ public final class MultiInputWatermarkManager implements InputWatermarkManager {
       if (minWatermark.getTimestamp() > prevMinWatermark.getTimestamp()) {
         // Watermark timestamp progress!
         // Emit the min watermark
-        nextOperator.getTransform().onWatermark(minWatermark);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Emit watermark {}, {}", minWatermark, watermarks);
+        }
+        watermarkCollector.emitWatermark(minWatermark);
       }
     } else {
       // The recent watermark timestamp cannot be less than the previous one
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
new file mode 100644
index 0000000..1367e70
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.nemo.common.coder.DecoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A factory for NemoEventDecoder, which decodes both data elements and watermarks.
+ */
+public final class NemoEventDecoderFactory implements DecoderFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(NemoEventDecoderFactory.class.getName());
+
+  private final DecoderFactory valueDecoderFactory;
+
+  public NemoEventDecoderFactory(final DecoderFactory valueDecoderFactory) {
+    this.valueDecoderFactory = valueDecoderFactory;
+  }
+
+  @Override
+  public Decoder create(final InputStream inputStream) throws IOException {
+    return new NemoEventDecoder(valueDecoderFactory.create(inputStream), inputStream);
+  }
+
+  /**
+   * This class decodes received data into two types.
+   * - normal data
+   * - WatermarkWithIndex
+   */
+  private static final class NemoEventDecoder implements DecoderFactory.Decoder {
+    private final Decoder valueDecoder;
+    private final InputStream inputStream;
+
+    NemoEventDecoder(final Decoder valueDecoder,
+                     final InputStream inputStream) {
+      this.valueDecoder = valueDecoder;
+      this.inputStream = inputStream;
+    }
+
+    /**
+     * Reads a one-byte flag, then decodes a data element (0x00) or a WatermarkWithIndex (0x01).
+     * @throws IOException if the stream has ended (EOFException) or the wrapped decoder fails
+     */
+    @Override
+    public Object decode() throws IOException {
+      // Keep the int from read(): narrowing to byte first makes a 0xFF flag indistinguishable from EOF (-1).
+      final int isWatermark = inputStream.read();
+      if (isWatermark == -1) {
+        // end of the input stream
+        throw new EOFException();
+      }
+
+      if (isWatermark == 0x00) {
+        // this is not a watermark
+        return valueDecoder.decode();
+      } else if (isWatermark == 0x01) {
+        // this is a watermark
+        // NOTE(review): commons-lang documents that deserialize(InputStream) closes the stream
+        // after reading; confirm later decode() calls on this stream are not affected.
+        return (WatermarkWithIndex) SerializationUtils.deserialize(inputStream);
+      } else {
+        throw new RuntimeException("Watermark decoding failure: " + isWatermark);
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "NemoDecoder{" + valueDecoder.toString() + "}";
+    }
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
new file mode 100644
index 0000000..c49beda
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.commons.lang.SerializationUtils;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * A factory for NemoEventEncoder, which encodes both data elements and watermarks.
+ */
+public final class NemoEventEncoderFactory implements EncoderFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(NemoEventEncoderFactory.class.getName());
+
+  private final EncoderFactory valueEncoderFactory;
+
+  public NemoEventEncoderFactory(final EncoderFactory valueEncoderFactory) {
+    this.valueEncoderFactory = valueEncoderFactory;
+  }
+
+  @Override
+  public Encoder create(final OutputStream outputStream) throws IOException {
+    return new NemoEventEncoder(valueEncoderFactory.create(outputStream), outputStream);
+  }
+
+  /**
+   * This encodes normal data and WatermarkWithIndex, each prefixed with a one-byte type flag.
+   * @param <T> type of the elements to encode
+   */
+  private static final class NemoEventEncoder<T> implements EncoderFactory.Encoder<T> {
+    private final EncoderFactory.Encoder<T> valueEncoder;
+    private final OutputStream outputStream;
+
+    NemoEventEncoder(final EncoderFactory.Encoder<T> valueEncoder,
+                     final OutputStream outputStream) {
+      this.valueEncoder = valueEncoder;
+      this.outputStream = outputStream;
+    }
+
+    @Override
+    public void encode(final T element) throws IOException {
+      if (element instanceof WatermarkWithIndex) {
+        outputStream.write(0x01); // this is a watermark
+        outputStream.write(SerializationUtils.serialize((Serializable) element));
+      } else {
+        outputStream.write(0x00); // this is a data element
+        valueEncoder.encode(element);
+      }
+    }
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index 3637780..12d9932 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -77,6 +77,7 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
 
   @Override
   public void emit(final O output) {
+
     for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
       emit(internalVertex.getNextOperator(), output);
     }
@@ -104,6 +105,11 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
 
   @Override
   public void emitWatermark(final Watermark watermark) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{} emits watermark {}", irVertex.getId(), watermark);
+    }
+
     // Emit watermarks to internal vertices
     for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
       internalVertex.getWatermarkManager().trackAndEmitWatermarks(internalVertex.getEdgeIndex(), watermark);
@@ -115,7 +121,15 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
       }
     }
 
-    // TODO #245: handle watermarks in OutputWriter
-    // TODO #245: currently ignore emitting watermarks to output writer
+    // Emit watermarks to output writer
+    for (final OutputWriter outputWriter : externalMainOutputs) {
+      outputWriter.writeWatermark(watermark);
+    }
+
+    for (final List<OutputWriter> externalVertices : externalAdditionalOutputs.values()) {
+      for (final OutputWriter externalVertex : externalVertices) {
+        externalVertex.writeWatermark(watermark);
+      }
+    }
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
similarity index 56%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
copy to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
index 56c7540..66efb72 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
@@ -23,35 +23,31 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 /**
- * This collector receives data from DataFetcher and forwards it to the next operator.
- * @param <O> output type.
+ * This class is used for collecting watermarks for an OperatorVertex.
+ * InputWatermarkManager emits watermarks to this class.
  */
-public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
-  private static final Logger LOG = LoggerFactory.getLogger(DataFetcherOutputCollector.class.getName());
-  private final OperatorVertex nextOperatorVertex;
+public final class OperatorWatermarkCollector implements OutputCollector {
+  private static final Logger LOG = LoggerFactory.getLogger(OperatorWatermarkCollector.class.getName());
+
+  private final OperatorVertex operatorVertex;
 
-  /**
-   * It forwards output to the next operator.
-   * @param nextOperatorVertex next operator to emit data and watermark
-   */
-  public DataFetcherOutputCollector(final OperatorVertex nextOperatorVertex) {
-    this.nextOperatorVertex = nextOperatorVertex;
+  public OperatorWatermarkCollector(final OperatorVertex operatorVertex) {
+    this.operatorVertex = operatorVertex;
   }
 
   @Override
-  public void emit(final O output) {
-    nextOperatorVertex.getTransform().onData(output);
+  public void emit(final Object output) {
+    throw new IllegalStateException("Should not be called");
   }
 
   @Override
   public void emitWatermark(final Watermark watermark) {
-    nextOperatorVertex.getTransform().onWatermark(watermark);
+    operatorVertex.getTransform().onWatermark(watermark);
   }
 
   @Override
-  public <T> void emit(final String dstVertexId, final T output) {
-    throw new RuntimeException("No additional output tag in DataFetcherOutputCollector");
+  public void emit(final String dstVertexId, final Object output) {
+    throw new IllegalStateException("Should not be called");
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index 032510a..301c95a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -23,6 +23,7 @@ import org.apache.nemo.common.exception.UnsupportedPartitionerException;
 import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.common.plan.StageEdge;
 import org.apache.nemo.runtime.executor.data.partitioner.*;
@@ -41,6 +42,12 @@ public interface OutputWriter {
   void write(final Object element);
 
   /**
+   * Writes watermarks to all edges.
+   * @param watermark watermark
+   */
+  void writeWatermark(final Watermark watermark);
+
+  /**
    * @return the total written bytes.
    */
   Optional<Long> getWrittenBytes();
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index a5dbf93..dd70394 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -21,6 +21,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 import org.apache.nemo.common.DirectByteArrayOutputStream;
 import org.apache.nemo.common.coder.EncoderFactory;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
@@ -33,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
@@ -43,6 +45,7 @@ public final class PipeOutputWriter implements OutputWriter {
   private static final Logger LOG = LoggerFactory.getLogger(OutputWriter.class.getName());
 
   private final String srcTaskId;
+  private final int srcTaskIndex;
   private final PipeManagerWorker pipeManagerWorker;
 
   private final Partitioner partitioner;
@@ -70,6 +73,27 @@ public final class PipeOutputWriter implements OutputWriter {
     this.pipeManagerWorker.notifyMaster(runtimeEdge.getId(), RuntimeIdManager.getIndexFromTaskId(srcTaskId));
     this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
     this.runtimeEdge = runtimeEdge;
+    this.srcTaskIndex = RuntimeIdManager.getIndexFromTaskId(srcTaskId);
+  }
+
+  private void writeData(final Object element, final List<ByteOutputContext> pipeList) {
+    for (final ByteOutputContext targetPipe : pipeList) {
+      // Encode the element into a DirectByteArrayOutputStream first, then hand its buffer to the pipe.
+      try (final ByteOutputContext.ByteOutputStream pipeStream = targetPipe.newOutputStream()) {
+        // Serialize (Do not compress)
+        final DirectByteArrayOutputStream encodedBuffer = new DirectByteArrayOutputStream();
+        final OutputStream chainedStream =
+          DataUtil.buildOutputStream(encodedBuffer, serializer.getEncodeStreamChainers());
+        final EncoderFactory.Encoder elementEncoder = serializer.getEncoderFactory().create(chainedStream);
+        elementEncoder.encode(element);
+        chainedStream.close();
+
+        // Write
+        pipeStream.write(encodedBuffer.getBufDirectly());
+      } catch (IOException e) {
+        throw new RuntimeException(e); // For now we crash the executor on IOException
+      }
+    }
   }
 
   /**
@@ -82,19 +106,17 @@ public final class PipeOutputWriter implements OutputWriter {
       doInitialize();
     }
 
-    try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = getPipeToWrite(element)) {
-      // Serialize (Do not compress)
-      final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
-      final OutputStream wrapped = DataUtil.buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
-      final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped);
-      encoder.encode(element);
-      wrapped.close();
-
-      // Write
-      pipeToWriteTo.write(bytesOutputStream.getBufDirectly());
-    } catch (IOException e) {
-      throw new RuntimeException(e); // For now we crash the executor on IOException
+    writeData(element, getPipeToWrite(element));
+  }
+
+  @Override
+  public void writeWatermark(final Watermark watermark) {
+    if (!initialized) {
+      doInitialize();
     }
+    // The watermark is written to every pipe (not a single one), tagged with this task's index.
+    final WatermarkWithIndex watermarkWithIndex = new WatermarkWithIndex(watermark, srcTaskIndex);
+    writeData(watermarkWithIndex, pipes);
   }
 
   @Override
@@ -126,11 +148,11 @@ public final class PipeOutputWriter implements OutputWriter {
     this.serializer = pipeManagerWorker.getSerializer(runtimeEdge.getId());
   }
 
-  private ByteOutputContext.ByteOutputStream getPipeToWrite(final Object element) throws IOException {
+  private List<ByteOutputContext> getPipeToWrite(final Object element) {
     return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class)
       .get()
       .equals(CommunicationPatternProperty.Value.OneToOne)
-      ? pipes.get(0).newOutputStream()
-      : pipes.get((int) partitioner.partition(element)).newOutputStream();
+      ? Collections.singletonList(pipes.get(0))
+      : Collections.singletonList(pipes.get((int) partitioner.partition(element)));
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
index 204bf22..e8135f9 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
@@ -18,7 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
 
 
@@ -27,10 +27,10 @@ import org.apache.nemo.common.punctuation.Watermark;
  */
 public final class SingleInputWatermarkManager implements InputWatermarkManager {
 
-  private final OperatorVertex nextOperator;
+  private final OutputCollector watermarkCollector;
 
-  public SingleInputWatermarkManager(final OperatorVertex nextOperator) {
-    this.nextOperator = nextOperator;
+  public SingleInputWatermarkManager(final OutputCollector watermarkCollector) {
+    this.watermarkCollector = watermarkCollector;
   }
 
   /**
@@ -41,6 +41,6 @@ public final class SingleInputWatermarkManager implements InputWatermarkManager
   @Override
   public void trackAndEmitWatermarks(final int edgeIndex,
                                      final Watermark watermark) {
-    nextOperator.getTransform().onWatermark(watermark);
+    watermarkCollector.emitWatermark(watermark);
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
similarity index 56%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
copy to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
index 204bf22..3db6cd5 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/SingleInputWatermarkManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/WatermarkWithIndex.java
@@ -18,29 +18,37 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.punctuation.Watermark;
 
+import java.io.Serializable;
 
 /**
- * This is a special implementation for single input data stream for optimization.
+ * This contains a watermark and the src task index.
+ * It is used for transferring the watermark between tasks.
  */
-public final class SingleInputWatermarkManager implements InputWatermarkManager {
+public final class WatermarkWithIndex implements Serializable {
+  private final Watermark watermark;
+  private final int index;
 
-  private final OperatorVertex nextOperator;
+  public WatermarkWithIndex(final Watermark watermark, final int index) {
+    this.watermark = watermark;
+    this.index = index;
+  }
+
+  public Watermark getWatermark() {
+    return watermark;
+  }
 
-  public SingleInputWatermarkManager(final OperatorVertex nextOperator) {
-    this.nextOperator = nextOperator;
+  public int getIndex() {
+    return index;
   }
 
-  /**
-   * This just forwards watermarks to the next operator because it has one data stream.
-   * @param edgeIndex edge index
-   * @param watermark watermark
-   */
   @Override
-  public void trackAndEmitWatermarks(final int edgeIndex,
-                                     final Watermark watermark) {
-    nextOperator.getTransform().onWatermark(watermark);
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append(watermark);
+    sb.append(" from ");
+    sb.append(index);
+    return sb.toString();
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
index a9b0da3..001060c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -21,8 +21,9 @@ package org.apache.nemo.runtime.executor.task;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
-import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.apache.nemo.runtime.executor.datatransfer.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,9 +57,12 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
   private long serBytes = 0;
   private long encodedBytes = 0;
 
-  private int numOfIterators;
+  private int numOfIterators; // == numOfIncomingEdges
   private int numOfFinishMarks = 0;
 
+  // A watermark manager
+  private InputWatermarkManager inputWatermarkManager;
+
   MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
                                    final InputReader readerForParentTask,
                                    final OutputCollector outputCollector) {
@@ -96,6 +100,12 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
     numOfIterators = futures.size();
 
+    if (numOfIterators > 1) {
+      inputWatermarkManager = new MultiInputWatermarkManager(numOfIterators, new WatermarkCollector());
+    } else {
+      inputWatermarkManager = new SingleInputWatermarkManager(new WatermarkCollector());
+    }
+
     futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
       // A thread for each iterator
       queueInsertionThreads.submit(() -> {
@@ -103,7 +113,21 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
           // Consume this iterator to the end.
           while (iterator.hasNext()) { // blocked on the iterator.
             final Object element = iterator.next();
-            elementQueue.offer(element);
+
+
+            if (element instanceof WatermarkWithIndex) {
+              // Watermark element.
+              // One thread runs per iterator, so the shared input watermark manager
+              // is accessed concurrently and must be synchronized.
+              synchronized (inputWatermarkManager) {
+                final WatermarkWithIndex watermarkWithIndex = (WatermarkWithIndex) element;
+                inputWatermarkManager.trackAndEmitWatermarks(
+                  watermarkWithIndex.getIndex(), watermarkWithIndex.getWatermark());
+              }
+            } else {
+              // data element
+              elementQueue.offer(element);
+            }
           }
 
           // This iterator is finished.
@@ -147,4 +171,26 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
   public void close() throws Exception {
     queueInsertionThreads.shutdown();
   }
+
+  /**
+   * Adds each watermark emitted by the InputWatermarkManager to the element queue.
+   * Only emitWatermark is supported; both emit methods throw IllegalStateException.
+   */
+  private final class WatermarkCollector implements OutputCollector {
+
+    @Override
+    public void emit(final Object output) {
+      throw new IllegalStateException("Should not be called");
+    }
+
+    @Override
+    public void emitWatermark(final Watermark watermark) {
+      elementQueue.offer(watermark);
+    }
+
+    @Override
+    public void emit(final String dstVertexId, final Object output) {
+      throw new IllegalStateException("Should not be called");
+    }
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index fa4bd8a..b42bd77 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -23,6 +23,8 @@ import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.common.punctuation.Finishmark;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.NoSuchElementException;
@@ -34,6 +36,8 @@ import java.util.concurrent.TimeUnit;
  * Fetches data from a data source.
  */
 class SourceVertexDataFetcher extends DataFetcher {
+  private static final Logger LOG = LoggerFactory.getLogger(SourceVertexDataFetcher.class.getName());
+
   private final Readable readable;
   private long boundedSourceReadTime = 0;
   private static final long WATERMARK_PERIOD = 1000; // ms
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 8c92443..518bff3 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -21,6 +21,7 @@ package org.apache.nemo.runtime.executor.task;
 import com.google.common.collect.Lists;
 import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.Edge;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
@@ -124,6 +125,21 @@ public final class TaskExecutor {
     this.sortedHarnesses = pair.right();
   }
 
+  // Returns the incoming edges of childVertex: intra-task edges first, then inter-task (stage) edges.
+  private List<Edge> getAllIncomingEdges(
+    final Task task,
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+    final IRVertex childVertex) {
+    final List<Edge> edges = new ArrayList<>();
+    edges.addAll(irVertexDag.getIncomingEdgesOf(childVertex));
+    final List<StageEdge> taskEdges = task.getTaskIncomingEdges().stream()
+      .filter(edge -> edge.getDstIRVertex().getId().equals(childVertex.getId()))
+      .collect(Collectors.toList());
+    edges.addAll(taskEdges);
+    return edges;
+  }
+
+
   /**
    * Converts the DAG of vertices into pointer-based DAG of vertex harnesses.
    * This conversion is necessary for constructing concrete data channels for each vertex's inputs and outputs.
@@ -158,11 +174,11 @@ public final class TaskExecutor {
     // Build a map for edge as a key and edge index as a value
     // This variable is used for creating NextIntraTaskOperatorInfo
     // in {@link this#getInternalMainOutputs and this#internalMainOutputs}
-    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap = new HashMap<>();
+    final Map<Edge, Integer> edgeIndexMap = new HashMap<>();
     reverseTopologicallySorted.forEach(childVertex -> {
-      final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+      final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
       for (int edgeIndex = 0; edgeIndex < edges.size(); edgeIndex++) {
-        final RuntimeEdge<IRVertex> edge = edges.get(edgeIndex);
+        final Edge edge = edges.get(edgeIndex);
         edgeIndexMap.putIfAbsent(edge, edgeIndex);
       }
     });
@@ -174,13 +190,15 @@ public final class TaskExecutor {
     reverseTopologicallySorted.forEach(childVertex -> {
 
       if (childVertex instanceof OperatorVertex) {
-        final List<RuntimeEdge<IRVertex>> edges = irVertexDag.getIncomingEdgesOf(childVertex);
+        final List<Edge> edges = getAllIncomingEdges(task, irVertexDag, childVertex);
         if (edges.size() == 1) {
           operatorWatermarkManagerMap.putIfAbsent(childVertex,
-            new SingleInputWatermarkManager((OperatorVertex) childVertex));
+            new SingleInputWatermarkManager(
+              new OperatorWatermarkCollector((OperatorVertex) childVertex)));
         } else {
           operatorWatermarkManagerMap.putIfAbsent(childVertex,
-            new MultiInputWatermarkManager(edges.size(), (OperatorVertex) childVertex));
+            new MultiInputWatermarkManager(edges.size(),
+              new OperatorWatermarkCollector((OperatorVertex) childVertex)));
         }
       }
 
@@ -257,23 +275,33 @@ public final class TaskExecutor {
             .orElseThrow(() -> new IllegalStateException(inEdge.toString())),
           broadcastReaders.get(i));
       }
+
       // Parent-task read (non-broadcasts)
       final List<StageEdge> nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex);
       nonBroadcastInEdges.removeAll(broadcastInEdges);
-      final List<InputReader> nonBroadcastReaders =
-        getParentTaskReaders(taskIndex, nonBroadcastInEdges, intermediateDataIOFactory);
-      nonBroadcastReaders.forEach(parentTaskReader -> {
-        final DataFetcher dataFetcher;
-        if (parentTaskReader instanceof PipeInputReader) {
-          nonBroadcastDataFetcherList.add(
-            new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-              new DataFetcherOutputCollector((OperatorVertex) irVertex)));
-        } else {
-          nonBroadcastDataFetcherList.add(
-            new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-              new DataFetcherOutputCollector((OperatorVertex) irVertex)));
-        }
-      });
+
+      nonBroadcastInEdges
+        .stream()
+        .map(incomingEdge ->
+          Pair.of(incomingEdge, intermediateDataIOFactory
+            .createReader(taskIndex, incomingEdge.getSrcIRVertex(), incomingEdge)))
+        .forEach(pair -> {
+          if (irVertex instanceof OperatorVertex) {
+            final StageEdge edge = pair.left();
+            final int edgeIndex = edgeIndexMap.get(edge);
+            final InputWatermarkManager watermarkManager = operatorWatermarkManagerMap.get(irVertex);
+            final InputReader parentTaskReader = pair.right();
+            if (parentTaskReader instanceof PipeInputReader) {
+              nonBroadcastDataFetcherList.add(
+                new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+                  new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+            } else {
+              nonBroadcastDataFetcherList.add(
+                new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+                  new DataFetcherOutputCollector((OperatorVertex) irVertex, edgeIndex, watermarkManager)));
+            }
+          }
+        });
     });
 
     final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
@@ -529,7 +557,7 @@ public final class TaskExecutor {
   private Map<String, List<NextIntraTaskOperatorInfo>> getInternalAdditionalOutputMap(
     final IRVertex irVertex,
     final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
-    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+    final Map<Edge, Integer> edgeIndexMap,
     final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
     // Add all intra-task additional tags to additional output map.
     final Map<String, List<NextIntraTaskOperatorInfo>> map = new HashMap<>();
@@ -556,7 +584,7 @@ public final class TaskExecutor {
   private List<NextIntraTaskOperatorInfo> getInternalMainOutputs(
     final IRVertex irVertex,
     final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
-    final Map<RuntimeEdge<IRVertex>, Integer> edgeIndexMap,
+    final Map<Edge, Integer> edgeIndexMap,
     final Map<IRVertex, InputWatermarkManager> operatorWatermarkManagerMap) {
 
     return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
index 9303da8..5242d46 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/InputWatermarkManagerTest.java
@@ -47,7 +47,7 @@ public final class InputWatermarkManagerTest {
 
     final OperatorVertex operatorVertex = new OperatorVertex(transform);
     final InputWatermarkManager watermarkManager =
-      new MultiInputWatermarkManager(3, operatorVertex);
+      new MultiInputWatermarkManager(3, new OperatorWatermarkCollector(operatorVertex));
 
     //edge1: 10 s
     //edge2: 5 s
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
index 63d98d5..bf6279c 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -29,6 +29,8 @@ import org.apache.nemo.runtime.common.message.MessageSender;
 import org.apache.nemo.runtime.common.plan.Task;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 import java.util.*;
@@ -49,6 +51,8 @@ import java.util.stream.Stream;
  */
 @NotThreadSafe
 public final class ExecutorRepresenter {
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorRepresenter.class.getName());
+
   private final String executorId;
   private final ResourceSpecification resourceSpecification;
   private final Map<String, Task> runningComplyingTasks;
@@ -113,18 +117,20 @@ public final class ExecutorRepresenter {
         ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
     runningTaskToAttempt.put(task, task.getAttemptIdx());
     failedTasks.remove(task);
-    serializationExecutorService.submit(() -> {
+
+
+    serializationExecutorService.execute(() -> {
       final byte[] serialized = SerializationUtils.serialize(task);
       sendControlMessage(
-          ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdManager.generateMessageId())
-              .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
-              .setType(ControlMessage.MessageType.ScheduleTask)
-              .setScheduleTaskMsg(
-                  ControlMessage.ScheduleTaskMsg.newBuilder()
-                      .setTask(ByteString.copyFrom(serialized))
-                      .build())
-              .build());
+        ControlMessage.Message.newBuilder()
+          .setId(RuntimeIdManager.generateMessageId())
+          .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+          .setType(ControlMessage.MessageType.ScheduleTask)
+          .setScheduleTaskMsg(
+            ControlMessage.ScheduleTaskMsg.newBuilder()
+              .setTask(ByteString.copyFrom(serialized))
+              .build())
+          .build());
     });
   }