You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/10/30 01:13:23 UTC

[incubator-nemo] branch master updated: [NEMO-8] Implement PipeManagerMaster/Worker (#129)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c826c2  [NEMO-8] Implement PipeManagerMaster/Worker (#129)
0c826c2 is described below

commit 0c826c26a547597b9f97e24c809ff073c2ffd30b
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Oct 30 10:13:18 2018 +0900

    [NEMO-8] Implement PipeManagerMaster/Worker (#129)
    
    JIRA: [NEMO-8: Implement PipeManagerMaster/Worker](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-8)
    
    **Major changes:**
    - Supports fully-pipelined data streaming for bounded sources (not unbounded sources)
      - Tasks do 'finish' after processing all input data, as the data is finite
      - When a task finishes, it emits all data it holds (e.g., accumulated GroupByKey results) and closes its outgoing pipes, notifying downstream tasks that those pipes have ended
      - Stream processing of unbounded sources requires watermarks (https://issues.apache.org/jira/browse/NEMO-233)
    - Introduces PipeManagerMaster/Worker
      - Shares code with BlockManagerMaster/Worker
    - Naive, element-wise serialization + compression + writeAndFlush
      - This will very likely cause serious overhead. We will run proper benchmarks and fix the issues in a later PR.
    
    **Minor changes to note:**
    - JobConf#SchedulerImplClassName: Batch and Streaming options
    - StreamingPolicyParallelismFive: The default policy (with parallelism overwritten to 5) + PipeTransferForAllEdgesPass
    - Fixes the StreamingScheduler to pass the new streaming integration tests
    - Fixes a coder bug in the Beam frontend (PCollectionView coder)
    
    **Tests for the changes:**
    - WindowedWordCountITCase#testStreamingSchedulerAndPipeFixedWindow
    - WindowedWordCountITCase#testStreamingSchedulerAndPipeSlidingWindow
    
    **Other comments:**
    - Also closes "Implement common API for data transfer" (https://issues.apache.org/jira/browse/NEMO-9)
    
    Closes #129
---
 checkstyle.xml                                     |   2 +-
 .../java/org/apache/nemo/client/JobLauncher.java   |  15 +-
 .../edge/executionproperty/DataStoreProperty.java  |   1 +
 .../org/apache/nemo/common/test/ArgBuilder.java    |   9 +
 .../compiler/frontend/beam/PipelineTranslator.java |  18 +-
 .../annotating/PipeTransferForAllEdgesPass.java    |  47 +++++
 .../main/java/org/apache/nemo/conf/JobConf.java    |   8 +
 .../examples/beam/WindowedWordCountITCase.java     |  48 ++++-
 .../policy/StreamingPolicyParallelismFive.java     |  57 ++++++
 .../runtime/common/message/MessageEnvironment.java |   1 +
 .../common/message/ncs/NcsMessageEnvironment.java  |   7 +
 .../nemo/runtime/common/plan/RuntimeEdge.java      |  15 ++
 runtime/common/src/main/proto/ControlMessage.proto |  33 +++-
 .../org/apache/nemo/runtime/executor/Executor.java |  10 +-
 .../executor/bytetransfer/ByteTransfer.java        |  12 +-
 .../executor/bytetransfer/ByteTransferContext.java |  10 +-
 .../ByteTransportChannelInitializer.java           |  11 +-
 .../executor/bytetransfer/ContextManager.java      |  40 +++--
 .../executor/bytetransfer/ControlFrameEncoder.java |   1 +
 .../runtime/executor/data/BlockManagerWorker.java  |  10 +-
 .../executor/data/BroadcastManagerWorker.java      |   1 +
 .../nemo/runtime/executor/data/PipeContainer.java  | 150 ++++++++++++++++
 .../runtime/executor/data/PipeManagerWorker.java   | 199 +++++++++++++++++++++
 .../{InputReader.java => BlockInputReader.java}    | 125 +++++--------
 .../{OutputWriter.java => BlockOutputWriter.java}  |  90 +++-------
 .../executor/datatransfer/DataTransfer.java        |  40 -----
 .../runtime/executor/datatransfer/InputReader.java | 155 +---------------
 ...Factory.java => IntermediateDataIOFactory.java} |  35 +++-
 .../executor/datatransfer/OutputWriter.java        | 158 ++++------------
 .../executor/datatransfer/PipeInputReader.java     |  82 +++++++++
 .../executor/datatransfer/PipeOutputWriter.java    | 136 ++++++++++++++
 .../executor/task/ParentTaskDataFetcher.java       |   2 +
 .../nemo/runtime/executor/task/TaskExecutor.java   |  42 ++---
 .../executor/datatransfer/DataTransferTest.java    |  77 +++++---
 .../executor/task/ParentTaskDataFetcherTest.java   |   6 +-
 .../runtime/executor/task/TaskExecutorTest.java    |  25 ++-
 .../nemo/runtime/master/PipeManagerMaster.java     | 157 ++++++++++++++++
 .../nemo/runtime/master/scheduler/Scheduler.java   |   2 -
 .../master/scheduler/StreamingScheduler.java       |  29 ++-
 .../runtime/master/scheduler/TaskDispatcher.java   |  10 +-
 .../master/scheduler/StreamingSchedulerTest.java   |   8 +-
 .../runtime/master/scheduler/TaskRetryTest.java    |   2 +-
 42 files changed, 1298 insertions(+), 588 deletions(-)

diff --git a/checkstyle.xml b/checkstyle.xml
index 1ab4089..3d52af6 100644
--- a/checkstyle.xml
+++ b/checkstyle.xml
@@ -169,7 +169,7 @@ under the License.
         <module name="InterfaceIsType"/>
         <module name="VisibilityModifier"/>
 
-        <!-- Miscellaneous other checks.                   -->
+      <!-- Miscellaneous other checks.                   -->
         <!-- See http://checkstyle.sf.net/config_misc.html -->
         <module name="ArrayTypeStyle"/>
         <module name="FinalParameters"/>
diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index 7e572df..5a54e6b 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -27,6 +27,7 @@ import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
 import org.apache.nemo.runtime.common.message.MessageParameters;
 import org.apache.commons.lang3.SerializationUtils;
+import org.apache.nemo.runtime.master.scheduler.Scheduler;
 import org.apache.reef.client.DriverConfiguration;
 import org.apache.reef.client.DriverLauncher;
 import org.apache.reef.client.parameters.JobMessageHandler;
@@ -109,10 +110,11 @@ public final class JobLauncher {
     final Configuration bandwidthConfig = getJSONConf(builtJobConf, JobConf.BandwidthJSONPath.class,
         JobConf.BandwidthJSONContents.class);
     final Configuration clientConf = getClientConf();
+    final Configuration schedulerConf = getSchedulerConf(builtJobConf);
 
     // Merge Job and Driver Confs
     jobAndDriverConf = Configurations.merge(builtJobConf, driverConf, driverNcsConf, driverMessageConfg,
-        executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration());
+        executorResourceConfig, bandwidthConfig, driverRPCServer.getListeningConfiguration(), schedulerConf);
 
     // Get DeployMode Conf
     deployModeConf = Configurations.merge(getDeployModeConf(builtJobConf), clientConf);
@@ -249,6 +251,16 @@ public final class JobLauncher {
     return jcb.build();
   }
 
+  private static Configuration getSchedulerConf(final Configuration jobConf)
+    throws ClassNotFoundException, InjectionException {
+    final Injector injector = TANG.newInjector(jobConf);
+    final String classImplName = injector.getNamedInstance(JobConf.SchedulerImplClassName.class);
+    final JavaConfigurationBuilder jcb = Tang.Factory.getTang().newConfigurationBuilder();
+    final Class schedulerImpl = ((Class<Scheduler>) Class.forName(classImplName));
+    jcb.bindImplementation(Scheduler.class, schedulerImpl);
+    return jcb.build();
+  }
+
   /**
    * Get driver ncs configuration.
    *
@@ -331,6 +343,7 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumWorkingThreads.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
     cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
+    cl.registerShortNameOfClass(JobConf.SchedulerImplClassName.class);
     cl.processCommandLine(args);
     return confBuilder.build();
   }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
index 8a60871..e53157b 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/DataStoreProperty.java
@@ -45,6 +45,7 @@ public final class DataStoreProperty extends EdgeExecutionProperty<DataStoreProp
    * Possible values of DataStore ExecutionProperty.
    */
   public enum Value {
+    Pipe,
     MemoryStore,
     SerializedMemoryStore,
     LocalFileStore,
diff --git a/common/src/main/java/org/apache/nemo/common/test/ArgBuilder.java b/common/src/main/java/org/apache/nemo/common/test/ArgBuilder.java
index cd8e083..954015c 100644
--- a/common/src/main/java/org/apache/nemo/common/test/ArgBuilder.java
+++ b/common/src/main/java/org/apache/nemo/common/test/ArgBuilder.java
@@ -98,6 +98,15 @@ public final class ArgBuilder {
   }
 
   /**
+   * @param schedulerName scheduler.
+   * @return builder with the scheduler.
+   */
+  public ArgBuilder addScheduler(final String schedulerName) {
+    args.add(Arrays.asList("-scheduler_impl_class_name", schedulerName));
+    return this;
+  }
+
+  /**
    * @return the built arguments.
    */
   public String[] build() {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 58a6d4a..7dc7af6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -419,21 +419,22 @@ public final class PipelineTranslator
    */
   private static Coder<?> getCoderForView(final PCollectionView view, final CompositeTransformVertex pipeline) {
     final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
-    final Coder<?> baseCoder = src.getNode().getInputs().values().stream()
-      .filter(v -> v instanceof PCollection).map(v -> (PCollection) v).findFirst()
+    final Coder<?> baseCoder = src.getNode().getOutputs().values().stream()
+      .filter(v -> v instanceof PCollection)
+      .map(v -> (PCollection) v)
+      .findFirst()
       .orElseThrow(() -> new RuntimeException(String.format("No incoming PCollection to %s", src)))
       .getCoder();
+    final KvCoder<?, ?> inputKVCoder = (KvCoder) baseCoder;
     final ViewFn viewFn = view.getViewFn();
     if (viewFn instanceof PCollectionViews.IterableViewFn) {
-      return IterableCoder.of(baseCoder);
+      return IterableCoder.of(inputKVCoder.getValueCoder());
     } else if (viewFn instanceof PCollectionViews.ListViewFn) {
-      return ListCoder.of(baseCoder);
+      return ListCoder.of(inputKVCoder.getValueCoder());
     } else if (viewFn instanceof PCollectionViews.MapViewFn) {
-      final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
-      return MapCoder.of(inputCoder.getKeyCoder(), inputCoder.getValueCoder());
+      return MapCoder.of(inputKVCoder.getKeyCoder(), inputKVCoder.getValueCoder());
     } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
-      final KvCoder<?, ?> inputCoder = (KvCoder) baseCoder;
-      return MapCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder()));
+      return MapCoder.of(inputKVCoder.getKeyCoder(), IterableCoder.of(inputKVCoder.getValueCoder()));
     } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
       return baseCoder;
     } else {
@@ -676,6 +677,7 @@ public final class PipelineTranslator
   private static final class OneToOneCommunicationPatternSelector
       implements BiFunction<IRVertex, IRVertex, CommunicationPatternProperty.Value> {
     private static final OneToOneCommunicationPatternSelector INSTANCE = new OneToOneCommunicationPatternSelector();
+
     @Override
     public CommunicationPatternProperty.Value apply(final IRVertex src, final IRVertex dst) {
       return CommunicationPatternProperty.Value.OneToOne;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
new file mode 100644
index 0000000..25999dc
--- /dev/null
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/PipeTransferForAllEdgesPass.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+
+/**
+ * Annotate 'Pipe' on all edges.
+ */
+@Annotates(DataStoreProperty.class)
+public final class PipeTransferForAllEdgesPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public PipeTransferForAllEdgesPass() {
+    super(PipeTransferForAllEdgesPass.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.getVertices().forEach(vertex -> {
+      dag.getIncomingEdgesOf(vertex).stream()
+          .forEach(edge -> edge.setPropertyPermanently(
+              DataStoreProperty.of(DataStoreProperty.Value.Pipe)));
+    });
+    return dag;
+  }
+}
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index b78c9fa..7de0e18 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -135,6 +135,14 @@ public final class JobConf extends ConfigurationModuleBuilder {
   public final class MaxTaskAttempt implements Name<Integer> {
   }
 
+  /**
+   * Scheduler impl.
+   */
+  @NamedParameter(doc = "Class name of the scheduler to use", short_name = "scheduler_impl_class_name",
+    default_value = "org.apache.nemo.runtime.master.scheduler.BatchScheduler")
+  public final class SchedulerImplClassName implements Name<String> {
+  }
+
   //////////////////////////////// Runtime Executor Configurations
 
   /**
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
index ed6e0eb..b523543 100644
--- a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -21,11 +21,8 @@ package org.apache.nemo.examples.beam;
 import org.apache.nemo.client.JobLauncher;
 import org.apache.nemo.common.test.ArgBuilder;
 import org.apache.nemo.common.test.ExampleTestUtil;
-import org.apache.nemo.compiler.optimizer.policy.ConditionalLargeShufflePolicy;
 import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
 import org.apache.nemo.examples.beam.policy.*;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -50,7 +47,7 @@ public final class WindowedWordCountITCase {
   private static final String outputFilePath =  fileBasePath + outputFileName;
 
   @Test (timeout = TIMEOUT)
-  public void testFixedWindow() throws Exception {
+  public void testBatchFixedWindow() throws Exception {
     builder = new ArgBuilder()
       .addUserMain(WindowedWordCount.class.getCanonicalName())
       .addUserArgs(inputFilePath, outputFilePath, "fixed");
@@ -70,7 +67,7 @@ public final class WindowedWordCountITCase {
 
 
   @Test (timeout = TIMEOUT)
-  public void testSlidingWindow() throws Exception {
+  public void testBatchSlidingWindow() throws Exception {
     builder = new ArgBuilder()
       .addUserMain(WindowedWordCount.class.getCanonicalName())
       .addUserArgs(inputFilePath, outputFilePath, "sliding");
@@ -87,4 +84,45 @@ public final class WindowedWordCountITCase {
       ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
     }
   }
+
+  @Test (timeout = TIMEOUT)
+  public void testStreamingSchedulerAndPipeFixedWindow() throws Exception {
+    builder = new ArgBuilder()
+      .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+      .addUserMain(WindowedWordCount.class.getCanonicalName())
+      .addUserArgs(inputFilePath, outputFilePath, "fixed");
+
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+      .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+
+
+  @Test (timeout = TIMEOUT)
+  public void testStreamingSchedulerAndPipeSlidingWindow() throws Exception {
+    builder = new ArgBuilder()
+      .addScheduler("org.apache.nemo.runtime.master.scheduler.StreamingScheduler")
+      .addUserMain(WindowedWordCount.class.getCanonicalName())
+      .addUserArgs(inputFilePath, outputFilePath, "sliding");
+
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addOptimizationPolicy(StreamingPolicyParallelismFive.class.getCanonicalName())
+      .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
 }
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/policy/StreamingPolicyParallelismFive.java b/examples/beam/src/test/java/org/apache/nemo/examples/beam/policy/StreamingPolicyParallelismFive.java
new file mode 100644
index 0000000..ee59ea3
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/policy/StreamingPolicyParallelismFive.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.examples.beam.policy;
+
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.eventhandler.PubSubEventHandlerWrapper;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.PipeTransferForAllEdgesPass;
+import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.compiler.optimizer.policy.Policy;
+import org.apache.nemo.compiler.optimizer.policy.PolicyBuilder;
+import org.apache.reef.tang.Injector;
+
+/**
+ * Streaming policy.
+ */
+public final class StreamingPolicyParallelismFive implements Policy {
+  private final Policy policy;
+
+  /**
+   * Default constructor.
+   */
+  public StreamingPolicyParallelismFive() {
+    final PolicyBuilder builder = new PolicyBuilder();
+    PolicyTestUtil.overwriteParallelism(5, DefaultPolicy.BUILDER.getCompileTimePasses())
+      .forEach(ctPass -> builder.registerCompileTimePass(ctPass));
+    builder.registerCompileTimePass(new PipeTransferForAllEdgesPass());
+    this.policy = builder.build();
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory) {
+    return this.policy.runCompileTimeOptimization(dag, dagDirectory);
+  }
+
+  @Override
+  public void registerRunTimeOptimizations(final Injector injector, final PubSubEventHandlerWrapper pubSubWrapper) {
+    this.policy.registerRunTimeOptimizations(injector, pubSubWrapper);
+  }
+}
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageEnvironment.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageEnvironment.java
index 900ae31..1eb3141 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageEnvironment.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/MessageEnvironment.java
@@ -35,6 +35,7 @@ public interface MessageEnvironment {
   // The globally known message listener IDs.
   String RUNTIME_MASTER_MESSAGE_LISTENER_ID = "RUNTIME_MASTER_MESSAGE_LISTENER_ID";
   String BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID = "BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID";
+  String PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID = "PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID";
   String EXECUTOR_MESSAGE_LISTENER_ID = "EXECUTOR_MESSAGE_LISTENER_ID";
 
   /**
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 25f7e5e..f335a7f 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -206,12 +206,15 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
       case MetricMessageReceived:
       case RequestMetricFlush:
       case MetricFlushed:
+      case PipeInit:
         return MessageType.Send;
       case RequestBlockLocation:
       case RequestBroadcastVariable:
+      case RequestPipeLoc:
         return MessageType.Request;
       case BlockLocationInfo:
       case InMasterBroadcastVariable:
+      case PipeLocInfo:
         return MessageType.Reply;
       default:
         throw new IllegalArgumentException(controlMessage.toString());
@@ -224,6 +227,8 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
         return controlMessage.getRequestBlockLocationMsg().getExecutorId();
       case RequestBroadcastVariable:
         return controlMessage.getRequestbroadcastVariableMsg().getExecutorId();
+      case RequestPipeLoc:
+        return controlMessage.getRequestPipeLocMsg().getExecutorId();
       default:
         throw new IllegalArgumentException(controlMessage.toString());
     }
@@ -235,6 +240,8 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
         return controlMessage.getBlockLocationInfoMsg().getRequestId();
       case InMasterBroadcastVariable:
         return controlMessage.getBroadcastVariableMsg().getRequestId();
+      case PipeLocInfo:
+        return controlMessage.getPipeLocInfoMsg().getRequestId();
       default:
         throw new IllegalArgumentException(controlMessage.toString());
     }
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
index b0e70c4..bf98a86 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/RuntimeEdge.java
@@ -64,6 +64,21 @@ public class RuntimeEdge<V extends Vertex> extends Edge<V> {
   }
 
   /**
+   * @param executionPropertyKey key
+   * @param <T> type
+   * @return the value
+   */
+  public final <T extends Serializable> T getPropertyValueOrRuntimeException(
+    final Class<? extends EdgeExecutionProperty<T>> executionPropertyKey) {
+    final Optional<T> optional = getPropertyValue(executionPropertyKey);
+    if (optional.isPresent()) {
+      return optional.get();
+    } else {
+      throw new IllegalStateException(executionPropertyKey.toString());
+    }
+  }
+
+  /**
    * @return the ExecutionPropertyMap of the Runtime Edge.
    */
   public final ExecutionPropertyMap<EdgeExecutionProperty> getExecutionProperties() {
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 7cc62e0..617f50f 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -68,6 +68,9 @@ enum MessageType {
     MetricFlushed = 10;
     RequestBroadcastVariable = 11;
     InMasterBroadcastVariable = 12;
+    PipeInit = 13;
+    RequestPipeLoc = 14;
+    PipeLocInfo = 15;
 }
 
 message Message {
@@ -86,6 +89,9 @@ message Message {
     optional DataCollectMessage dataCollected = 13;
     optional RequestBroadcastVariableMessage requestbroadcastVariableMsg = 14;
     optional InMasterBroadcastVariableMessage broadcastVariableMsg = 15;
+    optional PipeInitMessage pipeInitMsg = 16;
+    optional RequestPipeLocationMessage requestPipeLocMsg = 17;
+    optional PipeLocationInfoMessage pipeLocInfoMsg = 18;
 }
 
 // Messages from Master to Executors
@@ -162,15 +168,23 @@ message ByteTransferContextSetupMessage {
     required int32 transferIndex = 2;
     required ByteTransferDataDirection dataDirection = 3;
     required bytes contextDescriptor = 4;
+    required bool isPipe = 5;
 }
 
-message ByteTransferContextDescriptor {
+message BlockTransferContextDescriptor {
     required string blockId = 1;
     required BlockStore blockStore = 2;
     required string runtimeEdgeId = 3;
     optional bytes keyRange = 4;
 }
 
+message PipeTransferContextDescriptor {
+  required int64 srcTaskIndex = 1;
+  required string runtimeEdgeId = 2;
+  required int64 dstTaskIndex = 3;
+  required int64 numPipeToWait = 4;
+}
+
 enum TaskStateFromExecutor {
     READY = 0;
     EXECUTING = 1;
@@ -210,3 +224,20 @@ message InMasterBroadcastVariableMessage {
   required int64 requestId = 1; // To find the matching request msg
   required bytes variable = 2;
 }
+
+message PipeInitMessage {
+  required int64 srcTaskIndex = 1;
+  required string runtimeEdgeId = 2;
+  required string executorId = 3;
+}
+
+message RequestPipeLocationMessage {
+  required string executorId = 1;
+  required int64 srcTaskIndex = 2;
+  required string runtimeEdgeId = 3;
+}
+
+message PipeLocationInfoMessage {
+  required int64 requestId = 1; // To find the matching request msg
+  required string executorId = 2;
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index b7597a8..b218bf8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -38,7 +38,7 @@ import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.common.plan.Task;
 import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
-import org.apache.nemo.runtime.executor.datatransfer.DataTransferFactory;
+import org.apache.nemo.runtime.executor.datatransfer.IntermediateDataIOFactory;
 import org.apache.nemo.runtime.executor.task.TaskExecutor;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@@ -71,7 +71,7 @@ public final class Executor {
   /**
    * Factory of InputReader/OutputWriter for executing tasks groups.
    */
-  private final DataTransferFactory dataTransferFactory;
+  private final IntermediateDataIOFactory intermediateDataIOFactory;
 
   private final BroadcastManagerWorker broadcastManagerWorker;
 
@@ -84,7 +84,7 @@ public final class Executor {
                    final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                    final MessageEnvironment messageEnvironment,
                    final SerializerManager serializerManager,
-                   final DataTransferFactory dataTransferFactory,
+                   final IntermediateDataIOFactory intermediateDataIOFactory,
                    final BroadcastManagerWorker broadcastManagerWorker,
                    final MetricManagerWorker metricMessageSender) {
     this.executorId = executorId;
@@ -93,7 +93,7 @@ public final class Executor {
         .build());
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     this.serializerManager = serializerManager;
-    this.dataTransferFactory = dataTransferFactory;
+    this.intermediateDataIOFactory = intermediateDataIOFactory;
     this.broadcastManagerWorker = broadcastManagerWorker;
     this.metricMessageSender = metricMessageSender;
     messageEnvironment.setupListener(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID, new ExecutorMessageReceiver());
@@ -138,7 +138,7 @@ public final class Executor {
             e.getPropertyValue(DecompressionProperty.class).orElse(null)));
       });
 
-      new TaskExecutor(task, irDag, taskStateManager, dataTransferFactory, broadcastManagerWorker,
+      new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
           metricMessageSender, persistentConnectionToMasterMap).execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransfer.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransfer.java
index 1a09f3e..ef49c36 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransfer.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransfer.java
@@ -52,22 +52,26 @@ public final class ByteTransfer {
    * Initiate a transfer context to receive data.
    * @param executorId        the id of the remote executor
    * @param contextDescriptor user-provided descriptor for the new context
+   * @param isPipe            whether the context is for a pipe (true) or a block (false) transfer
    * @return a {@link ByteInputContext} from which the received data can be read
    */
   public CompletableFuture<ByteInputContext> newInputContext(final String executorId,
-                                                             final byte[] contextDescriptor) {
-    return connectTo(executorId).thenApply(manager -> manager.newInputContext(executorId, contextDescriptor));
+                                                             final byte[] contextDescriptor,
+                                                             final boolean isPipe) {
+    return connectTo(executorId).thenApply(manager -> manager.newInputContext(executorId, contextDescriptor, isPipe));
   }
 
   /**
    * Initiate a transfer context to send data.
    * @param executorId         the id of the remote executor
    * @param contextDescriptor  user-provided descriptor for the new context
+   * @param isPipe            whether the context is for a pipe (true) or a block (false) transfer
    * @return a {@link ByteOutputContext} to which data can be written
    */
   public CompletableFuture<ByteOutputContext> newOutputContext(final String executorId,
-                                                               final byte[] contextDescriptor) {
-    return connectTo(executorId).thenApply(manager -> manager.newOutputContext(executorId, contextDescriptor));
+                                                               final byte[] contextDescriptor,
+                                                               final boolean isPipe) {
+    return connectTo(executorId).thenApply(manager -> manager.newOutputContext(executorId, contextDescriptor, isPipe));
   }
 
   /**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransferContext.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
index de2af97..55226f3 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransferContext.java
@@ -141,6 +141,7 @@ public abstract class ByteTransferContext {
     private final String partnerExecutorId;
     private final ByteTransferDataDirection dataDirection;
     private final int transferIndex;
+    private final boolean isPipe;
 
     /**
      * Create {@link ContextId}.
@@ -148,15 +149,18 @@ public abstract class ByteTransferContext {
      * @param partnerExecutorId   the other executor
      * @param dataDirection       the direction of the data flow
      * @param transferIndex       an index issued by the initiator
+     * @param isPipe              whether the context is for a pipe (true) or a block (false) transfer
      */
     ContextId(final String initiatorExecutorId,
               final String partnerExecutorId,
               final ByteTransferDataDirection dataDirection,
-              final int transferIndex) {
+              final int transferIndex,
+              final boolean isPipe) {
       this.initiatorExecutorId = initiatorExecutorId;
       this.partnerExecutorId = partnerExecutorId;
       this.dataDirection = dataDirection;
       this.transferIndex = transferIndex;
+      this.isPipe = isPipe;
     }
 
     public String getInitiatorExecutorId() {
@@ -167,6 +171,10 @@ public abstract class ByteTransferContext {
       return partnerExecutorId;
     }
 
+    public boolean isPipe() {
+      return isPipe;
+    }
+
     public ByteTransferDataDirection getDataDirection() {
       return dataDirection;
     }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransportChannelInitializer.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransportChannelInitializer.java
index 351fba6..ee80527 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransportChannelInitializer.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ByteTransportChannelInitializer.java
@@ -22,6 +22,7 @@ import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
 import org.apache.reef.tang.InjectionFuture;
 import org.apache.reef.tang.annotations.Parameter;
 
@@ -59,6 +60,7 @@ import javax.inject.Inject;
  */
 final class ByteTransportChannelInitializer extends ChannelInitializer<SocketChannel> {
 
+  private final InjectionFuture<PipeManagerWorker> pipeManagerWorker;
   private final InjectionFuture<BlockManagerWorker> blockManagerWorker;
   private final InjectionFuture<ByteTransfer> byteTransfer;
   private final InjectionFuture<ByteTransport> byteTransport;
@@ -69,6 +71,7 @@ final class ByteTransportChannelInitializer extends ChannelInitializer<SocketCha
   /**
    * Creates a netty channel initializer.
    *
+   * @param pipeManagerWorker   provides handler for new pipe contexts by remote executors
    * @param blockManagerWorker  provides handler for new contexts by remote executors
    * @param byteTransfer        provides channel caching
    * @param byteTransport       provides {@link io.netty.channel.group.ChannelGroup}
@@ -77,12 +80,14 @@ final class ByteTransportChannelInitializer extends ChannelInitializer<SocketCha
    * @param localExecutorId     the id of this executor
    */
   @Inject
-  private ByteTransportChannelInitializer(final InjectionFuture<BlockManagerWorker> blockManagerWorker,
+  private ByteTransportChannelInitializer(final InjectionFuture<PipeManagerWorker> pipeManagerWorker,
+                                          final InjectionFuture<BlockManagerWorker> blockManagerWorker,
                                           final InjectionFuture<ByteTransfer> byteTransfer,
                                           final InjectionFuture<ByteTransport> byteTransport,
                                           final ControlFrameEncoder controlFrameEncoder,
                                           final DataFrameEncoder dataFrameEncoder,
                                           @Parameter(JobConf.ExecutorId.class) final String localExecutorId) {
+    this.pipeManagerWorker = pipeManagerWorker;
     this.blockManagerWorker = blockManagerWorker;
     this.byteTransfer = byteTransfer;
     this.byteTransport = byteTransport;
@@ -93,8 +98,8 @@ final class ByteTransportChannelInitializer extends ChannelInitializer<SocketCha
 
   @Override
   protected void initChannel(final SocketChannel ch) {
-    final ContextManager contextManager = new ContextManager(blockManagerWorker.get(), byteTransfer.get(),
-        byteTransport.get().getChannelGroup(), localExecutorId, ch);
+    final ContextManager contextManager = new ContextManager(pipeManagerWorker.get(), blockManagerWorker.get(),
+      byteTransfer.get(), byteTransport.get().getChannelGroup(), localExecutorId, ch);
     ch.pipeline()
         // inbound
         .addLast(new FrameDecoder(contextManager))
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ContextManager.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ContextManager.java
index 377b80c..c694657 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ContextManager.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ContextManager.java
@@ -24,6 +24,7 @@ import org.apache.nemo.runtime.executor.bytetransfer.ByteTransferContext.Context
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import io.netty.channel.*;
 import io.netty.channel.group.ChannelGroup;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -35,6 +36,7 @@ import java.util.function.Function;
  */
 final class ContextManager extends SimpleChannelInboundHandler<ByteTransferContextSetupMessage> {
 
+  private final PipeManagerWorker pipeManagerWorker;
   private final BlockManagerWorker blockManagerWorker;
   private final ByteTransfer byteTransfer;
   private final ChannelGroup channelGroup;
@@ -51,17 +53,20 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
 
   /**
    * Creates context manager for this channel.
+   * @param pipeManagerWorker   provides handler for new pipe contexts by remote executors
    * @param blockManagerWorker  provides handler for new contexts by remote executors
    * @param byteTransfer        provides channel caching
    * @param channelGroup        to cleanup this channel when closing {@link ByteTransport}
    * @param localExecutorId     local executor id
    * @param channel             the {@link Channel} to manage
    */
-  ContextManager(final BlockManagerWorker blockManagerWorker,
+  ContextManager(final PipeManagerWorker pipeManagerWorker,
+                 final BlockManagerWorker blockManagerWorker,
                  final ByteTransfer byteTransfer,
                  final ChannelGroup channelGroup,
                  final String localExecutorId,
                  final Channel channel) {
+    this.pipeManagerWorker = pipeManagerWorker;
     this.blockManagerWorker = blockManagerWorker;
     this.byteTransfer = byteTransfer;
     this.channelGroup = channelGroup;
@@ -103,7 +108,9 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
     byteTransfer.onNewContextByRemoteExecutor(message.getInitiatorExecutorId(), channel);
     final ByteTransferDataDirection dataDirection = message.getDataDirection();
     final int transferIndex = message.getTransferIndex();
-    final ContextId contextId = new ContextId(remoteExecutorId, localExecutorId, dataDirection, transferIndex);
+    final boolean isPipe = message.getIsPipe(); // selects PipeManagerWorker or BlockManagerWorker below
+    final ContextId contextId =
+      new ContextId(remoteExecutorId, localExecutorId, dataDirection, transferIndex, isPipe);
     final byte[] contextDescriptor = message.getContextDescriptor().toByteArray();
 
     if (dataDirection == ByteTransferDataDirection.INITIATOR_SENDS_DATA) {
@@ -113,7 +120,12 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
         }
         return new ByteInputContext(remoteExecutorId, contextId, contextDescriptor, this);
       });
-      blockManagerWorker.onInputContext(context);
+
+      if (isPipe) {
+        pipeManagerWorker.onInputContext(context);
+      } else {
+        blockManagerWorker.onInputContext(context);
+      }
     } else {
       final ByteOutputContext context = outputContextsInitiatedByRemote.compute(transferIndex, (idx, existing) -> {
         if (existing != null) {
@@ -121,7 +133,11 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
         }
         return new ByteOutputContext(remoteExecutorId, contextId, contextDescriptor, this);
       });
-      blockManagerWorker.onOutputContext(context);
+      if (isPipe) {
+        pipeManagerWorker.onOutputContext(context);
+      } else {
+        blockManagerWorker.onOutputContext(context);
+      }
     }
   }
 
@@ -147,16 +163,18 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
    * @param contextGenerator a function that returns context from context id
    * @param executorId id of the remote executor
    * @param <T> {@link ByteInputContext} or {@link ByteOutputContext}
+   * @param isPipe whether the context is for a pipe (true) or a block (false) transfer
    * @return generated context
    */
   <T extends ByteTransferContext> T newContext(final ConcurrentMap<Integer, T> contexts,
                                                final AtomicInteger transferIndexCounter,
                                                final ByteTransferDataDirection dataDirection,
                                                final Function<ContextId, T> contextGenerator,
-                                               final String executorId) {
+                                               final String executorId,
+                                               final boolean isPipe) {
     setRemoteExecutorId(executorId);
     final int transferIndex = transferIndexCounter.getAndIncrement();
-    final ContextId contextId = new ContextId(localExecutorId, executorId, dataDirection, transferIndex);
+    final ContextId contextId = new ContextId(localExecutorId, executorId, dataDirection, transferIndex, isPipe);
     final T context = contexts.compute(transferIndex, (index, existingContext) -> {
       if (existingContext != null) {
         throw new RuntimeException(String.format("Duplicate ContextId: %s", contextId));
@@ -171,26 +189,28 @@ final class ContextManager extends SimpleChannelInboundHandler<ByteTransferConte
    * Create a new {@link ByteInputContext}.
    * @param executorId target executor id
    * @param contextDescriptor the context descriptor
+   * @param isPipe            whether the context is for a pipe (true) or a block (false) transfer
    * @return new {@link ByteInputContext}
    */
-  ByteInputContext newInputContext(final String executorId, final byte[] contextDescriptor) {
+  ByteInputContext newInputContext(final String executorId, final byte[] contextDescriptor, final boolean isPipe) {
     return newContext(inputContextsInitiatedByLocal, nextInputTransferIndex,
         ByteTransferDataDirection.INITIATOR_RECEIVES_DATA,
         contextId -> new ByteInputContext(executorId, contextId, contextDescriptor, this),
-        executorId);
+        executorId, isPipe);
   }
 
   /**
    * Create a new {@link ByteOutputContext}.
    * @param executorId target executor id
    * @param contextDescriptor the context descriptor
+   * @param isPipe            whether the context is for a pipe (true) or a block (false) transfer
    * @return new {@link ByteOutputContext}
    */
-  ByteOutputContext newOutputContext(final String executorId, final byte[] contextDescriptor) {
+  ByteOutputContext newOutputContext(final String executorId, final byte[] contextDescriptor, final boolean isPipe) {
     return newContext(outputContextsInitiatedByLocal, nextOutputTransferIndex,
         ByteTransferDataDirection.INITIATOR_SENDS_DATA,
         contextId -> new ByteOutputContext(executorId, contextId, contextDescriptor, this),
-        executorId);
+        executorId, isPipe);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ControlFrameEncoder.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ControlFrameEncoder.java
index f303017..836177a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ControlFrameEncoder.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/bytetransfer/ControlFrameEncoder.java
@@ -62,6 +62,7 @@ final class ControlFrameEncoder extends MessageToMessageEncoder<ByteTransferCont
         .setTransferIndex(in.getContextId().getTransferIndex())
         .setDataDirection(in.getContextId().getDataDirection())
         .setContextDescriptor(ByteString.copyFrom(in.getContextDescriptor()))
+        .setIsPipe(in.getContextId().isPipe()) // read by the receiving ContextManager to route the context
         .build();
     final byte[] frameBody = message.toByteArray();
     out.add(ZEROS.retain());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
index 6fc53c4..b404592 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -29,7 +29,6 @@ import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
-import org.apache.nemo.runtime.common.comm.ControlMessage.ByteTransferContextDescriptor;
 import org.apache.nemo.common.KeyRange;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
 import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -199,7 +198,8 @@ public final class BlockManagerWorker {
         // Block resides in the evaluator
         return getDataFromLocalBlock(blockId, blockStore, keyRange);
       } else {
-        final ByteTransferContextDescriptor descriptor = ByteTransferContextDescriptor.newBuilder()
+        final ControlMessage.BlockTransferContextDescriptor descriptor =
+          ControlMessage.BlockTransferContextDescriptor.newBuilder()
             .setBlockId(blockId)
             .setBlockStore(convertBlockStore(blockStore))
             .setRuntimeEdgeId(runtimeEdgeId)
@@ -207,7 +207,7 @@ public final class BlockManagerWorker {
             .build();
         final CompletableFuture<ByteInputContext> contextFuture = blockTransferThrottler
             .requestTransferPermission(runtimeEdgeId)
-            .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray()));
+            .thenCompose(obj -> byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray(), false));
 
         // whenComplete() ensures that blockTransferThrottler.onTransferFinished() is always called,
         // even on failures. Actual failure handling and Task retry will be done by DataFetcher.
@@ -328,8 +328,8 @@ public final class BlockManagerWorker {
    * @throws InvalidProtocolBufferException from errors during parsing context descriptor
    */
   public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
-    final ByteTransferContextDescriptor descriptor = ByteTransferContextDescriptor.PARSER
-        .parseFrom(outputContext.getContextDescriptor());
+    final ControlMessage.BlockTransferContextDescriptor descriptor = // only non-pipe contexts are routed here
+      ControlMessage.BlockTransferContextDescriptor.PARSER.parseFrom(outputContext.getContextDescriptor());
     final DataStoreProperty.Value blockStore = convertBlockStore(descriptor.getBlockStore());
     final String blockId = descriptor.getBlockId();
     final KeyRange keyRange = SerializationUtils.deserialize(descriptor.getKeyRange().toByteArray());
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
index 084f0ef..c8a87fc 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BroadcastManagerWorker.java
@@ -128,6 +128,7 @@ public final class BroadcastManagerWorker {
    * @return the variable.
    */
   public Object get(final Serializable id)  {
+    LOG.debug("get {}", id); // per-call trace: DEBUG so that every lookup does not flood the INFO log
     try {
       return idToVariableCache.get(id);
     } catch (ExecutionException e) {
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeContainer.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeContainer.java
new file mode 100644
index 0000000..a49ef78
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeContainer.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Stores the outgoing pipes ({@link ByteOutputContext}s) of each (runtime edge id, src task index) key.
+ *
+ * Each key is initialized exactly once with {@link PipeContainer#putPipeListIfAbsent(Pair, int)},
+ * and its container is never replaced afterwards.
+ * Pipes are then added with {@link PipeContainer#putPipe(Pair, int, ByteOutputContext)} and retrieved with
+ * {@link PipeContainer#getPipes(Pair)}, which blocks until the expected number of pipes has been added.
+ *
+ * Concurrent reads and writes are safe: key initialization is atomic within the {@link ConcurrentHashMap},
+ * and each per-key {@link CountBasedBlockingContainer} guards its own state with a lock.
+ */
+@ThreadSafe
+public final class PipeContainer {
+  private static final Logger LOG = LoggerFactory.getLogger(PipeContainer.class.getName());
+  private final ConcurrentHashMap<Pair<String, Long>, CountBasedBlockingContainer<ByteOutputContext>> pipeMap;
+
+  PipeContainer() {
+    this.pipeMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Blocks the get operation when the number of elements is smaller than expected.
+   * Values are kept sorted by index, so readers always see them in ascending index order.
+   *
+   * @param <T> type of the value.
+   */
+  static final class CountBasedBlockingContainer<T> {
+    // A TreeMap (unlike a HashMap) guarantees that values() iterates in ascending index order.
+    private final Map<Integer, T> indexToValue;
+    private final int expected;
+    private final Lock lock;
+    private final Condition condition;
+
+    /**
+     * Creates an empty container.
+     *
+     * @param expected the number of values to wait for before {@link #getValuesBlocking()} returns.
+     */
+    CountBasedBlockingContainer(final int expected) {
+      this.indexToValue = new TreeMap<>();
+      this.expected = expected;
+      this.lock = new ReentrantLock();
+      this.condition = lock.newCondition();
+    }
+
+    /**
+     * Waits until the expected number of values has been set, then returns them.
+     *
+     * @return a snapshot of the values, in ascending index order.
+     * @throws RuntimeException if interrupted while waiting (the interrupt status is restored).
+     */
+    public List<T> getValuesBlocking() {
+      lock.lock();
+      try {
+        // Re-check in a loop: Condition#await may wake up spuriously.
+        while (!isCountSatisfied()) {
+          condition.await();
+        }
+        return new ArrayList<>(indexToValue.values());
+      } catch (final InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Sets the value of an index, and wakes up waiting readers once the expected count is reached.
+     *
+     * @param index the index of the value.
+     * @param value the value.
+     * @throws IllegalStateException if the index already had a value, or more values than expected are set.
+     */
+    public void setValue(final int index, final T value) {
+      lock.lock();
+      try {
+        final T previous = indexToValue.put(index, value);
+        if (null != previous) {
+          throw new IllegalStateException(previous.toString());
+        }
+
+        if (isCountSatisfied()) {
+          condition.signalAll();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Checks whether the expected number of values has been set. Must be called while holding the lock.
+     *
+     * @return true iff exactly the expected number of values has been set.
+     * @throws IllegalStateException if more values than expected have been set.
+     */
+    private boolean isCountSatisfied() {
+      final int size = indexToValue.size();
+      if (size < expected) {
+        return false;
+      } else if (size == expected) {
+        return true;
+      } else {
+        throw new IllegalStateException(size + " > " + expected);
+      }
+    }
+
+    @Override
+    public String toString() {
+      // Lock so that printing never races with a concurrent setValue().
+      lock.lock();
+      try {
+        return indexToValue.toString();
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  /**
+   * (SYNCHRONIZATION) Initialize the key exactly once.
+   * Later calls for an already-initialized key are no-ops, even if they pass a different expected count.
+   *
+   * @param pairKey  the (runtime edge id, src task index) key.
+   * @param expected the number of pipes to wait for under this key.
+   */
+  void putPipeListIfAbsent(final Pair<String, Long> pairKey, final int expected) {
+    // computeIfAbsent is atomic, and only allocates a container when the key is actually absent.
+    pipeMap.computeIfAbsent(pairKey, key -> new CountBasedBlockingContainer<>(expected));
+  }
+
+  /**
+   * (SYNCHRONIZATION) CountBasedBlockingContainer takes care of it.
+   *
+   * @param pairKey           the (runtime edge id, src task index) key; must already be initialized.
+   * @param dstTaskIndex      the index of the destination task that opened the pipe.
+   * @param byteOutputContext the pipe.
+   */
+  void putPipe(final Pair<String, Long> pairKey, final int dstTaskIndex, final ByteOutputContext byteOutputContext) {
+    getContainer(pairKey).setValue(dstTaskIndex, byteOutputContext);
+  }
+
+  /**
+   * (SYNCHRONIZATION) CountBasedBlockingContainer takes care of it.
+   * Blocks until all expected pipes of the key have been put.
+   *
+   * @param pairKey the (runtime edge id, src task index) key; must already be initialized.
+   * @return the pipes, in ascending destination task index order.
+   */
+  List<ByteOutputContext> getPipes(final Pair<String, Long> pairKey) {
+    return getContainer(pairKey).getValuesBlocking();
+  }
+
+  /**
+   * Looks up the container of an initialized key.
+   *
+   * @param pairKey the (runtime edge id, src task index) key.
+   * @return the container of the key.
+   * @throws IllegalStateException if the key was not initialized with putPipeListIfAbsent.
+   */
+  private CountBasedBlockingContainer<ByteOutputContext> getContainer(final Pair<String, Long> pairKey) {
+    final CountBasedBlockingContainer<ByteOutputContext> container = pipeMap.get(pairKey);
+    if (container == null) {
+      throw new IllegalStateException("Pipe list is not initialized for " + pairKey);
+    }
+    return container;
+  }
+}
+
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
new file mode 100644
index 0000000..ffbbe56
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/PipeManagerWorker.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.data;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.conf.JobConf;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.common.plan.StageEdge;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteInputContext;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteTransfer;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manages the pipes of this executor. Two threads use this class
+ * - Network thread: Saves pipe connections created from destination tasks.
+ * - Task executor thread: Creates new pipe connections to source tasks (read),
+ *                         or retrieves saved pipe connections (write)
+ */
+@ThreadSafe
+public final class PipeManagerWorker {
+  private static final Logger LOG = LoggerFactory.getLogger(PipeManagerWorker.class.getName());
+
+  private final String executorId;
+  private final SerializerManager serializerManager;
+
+  // To-Executor connections
+  private final ByteTransfer byteTransfer;
+
+  // Thread-safe container
+  private final PipeContainer pipeContainer;
+
+  private final PersistentConnectionToMasterMap toMaster;
+
+  /**
+   * Constructor, called by Tang injection.
+   *
+   * @param executorId        the id of this executor.
+   * @param byteTransfer      provides connections to other executors.
+   * @param serializerManager provides the serializer of each runtime edge.
+   * @param toMaster          provides the message senders to the master.
+   */
+  @Inject
+  private PipeManagerWorker(@Parameter(JobConf.ExecutorId.class) final String executorId,
+                            final ByteTransfer byteTransfer,
+                            final SerializerManager serializerManager,
+                            final PersistentConnectionToMasterMap toMaster) {
+    this.executorId = executorId;
+    this.byteTransfer = byteTransfer;
+    this.serializerManager = serializerManager;
+    this.pipeContainer = new PipeContainer();
+    this.toMaster = toMaster;
+  }
+
+  /**
+   * Reads the data of a source task through a pipe.
+   * First asks the master for the executor that runs the source task, then opens a pipe (input context) to it.
+   *
+   * @param srcTaskIndex the index of the source task to read from.
+   * @param runtimeEdge  the edge from the source task to the reading task.
+   * @param dstTaskIndex the index of the reading (destination) task.
+   * @return a future of the iterator over the data received through the pipe.
+   */
+  public CompletableFuture<DataUtil.IteratorWithNumBytes> read(final int srcTaskIndex,
+                                                               final RuntimeEdge runtimeEdge,
+                                                               final int dstTaskIndex) {
+    final String runtimeEdgeId = runtimeEdge.getId();
+    // Ask the master for the location of the src task (asynchronous request)
+    final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = toMaster
+      .getMessageSender(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
+        ControlMessage.Message.newBuilder()
+          .setId(RuntimeIdManager.generateMessageId())
+          .setListenerId(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
+          .setType(ControlMessage.MessageType.RequestPipeLoc)
+          .setRequestPipeLocMsg(
+            ControlMessage.RequestPipeLocationMessage.newBuilder()
+              .setExecutorId(executorId)
+              .setRuntimeEdgeId(runtimeEdgeId)
+              .setSrcTaskIndex(srcTaskIndex)
+              .build())
+          .build());
+
+    return responseFromMasterFuture.thenCompose(responseFromMaster -> {
+      // Get executor id
+      if (responseFromMaster.getType() != ControlMessage.MessageType.PipeLocInfo) {
+        throw new RuntimeException("Response message type mismatch!");
+      }
+      final ControlMessage.PipeLocationInfoMessage pipeLocInfo = responseFromMaster.getPipeLocInfoMsg();
+      if (!pipeLocInfo.hasExecutorId()) {
+        throw new IllegalStateException(
+          "No executor id in the pipe location of edge " + runtimeEdgeId + ", src task " + srcTaskIndex);
+      }
+      final String targetExecutorId = pipeLocInfo.getExecutorId();
+
+      // Descriptor: lets the remote executor match this pipe to its src task's output (see onOutputContext)
+      final ControlMessage.PipeTransferContextDescriptor descriptor =
+        ControlMessage.PipeTransferContextDescriptor.newBuilder()
+          .setRuntimeEdgeId(runtimeEdgeId)
+          .setSrcTaskIndex(srcTaskIndex)
+          .setDstTaskIndex(dstTaskIndex)
+          .setNumPipeToWait(getNumOfPipeToWait(runtimeEdge))
+          .build();
+
+      // Connect to the executor (isPipe = true routes the context to that executor's PipeManagerWorker)
+      return byteTransfer.newInputContext(targetExecutorId, descriptor.toByteArray(), true)
+        .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
+          serializerManager.getSerializer(runtimeEdgeId)));
+    });
+  }
+
+  /**
+   * Notifies the master that the src task of the given index on the given edge runs in this executor.
+   * NOTE: presumably this is the location the master returns to {@link #read}'s RequestPipeLoc — confirm.
+   *
+   * @param runtimeEdgeId the id of the edge.
+   * @param srcTaskIndex  the index of the src task.
+   */
+  public void notifyMaster(final String runtimeEdgeId, final long srcTaskIndex) {
+    // Notify the master that we're using this pipe.
+    toMaster.getMessageSender(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID).send(
+      ControlMessage.Message.newBuilder()
+        .setId(RuntimeIdManager.generateMessageId())
+        .setListenerId(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID)
+        .setType(ControlMessage.MessageType.PipeInit)
+        .setPipeInitMsg(ControlMessage.PipeInitMessage.newBuilder()
+          .setRuntimeEdgeId(runtimeEdgeId)
+          .setSrcTaskIndex(srcTaskIndex)
+          .setExecutorId(executorId)
+          .build())
+        .build());
+  }
+
+  /**
+   * (SYNCHRONIZATION) Called by task threads.
+   * Blocks until every expected destination task has opened its pipe to the given src task.
+   *
+   * @param runtimeEdge  the edge.
+   * @param srcTaskIndex the index of the src task.
+   * @return output contexts.
+   */
+  public List<ByteOutputContext> getOutputContexts(final RuntimeEdge runtimeEdge,
+                                                   final long srcTaskIndex) {
+
+    // First, initialize the pair key
+    final Pair<String, Long> pairKey = Pair.of(runtimeEdge.getId(), srcTaskIndex);
+    pipeContainer.putPipeListIfAbsent(pairKey, getNumOfPipeToWait(runtimeEdge));
+
+    // Then, wait until all expected pipes of the key have been opened
+    return pipeContainer.getPipes(pairKey); // blocking call
+  }
+
+  /**
+   * Returns the serializer of a runtime edge.
+   *
+   * @param runtimeEdgeId the id of the edge.
+   * @return the serializer.
+   */
+  public Serializer getSerializer(final String runtimeEdgeId) {
+    return serializerManager.getSerializer(runtimeEdgeId);
+  }
+
+  /**
+   * (SYNCHRONIZATION) Called by network threads.
+   * Saves a pipe opened by a remote destination task, so that the src task can get it via getOutputContexts.
+   *
+   * @param outputContext the pipe, whose descriptor identifies the edge and the src/dst task indices.
+   * @throws InvalidProtocolBufferException if the context descriptor cannot be parsed.
+   */
+  public void onOutputContext(final ByteOutputContext outputContext) throws InvalidProtocolBufferException {
+    final ControlMessage.PipeTransferContextDescriptor descriptor =
+      ControlMessage.PipeTransferContextDescriptor.PARSER.parseFrom(outputContext.getContextDescriptor());
+
+    final long srcTaskIndex = descriptor.getSrcTaskIndex();
+    final String runtimeEdgeId = descriptor.getRuntimeEdgeId();
+    final int dstTaskIndex = (int) descriptor.getDstTaskIndex();
+    final int numPipeToWait = (int) descriptor.getNumPipeToWait();
+    final Pair<String, Long> pairKey = Pair.of(runtimeEdgeId, srcTaskIndex);
+
+    // First, initialize the pair key
+    pipeContainer.putPipeListIfAbsent(pairKey, numPipeToWait);
+
+    // Then, save the pipe under the dst task index
+    pipeContainer.putPipe(pairKey, dstTaskIndex, outputContext);
+  }
+
+  /**
+   * Not supported: in this class, pipes are only initiated by the reading side ({@link #read}),
+   * so no pipe input context initiated by a remote executor is expected.
+   *
+   * @param inputContext the input context.
+   * @throws InvalidProtocolBufferException declared but never thrown.
+   * @throws UnsupportedOperationException always.
+   */
+  public void onInputContext(final ByteInputContext inputContext) throws InvalidProtocolBufferException {
+    throw new UnsupportedOperationException("Pipe input contexts initiated by remote executors are not supported");
+  }
+
+  /**
+   * Computes the number of pipes a src task waits for on the given edge:
+   * one for a one-to-one edge, otherwise the parallelism of the destination vertex.
+   *
+   * @param runtimeEdge the edge, which must be a {@link StageEdge}.
+   * @return the number of pipes to wait for.
+   */
+  private int getNumOfPipeToWait(final RuntimeEdge runtimeEdge) {
+    final StageEdge stageEdge = (StageEdge) runtimeEdge;
+    final int dstParallelism = stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class)
+      .orElseThrow(() -> new IllegalStateException("No parallelism for the dst vertex of " + runtimeEdge.getId()));
+    final CommunicationPatternProperty.Value commPattern = stageEdge
+      .getPropertyValue(CommunicationPatternProperty.class)
+      .orElseThrow(() -> new IllegalStateException("No communication pattern for " + runtimeEdge.getId()));
+
+    return commPattern.equals(CommunicationPatternProperty.Value.OneToOne) ? 1 : dstParallelism;
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
similarity index 64%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
copy to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
index 1bd661d..12c6053 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
@@ -18,62 +18,52 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.exception.BlockFetchException;
+import org.apache.nemo.common.exception.UnsupportedCommPatternException;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
 import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
-import org.apache.nemo.common.KeyRange;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.common.plan.StageEdge;
-import org.apache.nemo.common.exception.BlockFetchException;
-import org.apache.nemo.common.exception.UnsupportedCommPatternException;
-import org.apache.nemo.common.HashRange;
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
 /**
  * Represents the input data transfer to a task.
  */
-public final class InputReader extends DataTransfer {
-  private final int dstTaskIndex;
+public final class BlockInputReader implements InputReader {
   private final BlockManagerWorker blockManagerWorker;
 
+  private final int dstTaskIndex;
+
   /**
    * Attributes that specify how we should read the input.
    */
   private final IRVertex srcVertex;
   private final RuntimeEdge runtimeEdge;
 
-  public InputReader(final int dstTaskIndex,
-                     final IRVertex srcVertex,
-                     final RuntimeEdge runtimeEdge,
-                     final BlockManagerWorker blockManagerWorker) {
-    super(runtimeEdge.getId());
+  BlockInputReader(final int dstTaskIndex,
+                   final IRVertex srcVertex,
+                   final RuntimeEdge runtimeEdge,
+                   final BlockManagerWorker blockManagerWorker) {
     this.dstTaskIndex = dstTaskIndex;
     this.srcVertex = srcVertex;
     this.runtimeEdge = runtimeEdge;
     this.blockManagerWorker = blockManagerWorker;
   }
 
-  /**
-   * Reads input data depending on the communication pattern of the srcVertex.
-   *
-   * @return the read data.
-   */
+  @Override
   public List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read() {
     final Optional<CommunicationPatternProperty.Value> comValue =
-            runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
+      runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
 
     if (comValue.get().equals(CommunicationPatternProperty.Value.OneToOne)) {
       return Collections.singletonList(readOneToOne());
@@ -88,22 +78,43 @@ public final class InputReader extends DataTransfer {
     }
   }
 
+  @Override
+  public IRVertex getSrcIrVertex() {
+    return srcVertex;
+  }
+
+  /**
+   * See {@link RuntimeIdManager#generateBlockIdWildcard(String, int)} for information on block wildcards.
+   * @param producerTaskIndex to use.
+   * @return wildcard block id that corresponds to "ANY" task attempt of the task index.
+   */
+  private String generateWildCardBlockId(final int producerTaskIndex) {
+    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
+      runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+    if (!duplicateDataProperty.isPresent() || duplicateDataProperty.get().getGroupSize() <= 1) {
+      return RuntimeIdManager.generateBlockIdWildcard(runtimeEdge.getId(), producerTaskIndex);
+    }
+    final String duplicateEdgeId = duplicateDataProperty.get().getRepresentativeEdgeId();
+    return RuntimeIdManager.generateBlockIdWildcard(duplicateEdgeId, producerTaskIndex);
+  }
+
   private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
     final String blockIdWildcard = generateWildCardBlockId(dstTaskIndex);
     final Optional<DataStoreProperty.Value> dataStoreProperty
-        = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-    return blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), HashRange.all());
+      = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    return blockManagerWorker.readBlock(blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), HashRange.all());
   }
 
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
-    final int numSrcTasks = this.getSourceParallelism();
+    final int numSrcTasks = InputReader.getSourceParallelism(this);
     final Optional<DataStoreProperty.Value> dataStoreProperty
-        = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+      = runtimeEdge.getPropertyValue(DataStoreProperty.class);
 
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
       final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
-      futures.add(blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), HashRange.all()));
+      futures.add(blockManagerWorker.readBlock(
+        blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), HashRange.all()));
     }
 
     return futures;
@@ -117,72 +128,22 @@ public final class InputReader extends DataTransfer {
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
     assert (runtimeEdge instanceof StageEdge);
     final Optional<DataStoreProperty.Value> dataStoreProperty
-        = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+      = runtimeEdge.getPropertyValue(DataStoreProperty.class);
     ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     final KeyRange hashRangeToRead = ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
       throw new BlockFetchException(
-          new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
+        new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
     }
 
-    final int numSrcTasks = this.getSourceParallelism();
+    final int numSrcTasks = InputReader.getSourceParallelism(this);
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
       final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
       futures.add(
-          blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), hashRangeToRead));
+        blockManagerWorker.readBlock(blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), hashRangeToRead));
     }
 
     return futures;
   }
-
-  /**
-   * See {@link RuntimeIdManager#generateBlockIdWildcard(String, int)} for information on block wildcards.
-   * @param producerTaskIndex to use.
-   * @return wildcard block id that corresponds to "ANY" task attempt of the task index.
-   */
-  private String generateWildCardBlockId(final int producerTaskIndex) {
-    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
-        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
-    if (!duplicateDataProperty.isPresent() || duplicateDataProperty.get().getGroupSize() <= 1) {
-      return RuntimeIdManager.generateBlockIdWildcard(getId(), producerTaskIndex);
-    }
-    final String duplicateEdgeId = duplicateDataProperty.get().getRepresentativeEdgeId();
-    return RuntimeIdManager.generateBlockIdWildcard(duplicateEdgeId, producerTaskIndex);
-  }
-
-  public IRVertex getSrcIrVertex() {
-    return srcVertex;
-  }
-
-  /**
-   * Get the parallelism of the source task.
-   *
-   * @return the parallelism of the source task.
-   */
-  public int getSourceParallelism() {
-    return srcVertex.getPropertyValue(ParallelismProperty.class).
-        orElseThrow(() -> new RuntimeException("No parallelism property on this edge."));
-  }
-
-  /**
-   * Combine the given list of futures.
-   *
-   * @param futures to combine.
-   * @return the combined iterable of elements.
-   * @throws ExecutionException   when fail to get results from futures.
-   * @throws InterruptedException when interrupted during getting results from futures.
-   */
-  @VisibleForTesting
-  public static Iterator combineFutures(final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures)
-      throws ExecutionException, InterruptedException {
-    final List concatStreamBase = new ArrayList<>();
-    Stream<Object> concatStream = concatStreamBase.stream();
-    for (int srcTaskIdx = 0; srcTaskIdx < futures.size(); srcTaskIdx++) {
-      final Iterator dataFromATask = futures.get(srcTaskIdx).get();
-      final Iterable iterable = () -> dataFromATask;
-      concatStream = Stream.concat(concatStream, StreamSupport.stream(iterable.spliterator(), false));
-    }
-    return concatStream.collect(Collectors.toList()).iterator();
-  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
similarity index 58%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
copy to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index ebdbd1b..2391533 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -18,8 +18,6 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.KeyExtractor;
-import org.apache.nemo.common.exception.*;
 import org.apache.nemo.common.ir.edge.executionproperty.*;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
@@ -29,20 +27,23 @@ import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.nemo.runtime.executor.data.block.Block;
 import org.apache.nemo.runtime.executor.data.partitioner.*;
 
-import java.util.*;
+import java.util.Map;
+import java.util.Optional;
 
 /**
  * Represents the output data transfer from a task.
  */
-public final class OutputWriter extends DataTransfer implements AutoCloseable {
+public final class BlockOutputWriter implements OutputWriter {
   private final RuntimeEdge<?> runtimeEdge;
   private final IRVertex dstIrVertex;
+  private final Partitioner partitioner;
+
   private final DataStoreProperty.Value blockStoreValue;
   private final BlockManagerWorker blockManagerWorker;
-  private final boolean nonDummyBlock;
   private final Block blockToWrite;
+  private final boolean nonDummyBlock;
+
   private long writtenBytes;
-  private Partitioner partitioner;
 
   /**
    * Constructor.
@@ -53,46 +54,20 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
    * @param runtimeEdge         the {@link RuntimeEdge}.
    * @param blockManagerWorker  the {@link BlockManagerWorker}.
    */
-  OutputWriter(final int hashRangeMultiplier,
-               final String srcTaskId,
-               final IRVertex dstIrVertex,
-               final RuntimeEdge<?> runtimeEdge,
-               final BlockManagerWorker blockManagerWorker) {
-    super(runtimeEdge.getId());
+  BlockOutputWriter(final int hashRangeMultiplier,
+                    final String srcTaskId,
+                    final IRVertex dstIrVertex,
+                    final RuntimeEdge<?> runtimeEdge,
+                    final BlockManagerWorker blockManagerWorker) {
     this.runtimeEdge = runtimeEdge;
     this.dstIrVertex = dstIrVertex;
-    this.blockManagerWorker = blockManagerWorker;
-    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).
-        orElseThrow(() -> new RuntimeException("No data store property on the edge"));
 
-    // Setup partitioner
-    final int dstParallelism = dstIrVertex.getPropertyValue(ParallelismProperty.class).
-        orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
-    final Optional<KeyExtractor> keyExtractor = runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
-    final PartitionerProperty.Value partitionerPropertyValue =
-        runtimeEdge.getPropertyValue(PartitionerProperty.class).
-            orElseThrow(() -> new RuntimeException("No partitioner property on the edge"));
-    switch (partitionerPropertyValue) {
-      case IntactPartitioner:
-        this.partitioner = new IntactPartitioner();
-        break;
-      case HashPartitioner:
-        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.
-            orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
-        break;
-      case DataSkewHashPartitioner:
-        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.
-            orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
-        break;
-      case DedicatedKeyPerElementPartitioner:
-        this.partitioner = new DedicatedKeyPerElementPartitioner();
-        break;
-      default:
-        throw new UnsupportedPartitionerException(
-            new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
-    }
+    this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
+    this.blockManagerWorker = blockManagerWorker;
+    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class)
+      .orElseThrow(() -> new RuntimeException("No data store property on the edge"));
     blockToWrite = blockManagerWorker.createBlock(
-        RuntimeIdManager.generateBlockId(getId(), srcTaskId), blockStoreValue);
+        RuntimeIdManager.generateBlockId(runtimeEdge.getId(), srcTaskId), blockStoreValue);
     final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
         runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     nonDummyBlock = !duplicateDataProperty.isPresent()
@@ -100,11 +75,7 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
         || duplicateDataProperty.get().getGroupSize() <= 1;
   }
 
-  /**
-   * Writes output element depending on the communication pattern of the edge.
-   *
-   * @param element the element to write.
-   */
+  @Override
   public void write(final Object element) {
     if (nonDummyBlock) {
       blockToWrite.write(partitioner.partition(element), element);
@@ -121,11 +92,11 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
    * Notifies that all writes for a block is end.
    * Further write about a committed block will throw an exception.
    */
+  @Override
   public void close() {
     // Commit block.
-    final DataPersistenceProperty.Value persistence =
-        runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
-            orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
+    final DataPersistenceProperty.Value persistence = (DataPersistenceProperty.Value) runtimeEdge
+      .getPropertyValue(DataPersistenceProperty.class).get();
 
     final Optional<Map<Integer, Long>> partitionSizeMap = blockToWrite.commit();
     // Return the total size of the committed block.
@@ -134,16 +105,13 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
       for (final long partitionSize : partitionSizeMap.get().values()) {
         blockSizeTotal += partitionSize;
       }
-      this.writtenBytes = blockSizeTotal;
+      writtenBytes = blockSizeTotal;
     } else {
-      this.writtenBytes = -1; // no written bytes info.
+      writtenBytes = -1; // no written bytes info.
     }
     blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, getExpectedRead(), persistence);
   }
 
-  /**
-   * @return the total written bytes.
-   */
   public Optional<Long> getWrittenBytes() {
     if (writtenBytes == -1) {
       return Optional.empty();
@@ -160,14 +128,14 @@ public final class OutputWriter extends DataTransfer implements AutoCloseable {
    */
   private int getExpectedRead() {
     final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
-        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
+      runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     final int duplicatedDataMultiplier =
-        duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
+      duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
     final int readForABlock = CommunicationPatternProperty.Value.OneToOne.equals(
-        runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
-            () -> new RuntimeException("No communication pattern on this edge.")))
-        ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
-            () -> new RuntimeException("No parallelism property on the destination vertex."));
+      runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
+        () -> new RuntimeException("No communication pattern on this edge.")))
+      ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
+      () -> new RuntimeException("No parallelism property on the destination vertex."));
     return readForABlock * duplicatedDataMultiplier;
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransfer.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransfer.java
deleted file mode 100644
index dcaae52..0000000
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransfer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.runtime.executor.datatransfer;
-
-
-/**
- * Contains common parts involved in {@link InputReader} and {@link OutputWriter}.
- * The two classes are involved in
- * intermediate data transfer between {@link org.apache.nemo.runtime.common.plan.physical.Task}.
- */
-public abstract class DataTransfer {
-  private final String id;
-
-  public DataTransfer(final String id) {
-    this.id = id;
-  }
-
-  /**
-   * @return ID of the reader/writer.
-   */
-  public final String getId() {
-    return id;
-  }
-}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
index 1bd661d..e6f7657 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
@@ -18,171 +18,28 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import org.apache.nemo.runtime.common.RuntimeIdManager;
-import org.apache.nemo.common.KeyRange;
-import org.apache.nemo.runtime.common.plan.RuntimeEdge;
-import org.apache.nemo.runtime.common.plan.StageEdge;
-import org.apache.nemo.common.exception.BlockFetchException;
-import org.apache.nemo.common.exception.UnsupportedCommPatternException;
-import org.apache.nemo.common.HashRange;
-import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
 
 /**
  * Represents the input data transfer to a task.
  */
-public final class InputReader extends DataTransfer {
-  private final int dstTaskIndex;
-  private final BlockManagerWorker blockManagerWorker;
-
-  /**
-   * Attributes that specify how we should read the input.
-   */
-  private final IRVertex srcVertex;
-  private final RuntimeEdge runtimeEdge;
-
-  public InputReader(final int dstTaskIndex,
-                     final IRVertex srcVertex,
-                     final RuntimeEdge runtimeEdge,
-                     final BlockManagerWorker blockManagerWorker) {
-    super(runtimeEdge.getId());
-    this.dstTaskIndex = dstTaskIndex;
-    this.srcVertex = srcVertex;
-    this.runtimeEdge = runtimeEdge;
-    this.blockManagerWorker = blockManagerWorker;
-  }
-
+public interface InputReader {
   /**
    * Reads input data depending on the communication pattern of the srcVertex.
    *
    * @return the read data.
    */
-  public List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read() {
-    final Optional<CommunicationPatternProperty.Value> comValue =
-            runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
-
-    if (comValue.get().equals(CommunicationPatternProperty.Value.OneToOne)) {
-      return Collections.singletonList(readOneToOne());
-    } else if (comValue.get().equals(CommunicationPatternProperty.Value.BroadCast)) {
-      return readBroadcast();
-    } else if (comValue.get().equals(CommunicationPatternProperty.Value.Shuffle)) {
-      // If the dynamic optimization which detects data skew is enabled, read the data in the assigned range.
-      // TODO #492: Modularize the data communication pattern.
-      return readDataInRange();
-    } else {
-      throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported"));
-    }
-  }
-
-  private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
-    final String blockIdWildcard = generateWildCardBlockId(dstTaskIndex);
-    final Optional<DataStoreProperty.Value> dataStoreProperty
-        = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-    return blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), HashRange.all());
-  }
-
-  private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
-    final int numSrcTasks = this.getSourceParallelism();
-    final Optional<DataStoreProperty.Value> dataStoreProperty
-        = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-
-    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
-    for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
-      final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
-      futures.add(blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), HashRange.all()));
-    }
-
-    return futures;
-  }
-
-  /**
-   * Read data in the assigned range of hash value.
-   *
-   * @return the list of the completable future of the data.
-   */
-  private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
-    assert (runtimeEdge instanceof StageEdge);
-    final Optional<DataStoreProperty.Value> dataStoreProperty
-        = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-    ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
-    final KeyRange hashRangeToRead = ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
-    if (hashRangeToRead == null) {
-      throw new BlockFetchException(
-          new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
-    }
-
-    final int numSrcTasks = this.getSourceParallelism();
-    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
-    for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
-      final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
-      futures.add(
-          blockManagerWorker.readBlock(blockIdWildcard, getId(), dataStoreProperty.get(), hashRangeToRead));
-    }
+  List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read();
 
-    return futures;
-  }
-
-  /**
-   * See {@link RuntimeIdManager#generateBlockIdWildcard(String, int)} for information on block wildcards.
-   * @param producerTaskIndex to use.
-   * @return wildcard block id that corresponds to "ANY" task attempt of the task index.
-   */
-  private String generateWildCardBlockId(final int producerTaskIndex) {
-    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
-        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
-    if (!duplicateDataProperty.isPresent() || duplicateDataProperty.get().getGroupSize() <= 1) {
-      return RuntimeIdManager.generateBlockIdWildcard(getId(), producerTaskIndex);
-    }
-    final String duplicateEdgeId = duplicateDataProperty.get().getRepresentativeEdgeId();
-    return RuntimeIdManager.generateBlockIdWildcard(duplicateEdgeId, producerTaskIndex);
-  }
-
-  public IRVertex getSrcIrVertex() {
-    return srcVertex;
-  }
+  IRVertex getSrcIrVertex();
 
-  /**
-   * Get the parallelism of the source task.
-   *
-   * @return the parallelism of the source task.
-   */
-  public int getSourceParallelism() {
-    return srcVertex.getPropertyValue(ParallelismProperty.class).
-        orElseThrow(() -> new RuntimeException("No parallelism property on this edge."));
-  }
-
-  /**
-   * Combine the given list of futures.
-   *
-   * @param futures to combine.
-   * @return the combined iterable of elements.
-   * @throws ExecutionException   when fail to get results from futures.
-   * @throws InterruptedException when interrupted during getting results from futures.
-   */
-  @VisibleForTesting
-  public static Iterator combineFutures(final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures)
-      throws ExecutionException, InterruptedException {
-    final List concatStreamBase = new ArrayList<>();
-    Stream<Object> concatStream = concatStreamBase.stream();
-    for (int srcTaskIdx = 0; srcTaskIdx < futures.size(); srcTaskIdx++) {
-      final Iterator dataFromATask = futures.get(srcTaskIdx).get();
-      final Iterable iterable = () -> dataFromATask;
-      concatStream = Stream.concat(concatStream, StreamSupport.stream(iterable.spliterator(), false));
-    }
-    return concatStream.collect(Collectors.toList()).iterator();
+  static int getSourceParallelism(final InputReader inputReader) {
+    return inputReader.getSrcIrVertex().getPropertyValue(ParallelismProperty.class)
+      .orElseThrow(() -> new IllegalStateException(inputReader.getSrcIrVertex().getId()));
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
similarity index 61%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferFactory.java
rename to runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
index 5f627ac..a90cc79 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferFactory.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
@@ -18,41 +18,51 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
+import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.common.plan.StageEdge;
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
 import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
+import java.util.Optional;
 
 /**
  * A factory that produces {@link InputReader} and {@link OutputWriter}.
  */
-public final class DataTransferFactory {
-
+public final class IntermediateDataIOFactory {
+  private final PipeManagerWorker pipeManagerWorker;
   private final BlockManagerWorker blockManagerWorker;
   private final int hashRangeMultiplier;
 
   @Inject
-  private DataTransferFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
-                              final BlockManagerWorker blockManagerWorker) {
+  private IntermediateDataIOFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
+                                    final BlockManagerWorker blockManagerWorker,
+                                    final PipeManagerWorker pipeManagerWorker) {
     this.hashRangeMultiplier = hashRangeMultiplier;
     this.blockManagerWorker = blockManagerWorker;
+    this.pipeManagerWorker = pipeManagerWorker;
   }
 
   /**
    * Creates an {@link OutputWriter} between two stages.
    *
    * @param srcTaskId   the id of the source task.
-   * @param dstIRVertex the {@link IRVertex} that will take the output data as its input.
    * @param runtimeEdge that connects the srcTask to the tasks belonging to dstIRVertex.
    * @return the {@link OutputWriter} created.
    */
   public OutputWriter createWriter(final String srcTaskId,
-                                   final IRVertex dstIRVertex,
                                    final RuntimeEdge<?> runtimeEdge) {
-    return new OutputWriter(hashRangeMultiplier, srcTaskId, dstIRVertex, runtimeEdge, blockManagerWorker);
+    if (isPipe(runtimeEdge)) { // The edge's DataStoreProperty is Pipe: write through the PipeManagerWorker.
+      return new PipeOutputWriter(hashRangeMultiplier, srcTaskId, runtimeEdge, pipeManagerWorker);
+    } else { // Otherwise, write to a block through the BlockManagerWorker.
+      final StageEdge stageEdge = (StageEdge) runtimeEdge; // NOTE(review): assumes block edges are StageEdges.
+      return new BlockOutputWriter(
+        hashRangeMultiplier, srcTaskId, stageEdge.getDstIRVertex(), runtimeEdge, blockManagerWorker);
+    }
   }
 
   /**
@@ -66,6 +76,15 @@ public final class DataTransferFactory {
   public InputReader createReader(final int dstTaskIdx,
                                   final IRVertex srcIRVertex,
                                   final RuntimeEdge runtimeEdge) {
-    return new InputReader(dstTaskIdx, srcIRVertex, runtimeEdge, blockManagerWorker);
+    if (isPipe(runtimeEdge)) { // Read through the PipeManagerWorker.
+      return new PipeInputReader(dstTaskIdx, srcIRVertex, runtimeEdge, pipeManagerWorker);
+    } else { // Otherwise, read blocks through the BlockManagerWorker.
+      return new BlockInputReader(dstTaskIdx, srcIRVertex, runtimeEdge, blockManagerWorker);
+    }
+  }
+
+  private boolean isPipe(final RuntimeEdge runtimeEdge) { // True iff the DataStoreProperty is present and is Pipe.
+    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
+    return dataStoreProperty.isPresent() && dataStoreProperty.get().equals(DataStoreProperty.Value.Pipe);
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index ebdbd1b..032510a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -19,14 +19,12 @@
 package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.KeyExtractor;
-import org.apache.nemo.common.exception.*;
-import org.apache.nemo.common.ir.edge.executionproperty.*;
-import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.exception.UnsupportedPartitionerException;
+import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
-import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
-import org.apache.nemo.runtime.executor.data.block.Block;
+import org.apache.nemo.runtime.common.plan.StageEdge;
 import org.apache.nemo.runtime.executor.data.partitioner.*;
 
 import java.util.*;
@@ -34,140 +32,61 @@ import java.util.*;
 /**
  * Represents the output data transfer from a task.
  */
-public final class OutputWriter extends DataTransfer implements AutoCloseable {
-  private final RuntimeEdge<?> runtimeEdge;
-  private final IRVertex dstIrVertex;
-  private final DataStoreProperty.Value blockStoreValue;
-  private final BlockManagerWorker blockManagerWorker;
-  private final boolean nonDummyBlock;
-  private final Block blockToWrite;
-  private long writtenBytes;
-  private Partitioner partitioner;
-
+public interface OutputWriter {
   /**
-   * Constructor.
+   * Writes output element depending on the communication pattern of the edge.
    *
-   * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
-   * @param srcTaskId           the id of the source task.
-   * @param dstIrVertex         the destination IR vertex.
-   * @param runtimeEdge         the {@link RuntimeEdge}.
-   * @param blockManagerWorker  the {@link BlockManagerWorker}.
+   * @param element the element to write.
    */
-  OutputWriter(final int hashRangeMultiplier,
-               final String srcTaskId,
-               final IRVertex dstIrVertex,
-               final RuntimeEdge<?> runtimeEdge,
-               final BlockManagerWorker blockManagerWorker) {
-    super(runtimeEdge.getId());
-    this.runtimeEdge = runtimeEdge;
-    this.dstIrVertex = dstIrVertex;
-    this.blockManagerWorker = blockManagerWorker;
-    this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).
-        orElseThrow(() -> new RuntimeException("No data store property on the edge"));
+  void write(final Object element);
+
+  /**
+   * @return the total written bytes, or {@link Optional#empty()} if unknown.
+   */
+  Optional<Long> getWrittenBytes();
+
+  /**
+   * Notifies that all writes for this output are done.
+   */
+  void close();
 
-    // Setup partitioner
-    final int dstParallelism = dstIrVertex.getPropertyValue(ParallelismProperty.class).
-        orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
-    final Optional<KeyExtractor> keyExtractor = runtimeEdge.getPropertyValue(KeyExtractorProperty.class);
+  /**
+   * Creates the {@link Partitioner} for the given edge according to its {@link PartitionerProperty}.
+   *
+   * @param runtimeEdge         the edge whose output is partitioned; must be a {@link StageEdge}.
+   * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
+   * @return the created partitioner.
+   */
+  static Partitioner getPartitioner(final RuntimeEdge runtimeEdge,
+                                    final int hashRangeMultiplier) {
+    final StageEdge stageEdge = (StageEdge) runtimeEdge;
     final PartitionerProperty.Value partitionerPropertyValue =
-        runtimeEdge.getPropertyValue(PartitionerProperty.class).
-            orElseThrow(() -> new RuntimeException("No partitioner property on the edge"));
+      (PartitionerProperty.Value) runtimeEdge.getPropertyValueOrRuntimeException(PartitionerProperty.class);
+    final int dstParallelism = stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class)
+      .orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
+
+    final Partitioner partitioner;
     switch (partitionerPropertyValue) {
       case IntactPartitioner:
-        this.partitioner = new IntactPartitioner();
+        partitioner = new IntactPartitioner();
         break;
       case HashPartitioner:
-        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor.
-            orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
+        final KeyExtractor hashKeyExtractor =
+          (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
+        partitioner = new HashPartitioner(dstParallelism, hashKeyExtractor);
         break;
       case DataSkewHashPartitioner:
-        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor.
-            orElseThrow(() -> new RuntimeException("No key extractor property on the edge")));
+        final KeyExtractor dataSkewKeyExtractor =
+          (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
+        partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, dataSkewKeyExtractor);
         break;
       case DedicatedKeyPerElementPartitioner:
-        this.partitioner = new DedicatedKeyPerElementPartitioner();
+        partitioner = new DedicatedKeyPerElementPartitioner();
         break;
       default:
         throw new UnsupportedPartitionerException(
-            new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
+          new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
     }
-    blockToWrite = blockManagerWorker.createBlock(
-        RuntimeIdManager.generateBlockId(getId(), srcTaskId), blockStoreValue);
-    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
-        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
-    nonDummyBlock = !duplicateDataProperty.isPresent()
-        || duplicateDataProperty.get().getRepresentativeEdgeId().equals(runtimeEdge.getId())
-        || duplicateDataProperty.get().getGroupSize() <= 1;
-  }
-
-  /**
-   * Writes output element depending on the communication pattern of the edge.
-   *
-   * @param element the element to write.
-   */
-  public void write(final Object element) {
-    if (nonDummyBlock) {
-      blockToWrite.write(partitioner.partition(element), element);
-
-      final DedicatedKeyPerElement dedicatedKeyPerElement =
-          partitioner.getClass().getAnnotation(DedicatedKeyPerElement.class);
-      if (dedicatedKeyPerElement != null) {
-        blockToWrite.commitPartitions();
-      }
-    } // If else, does not need to write because the data is duplicated.
-  }
-
-  /**
-   * Notifies that all writes for a block is end.
-   * Further write about a committed block will throw an exception.
-   */
-  public void close() {
-    // Commit block.
-    final DataPersistenceProperty.Value persistence =
-        runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
-            orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
-
-    final Optional<Map<Integer, Long>> partitionSizeMap = blockToWrite.commit();
-    // Return the total size of the committed block.
-    if (partitionSizeMap.isPresent()) {
-      long blockSizeTotal = 0;
-      for (final long partitionSize : partitionSizeMap.get().values()) {
-        blockSizeTotal += partitionSize;
-      }
-      this.writtenBytes = blockSizeTotal;
-    } else {
-      this.writtenBytes = -1; // no written bytes info.
-    }
-    blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, getExpectedRead(), persistence);
-  }
-
-  /**
-   * @return the total written bytes.
-   */
-  public Optional<Long> getWrittenBytes() {
-    if (writtenBytes == -1) {
-      return Optional.empty();
-    } else {
-      return Optional.of(writtenBytes);
-    }
-  }
-
-  /**
-   * Get the expected number of data read according to the communication pattern of the edge and
-   * the parallelism of destination vertex.
-   *
-   * @return the expected number of data read.
-   */
-  private int getExpectedRead() {
-    final Optional<DuplicateEdgeGroupPropertyValue> duplicateDataProperty =
-        runtimeEdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
-    final int duplicatedDataMultiplier =
-        duplicateDataProperty.isPresent() ? duplicateDataProperty.get().getGroupSize() : 1;
-    final int readForABlock = CommunicationPatternProperty.Value.OneToOne.equals(
-        runtimeEdge.getPropertyValue(CommunicationPatternProperty.class).orElseThrow(
-            () -> new RuntimeException("No communication pattern on this edge.")))
-        ? 1 : dstIrVertex.getPropertyValue(ParallelismProperty.class).orElseThrow(
-            () -> new RuntimeException("No parallelism property on the destination vertex."));
-    return readForABlock * duplicatedDataMultiplier;
+    return partitioner;
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java
new file mode 100644
index 0000000..eb3999b
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.exception.UnsupportedCommPatternException;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Represents the input data transfer to a task, where the data is read through pipes.
+ */
+public final class PipeInputReader implements InputReader {
+  private final PipeManagerWorker pipeManagerWorker;
+
+  private final int dstTaskIndex;
+
+  /**
+   * Attributes that specify how we should read the input.
+   */
+  private final IRVertex srcVertex;
+  private final RuntimeEdge runtimeEdge;
+
+  /**
+   * Constructor.
+   *
+   * @param dstTaskIdx        the index of the destination task that reads the input.
+   * @param srcIRVertex       the {@link IRVertex} of the source tasks.
+   * @param runtimeEdge       the {@link RuntimeEdge} to read the input data from.
+   * @param pipeManagerWorker the {@link PipeManagerWorker} used to read from the pipes.
+   */
+  PipeInputReader(final int dstTaskIdx,
+                  final IRVertex srcIRVertex,
+                  final RuntimeEdge runtimeEdge,
+                  final PipeManagerWorker pipeManagerWorker) {
+    this.dstTaskIndex = dstTaskIdx;
+    this.srcVertex = srcIRVertex;
+    this.runtimeEdge = runtimeEdge;
+    this.pipeManagerWorker = pipeManagerWorker;
+  }
+
+  @Override
+  public List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read() {
+    // Throw if the edge lacks the property, instead of calling get() on a possibly-empty Optional.
+    final CommunicationPatternProperty.Value comValue = (CommunicationPatternProperty.Value)
+      runtimeEdge.getPropertyValueOrRuntimeException(CommunicationPatternProperty.class);
+
+    if (comValue.equals(CommunicationPatternProperty.Value.OneToOne)) {
+      // One-to-one: the only source task has the same index as this destination task.
+      return Collections.singletonList(pipeManagerWorker.read(dstTaskIndex, runtimeEdge, dstTaskIndex));
+    } else if (comValue.equals(CommunicationPatternProperty.Value.BroadCast)
+      || comValue.equals(CommunicationPatternProperty.Value.Shuffle)) {
+      // Broadcast/shuffle: read this task's pipe from every source task.
+      final int numSrcTasks = InputReader.getSourceParallelism(this);
+      final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
+      for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
+        futures.add(pipeManagerWorker.read(srcTaskIdx, runtimeEdge, dstTaskIndex));
+      }
+      return futures;
+    } else {
+      throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported"));
+    }
+  }
+
+  @Override
+  public IRVertex getSrcIrVertex() {
+    return srcVertex;
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
new file mode 100644
index 0000000..a5dbf93
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.datatransfer;
+
+import org.apache.nemo.common.DirectByteArrayOutputStream;
+import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.plan.RuntimeEdge;
+import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
+import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
+import org.apache.nemo.runtime.executor.data.partitioner.Partitioner;
+import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Represents the output data transfer from a task.
+ */
+public final class PipeOutputWriter implements OutputWriter {
+  private static final Logger LOG = LoggerFactory.getLogger(PipeOutputWriter.class.getName());
+
+  private final String srcTaskId;
+  private final PipeManagerWorker pipeManagerWorker;
+
+  private final Partitioner partitioner;
+  private final RuntimeEdge runtimeEdge;
+
+  private boolean initialized;
+  private Serializer serializer;
+  private List<ByteOutputContext> pipes;
+
+  /**
+   * Constructor.
+   *
+   * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
+   * @param srcTaskId           the id of the source task.
+   * @param runtimeEdge         the {@link RuntimeEdge}.
+   * @param pipeManagerWorker   the pipe manager.
+   */
+  PipeOutputWriter(final int hashRangeMultiplier,
+                   final String srcTaskId,
+                   final RuntimeEdge runtimeEdge,
+                   final PipeManagerWorker pipeManagerWorker) {
+    this.initialized = false;
+    this.srcTaskId = srcTaskId;
+    this.pipeManagerWorker = pipeManagerWorker;
+    this.pipeManagerWorker.notifyMaster(runtimeEdge.getId(), RuntimeIdManager.getIndexFromTaskId(srcTaskId));
+    this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
+    this.runtimeEdge = runtimeEdge;
+  }
+
+  /**
+   * Writes output element.
+   * @param element the element to write.
+   */
+  @Override
+  public void write(final Object element) {
+    if (!initialized) {
+      doInitialize();
+    }
+
+    try (final ByteOutputContext.ByteOutputStream pipeToWriteTo = getPipeToWrite(element)) {
+      // Serialize (Do not compress)
+      final DirectByteArrayOutputStream bytesOutputStream = new DirectByteArrayOutputStream();
+      final OutputStream wrapped = DataUtil.buildOutputStream(bytesOutputStream, serializer.getEncodeStreamChainers());
+      final EncoderFactory.Encoder encoder = serializer.getEncoderFactory().create(wrapped);
+      encoder.encode(element);
+      wrapped.close();
+
+      // Write only the valid bytes: the internal buffer may be larger than the encoded data.
+      pipeToWriteTo.write(bytesOutputStream.getBufDirectly(), 0, bytesOutputStream.getCount());
+    } catch (IOException e) {
+      throw new RuntimeException(e); // For now we crash the executor on IOException
+    }
+  }
+
+  @Override
+  public Optional<Long> getWrittenBytes() {
+    return Optional.empty();
+  }
+
+  @Override
+  public void close() {
+    if (!initialized) {
+      // In order to "wire-up" with the receivers waiting for us.
+      doInitialize();
+    }
+
+    pipes.forEach(pipe -> {
+      try {
+        pipe.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
+  /**
+   * Lazily fetches the output pipes and the serializer of the edge, on the first write or close.
+   */
+  private void doInitialize() {
+    // Blocking call
+    this.pipes = pipeManagerWorker.getOutputContexts(runtimeEdge, RuntimeIdManager.getIndexFromTaskId(srcTaskId));
+    this.serializer = pipeManagerWorker.getSerializer(runtimeEdge.getId());
+
+    initialized = true;
+  }
+
+  /**
+   * @param element the element to write.
+   * @return a new output stream on the first pipe for one-to-one edges,
+   *         or on the pipe indexed by the element's partition otherwise.
+   * @throws IOException propagated from creating the output stream.
+   */
+  private ByteOutputContext.ByteOutputStream getPipeToWrite(final Object element) throws IOException {
+    return runtimeEdge.getPropertyValue(CommunicationPatternProperty.class)
+      .get()
+      .equals(CommunicationPatternProperty.Value.OneToOne)
+      ? pipes.get(0).newOutputStream()
+      : pipes.get((int) partitioner.partition(element)).newOutputStream();
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 6098b30..e546ee7 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -69,6 +69,7 @@ class ParentTaskDataFetcher extends DataFetcher {
       }
 
       while (true) {
+
         // This iterator has the element
         if (this.currentIterator.hasNext()) {
           return this.currentIterator.next();
@@ -84,6 +85,7 @@ class ParentTaskDataFetcher extends DataFetcher {
           // We've consumed all the iterators
           break;
         }
+
       }
     } catch (final Throwable e) {
       // Any failure is caught and thrown as an IOException, so that the task is retried.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 449542a..98600cd 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -90,14 +90,14 @@ public final class TaskExecutor {
    * @param task                   Task with information needed during execution.
    * @param irVertexDag            A DAG of vertices.
    * @param taskStateManager       State manager for this Task.
-   * @param dataTransferFactory    For reading from/writing to data to other tasks.
+   * @param intermediateDataIOFactory For reading data from and writing data to other tasks.
    * @param broadcastManagerWorker For broadcasts.
    * @param metricMessageSender    For sending metric with execution stats to Master.
    */
   public TaskExecutor(final Task task,
                       final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
                       final TaskStateManager taskStateManager,
-                      final DataTransferFactory dataTransferFactory,
+                      final IntermediateDataIOFactory intermediateDataIOFactory,
                       final BroadcastManagerWorker broadcastManagerWorker,
                       final MetricMessageSender metricMessageSender,
                       final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
@@ -117,7 +117,7 @@ public final class TaskExecutor {
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
 
     // Prepare data structures
-    final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, dataTransferFactory);
+    final Pair<List<DataFetcher>, List<VertexHarness>> pair = prepare(task, irVertexDag, intermediateDataIOFactory);
     this.nonBroadcastDataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
   }
@@ -141,11 +141,13 @@ public final class TaskExecutor {
    *
    * @param task        task.
    * @param irVertexDag dag.
+   * @param intermediateDataIOFactory factory for the readers and writers of inter-task data.
    * @return fetchers and harnesses.
    */
-  private Pair<List<DataFetcher>, List<VertexHarness>> prepare(final Task task,
-                                                               final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
-                                                               final DataTransferFactory dataTransferFactory) {
+  private Pair<List<DataFetcher>, List<VertexHarness>> prepare(
+    final Task task,
+    final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
+    final IntermediateDataIOFactory intermediateDataIOFactory) {
     final int taskIndex = RuntimeIdManager.getIndexFromTaskId(task.getTaskId());
 
     // Traverse in a reverse-topological order to ensure that each visited vertex's children vertices exist.
@@ -164,12 +166,12 @@ public final class TaskExecutor {
       final Map<String, List<OperatorVertex>> internalAdditionalOutputMap =
         getInternalAdditionalOutputMap(irVertex, irVertexDag);
       final Map<String, List<OutputWriter>> externalAdditionalOutputMap =
-        getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), dataTransferFactory);
+        getExternalAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
 
       // Main outputs
       final List<OperatorVertex> internalMainOutputs = getInternalMainOutputs(irVertex, irVertexDag);
       final List<OutputWriter> externalMainOutputs =
-        getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), dataTransferFactory);
+        getExternalMainOutputs(irVertex, task.getTaskOutgoingEdges(), intermediateDataIOFactory);
 
       final OutputCollector outputCollector;
 
@@ -209,7 +211,7 @@ public final class TaskExecutor {
         .filter(stageEdge -> stageEdge.getPropertyValue(BroadcastVariableIdProperty.class).isPresent())
         .collect(Collectors.toList());
       final List<InputReader> broadcastReaders =
-        getParentTaskReaders(taskIndex, broadcastInEdges, dataTransferFactory);
+        getParentTaskReaders(taskIndex, broadcastInEdges, intermediateDataIOFactory);
       if (broadcastInEdges.size() != broadcastReaders.size()) {
         throw new IllegalStateException(broadcastInEdges.toString() + ", " + broadcastReaders.toString());
       }
@@ -224,7 +226,7 @@ public final class TaskExecutor {
       final List<StageEdge> nonBroadcastInEdges = new ArrayList<>(inEdgesForThisVertex);
       nonBroadcastInEdges.removeAll(broadcastInEdges);
       final List<InputReader> nonBroadcastReaders =
-        getParentTaskReaders(taskIndex, nonBroadcastInEdges, dataTransferFactory);
+        getParentTaskReaders(taskIndex, nonBroadcastInEdges, intermediateDataIOFactory);
       nonBroadcastReaders.forEach(parentTaskReader -> nonBroadcastDataFetcherList.add(
         new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
           new DataFetcherOutputCollector((OperatorVertex) irVertex))));
@@ -456,7 +458,7 @@ public final class TaskExecutor {
   private Map<String, List<OutputWriter>> getExternalAdditionalOutputMap(
     final IRVertex irVertex,
     final List<StageEdge> outEdgesToChildrenTasks,
-    final DataTransferFactory dataTransferFactory) {
+    final IntermediateDataIOFactory intermediateDataIOFactory) {
     // Add all inter-task additional tags to additional output map.
     final Map<String, List<OutputWriter>> map = new HashMap<>();
 
@@ -466,7 +468,7 @@ public final class TaskExecutor {
       .filter(edge -> edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
       .map(edge ->
         Pair.of(edge.getPropertyValue(AdditionalOutputTagProperty.class).get(),
-          dataTransferFactory.createWriter(taskId, edge.getDstIRVertex(), edge)))
+          intermediateDataIOFactory.createWriter(taskId, edge)))
       .forEach(pair -> {
         map.putIfAbsent(pair.left(), new ArrayList<>());
         map.get(pair.left()).add(pair.right());
@@ -495,7 +497,7 @@ public final class TaskExecutor {
   }
 
   private List<OperatorVertex> getInternalMainOutputs(final IRVertex irVertex,
-                                                     final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
+                                                      final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag) {
     return irVertexDag.getOutgoingEdgesOf(irVertex.getId())
       .stream()
       .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
@@ -508,18 +510,18 @@ public final class TaskExecutor {
    *
    * @param irVertex                source irVertex
    * @param outEdgesToChildrenTasks outgoing edges to child tasks
-   * @param dataTransferFactory     dataTransferFactory
+   * @param intermediateDataIOFactory factory used to create the {@link OutputWriter}s.
    * @return OutputWriters for main children tasks
    */
   private List<OutputWriter> getExternalMainOutputs(final IRVertex irVertex,
-                                                   final List<StageEdge> outEdgesToChildrenTasks,
-                                                   final DataTransferFactory dataTransferFactory) {
+                                                    final List<StageEdge> outEdgesToChildrenTasks,
+                                                    final IntermediateDataIOFactory intermediateDataIOFactory) {
     return outEdgesToChildrenTasks
       .stream()
       .filter(edge -> edge.getSrcIRVertex().getId().equals(irVertex.getId()))
       .filter(edge -> !edge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())
-      .map(outEdgeForThisVertex -> dataTransferFactory
-        .createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex))
+      .map(outEdgeForThisVertex -> intermediateDataIOFactory
+        .createWriter(taskId, outEdgeForThisVertex))
       .collect(Collectors.toList());
   }
 
@@ -539,10 +541,10 @@ public final class TaskExecutor {
 
   private List<InputReader> getParentTaskReaders(final int taskIndex,
                                                  final List<StageEdge> inEdgesFromParentTasks,
-                                                 final DataTransferFactory dataTransferFactory) {
+                                                 final IntermediateDataIOFactory intermediateDataIOFactory) {
     return inEdgesFromParentTasks
       .stream()
-      .map(inEdgeForThisVertex -> dataTransferFactory
+      .map(inEdgeForThisVertex -> intermediateDataIOFactory
         .createReader(taskIndex, inEdgeForThisVertex.getSrcIRVertex(), inEdgeForThisVertex))
       .collect(Collectors.toList());
   }
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 933a414..7befbe0 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -45,11 +45,13 @@ import org.apache.nemo.runtime.common.plan.StageEdge;
 import org.apache.nemo.runtime.executor.Executor;
 import org.apache.nemo.runtime.executor.TestUtil;
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
+import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.data.SerializerManager;
 import org.apache.nemo.runtime.master.*;
 import org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import org.apache.beam.sdk.values.KV;
 import org.apache.commons.io.FileUtils;
+import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
+import org.apache.nemo.runtime.master.scheduler.Scheduler;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.io.network.naming.NameResolverConfiguration;
 import org.apache.reef.io.network.naming.NameServer;
@@ -69,13 +71,15 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.*;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static org.apache.nemo.common.dag.DAG.EMPTY_DAG_DIRECTORY;
 import static org.apache.nemo.runtime.common.RuntimeTestUtil.getRangedNumList;
@@ -83,6 +87,7 @@ import static org.apache.nemo.runtime.common.RuntimeTestUtil.flatten;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
 
 /**
  * Tests {@link InputReader} and {@link OutputWriter}.
@@ -116,7 +121,7 @@ public final class DataTransferTest {
 
   private BlockManagerMaster master;
   private BlockManagerWorker worker1;
-  private DataTransferFactory transferFactory;
+  private IntermediateDataIOFactory transferFactory;
   private BlockManagerWorker worker2;
   private HashMap<BlockManagerWorker, SerializerManager> serializerManagers = new HashMap<>();
 
@@ -140,6 +145,7 @@ public final class DataTransferTest {
     injector.bindVolatileParameter(JobConf.DAGDirectory.class, EMPTY_DAG_DIRECTORY);
 
     // Necessary for wiring up the message environments
+    injector.bindVolatileInstance(Scheduler.class, injector.getInstance(BatchScheduler.class));
     injector.getInstance(RuntimeMaster.class);
     final BlockManagerMaster master = injector.getInstance(BlockManagerMaster.class);
 
@@ -147,7 +153,7 @@ public final class DataTransferTest {
     nameClientInjector.bindVolatileParameter(JobConf.JobId.class, "data transfer test");
 
     this.master = master;
-    final Pair<BlockManagerWorker, DataTransferFactory> pair1 = createWorker(
+    final Pair<BlockManagerWorker, IntermediateDataIOFactory> pair1 = createWorker(
         EXECUTOR_ID_PREFIX + executorCount.getAndIncrement(), dispatcherInjector, nameClientInjector);
     this.worker1 = pair1.left();
     this.transferFactory = pair1.right();
@@ -161,7 +167,7 @@ public final class DataTransferTest {
     FileUtils.deleteDirectory(new File(TMP_REMOTE_FILE_DIRECTORY));
   }
 
-  private Pair<BlockManagerWorker, DataTransferFactory> createWorker(
+  private Pair<BlockManagerWorker, IntermediateDataIOFactory> createWorker(
       final String executorId,
       final Injector dispatcherInjector,
       final Injector nameClientInjector) throws InjectionException {
@@ -180,12 +186,12 @@ public final class DataTransferTest {
     injector.bindVolatileParameter(JobConf.GlusterVolumeDirectory.class, TMP_REMOTE_FILE_DIRECTORY);
     final BlockManagerWorker blockManagerWorker;
     final SerializerManager serializerManager;
-    final DataTransferFactory dataTransferFactory;
+    final IntermediateDataIOFactory intermediateDataIOFactory;
     try {
       blockManagerWorker = injector.getInstance(BlockManagerWorker.class);
       serializerManager = injector.getInstance(SerializerManager.class);
       serializerManagers.put(blockManagerWorker, serializerManager);
-      dataTransferFactory = injector.getInstance(DataTransferFactory.class);
+      intermediateDataIOFactory = injector.getInstance(IntermediateDataIOFactory.class);
     } catch (final InjectionException e) {
       throw new RuntimeException(e);
     }
@@ -193,7 +199,7 @@ public final class DataTransferTest {
     // Unused, but necessary for wiring up the message environments
     injector.getInstance(Executor.class);
 
-    return Pair.of(blockManagerWorker, dataTransferFactory);
+    return Pair.of(blockManagerWorker, intermediateDataIOFactory);
   }
 
   private Injector createNameClientInjector() {
@@ -320,11 +326,9 @@ public final class DataTransferTest {
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
     final RuntimeEdge dummyEdge;
 
-    final IRVertex srcMockVertex = mock(IRVertex.class);
-    final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage" + testIndex);
     final Stage dstStage = setupStages("dstStage" + testIndex);
-    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex, srcStage, dstStage);
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage);
 
     // Initialize states in Master
     TestUtil.generateTaskIds(srcStage).forEach(srcTaskId -> {
@@ -336,7 +340,7 @@ public final class DataTransferTest {
     final List<List> dataWrittenList = new ArrayList<>();
     TestUtil.generateTaskIds(srcStage).forEach(srcTaskId -> {
       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
-      final OutputWriter writer = transferFactory.createWriter(srcTaskId, dstVertex, dummyEdge);
+      final OutputWriter writer = transferFactory.createWriter(srcTaskId, dummyEdge);
       dataWritten.iterator().forEachRemaining(writer::write);
       writer.close();
       dataWrittenList.add(dataWritten);
@@ -346,13 +350,13 @@ public final class DataTransferTest {
     final List<List> dataReadList = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(dstTaskIndex -> {
       final InputReader reader =
-          new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
+          new BlockInputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
 
-      assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
+      assertEquals(PARALLELISM_TEN, InputReader.getSourceParallelism(reader));
 
       final List dataRead = new ArrayList<>();
       try {
-        InputReader.combineFutures(reader.read()).forEachRemaining(dataRead::add);
+        combineFutures(reader.read()).forEachRemaining(dataRead::add);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
@@ -411,13 +415,10 @@ public final class DataTransferTest {
     final RuntimeEdge dummyEdge, dummyEdge2;
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
 
-    final IRVertex srcMockVertex = mock(IRVertex.class);
-    final IRVertex dstMockVertex = mock(IRVertex.class);
     final Stage srcStage = setupStages("srcStage" + testIndex);
     final Stage dstStage = setupStages("dstStage" + testIndex);
-    dummyEdge = new StageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex, srcStage, dstStage);
-    final IRVertex dstMockVertex2 = mock(IRVertex.class);
-    dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2, srcStage, dstStage);
+    dummyEdge = new StageEdge(edgeId, edgeProperties, srcVertex, dstVertex, srcStage, dstStage);
+    dummyEdge2 = new StageEdge(edgeId2, edgeProperties, srcVertex, dstVertex, srcStage, dstStage);
     // Initialize states in Master
     TestUtil.generateTaskIds(srcStage).forEach(srcTaskId -> {
       final String blockId = RuntimeIdManager.generateBlockId(edgeId, srcTaskId);
@@ -429,12 +430,12 @@ public final class DataTransferTest {
     final List<List> dataWrittenList = new ArrayList<>();
     TestUtil.generateTaskIds(srcStage).forEach(srcTaskId -> {
       final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
-      final OutputWriter writer = transferFactory.createWriter(srcTaskId, dstVertex, dummyEdge);
+      final OutputWriter writer = transferFactory.createWriter(srcTaskId, dummyEdge);
       dataWritten.iterator().forEachRemaining(writer::write);
       writer.close();
       dataWrittenList.add(dataWritten);
 
-      final OutputWriter writer2 = transferFactory.createWriter(srcTaskId, dstVertex, dummyEdge2);
+      final OutputWriter writer2 = transferFactory.createWriter(srcTaskId, dummyEdge2);
       dataWritten.iterator().forEachRemaining(writer2::write);
       writer2.close();
     });
@@ -444,17 +445,17 @@ public final class DataTransferTest {
     final List<List> dataReadList2 = new ArrayList<>();
     IntStream.range(0, PARALLELISM_TEN).forEach(dstTaskIndex -> {
       final InputReader reader =
-          new InputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
+          new BlockInputReader(dstTaskIndex, srcVertex, dummyEdge, receiver);
       final InputReader reader2 =
-          new InputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
+          new BlockInputReader(dstTaskIndex, srcVertex, dummyEdge2, receiver);
 
-      assertEquals(PARALLELISM_TEN, reader.getSourceParallelism());
+      assertEquals(PARALLELISM_TEN, InputReader.getSourceParallelism(reader));
 
-      assertEquals(PARALLELISM_TEN, reader2.getSourceParallelism());
+      assertEquals(PARALLELISM_TEN, InputReader.getSourceParallelism(reader2));
 
       final List dataRead = new ArrayList<>();
       try {
-        InputReader.combineFutures(reader.read()).forEachRemaining(dataRead::add);
+        combineFutures(reader.read()).forEachRemaining(dataRead::add);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
@@ -462,7 +463,7 @@ public final class DataTransferTest {
 
       final List dataRead2 = new ArrayList<>();
       try {
-        InputReader.combineFutures(reader2.read()).forEachRemaining(dataRead2::add);
+        combineFutures(reader2.read()).forEachRemaining(dataRead2::add);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
@@ -534,5 +535,25 @@ public final class DataTransferTest {
     stageExecutionProperty.put(ScheduleGroupProperty.of(0));
     return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList());
   }
+
+  /**
+   * Combine the given list of futures.
+   *
+   * @param futures to combine.
+   * @return an iterator over the elements of all futures, in list order.
+   * @throws ExecutionException   when fail to get results from futures.
+   * @throws InterruptedException when interrupted during getting results from futures.
+   */
+  private Iterator combineFutures(final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures)
+    throws ExecutionException, InterruptedException {
+    // Await all futures (in list order) up front; flatMap avoids deeply nested Stream.concat.
+    final List<Iterable<Object>> dataFromTasks = new ArrayList<>(futures.size());
+    for (final CompletableFuture<DataUtil.IteratorWithNumBytes> future : futures) {
+      final Iterator dataFromATask = future.get();
+      dataFromTasks.add(() -> dataFromATask);
+    }
+    return dataFromTasks.stream().flatMap(iterable -> StreamSupport.stream(iterable.spliterator(), false))
+      .collect(Collectors.toList()).iterator();
+  }
 }
 
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index 705f833..6cbf694 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -22,9 +22,11 @@ import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.datatransfer.BlockInputReader;
 import org.apache.nemo.runtime.executor.datatransfer.InputReader;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -42,7 +44,7 @@ import static org.mockito.Mockito.when;
  * Tests {@link ParentTaskDataFetcher}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({InputReader.class, VertexHarness.class})
+@PrepareForTest({InputReader.class, VertexHarness.class, BlockInputReader.class})
 public final class ParentTaskDataFetcherTest {
 
   @Test(timeout=5000)
@@ -125,7 +127,7 @@ public final class ParentTaskDataFetcherTest {
   }
 
   private InputReader generateInputReader(final CompletableFuture completableFuture) {
-    final InputReader inputReader = mock(InputReader.class);
+    final InputReader inputReader = mock(InputReader.class, Mockito.CALLS_REAL_METHODS);
     when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
     return inputReader;
   }
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index e0c4f95..b7da5fa 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -33,6 +33,7 @@ import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import org.apache.nemo.common.ir.vertex.InMemorySourceVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -47,9 +48,7 @@ import org.apache.nemo.runtime.executor.MetricMessageSender;
 import org.apache.nemo.runtime.executor.TaskStateManager;
 import org.apache.nemo.runtime.executor.data.BroadcastManagerWorker;
 import org.apache.nemo.runtime.executor.data.DataUtil;
-import org.apache.nemo.runtime.executor.datatransfer.DataTransferFactory;
-import org.apache.nemo.runtime.executor.datatransfer.InputReader;
-import org.apache.nemo.runtime.executor.datatransfer.OutputWriter;
+import org.apache.nemo.runtime.executor.datatransfer.*;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -78,8 +77,8 @@ import static org.mockito.Mockito.*;
  * Tests {@link TaskExecutor}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class, BroadcastManagerWorker.class,
-    TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class, Stage.class, IREdge.class})
+@PrepareForTest({InputReader.class, OutputWriter.class, IntermediateDataIOFactory.class, BroadcastManagerWorker.class,
+  TaskStateManager.class, StageEdge.class, PersistentConnectionToMasterMap.class, Stage.class, IREdge.class})
 public final class TaskExecutorTest {
   private static final AtomicInteger RUNTIME_EDGE_ID = new AtomicInteger(0);
   private static final int DATA_SIZE = 100;
@@ -90,7 +89,7 @@ public final class TaskExecutorTest {
 
   private List<Integer> elements;
   private Map<String, List> runtimeEdgeToOutputData;
-  private DataTransferFactory dataTransferFactory;
+  private IntermediateDataIOFactory intermediateDataIOFactory;
   private BroadcastManagerWorker broadcastManagerWorker;
   private TaskStateManager taskStateManager;
   private MetricMessageSender metricMessageSender;
@@ -110,11 +109,11 @@ public final class TaskExecutorTest {
     // Mock a TaskStateManager. It accumulates the state change into a list.
     taskStateManager = mock(TaskStateManager.class);
 
-    // Mock a DataTransferFactory.
+    // Mock a IntermediateDataIOFactory.
     runtimeEdgeToOutputData = new HashMap<>();
-    dataTransferFactory = mock(DataTransferFactory.class);
-    when(dataTransferFactory.createReader(anyInt(), any(), any())).then(new ParentTaskReaderAnswer());
-    when(dataTransferFactory.createWriter(any(), any(), any())).then(new ChildTaskWriterAnswer());
+    intermediateDataIOFactory = mock(IntermediateDataIOFactory.class);
+    when(intermediateDataIOFactory.createReader(anyInt(), any(), any())).then(new ParentTaskReaderAnswer());
+    when(intermediateDataIOFactory.createWriter(any(), any())).then(new ChildTaskWriterAnswer());
 
     // Mock a MetricMessageSender.
     metricMessageSender = mock(MetricMessageSender.class);
@@ -532,9 +531,9 @@ public final class TaskExecutorTest {
       }
       final InputReader inputReader = mock(InputReader.class);
       final IRVertex srcVertex = (IRVertex) invocationOnMock.getArgument(1);
+      srcVertex.setProperty(ParallelismProperty.of(SOURCE_PARALLELISM));
       when(inputReader.getSrcIrVertex()).thenReturn(srcVertex);
       when(inputReader.read()).thenReturn(inputFutures);
-      when(inputReader.getSourceParallelism()).thenReturn(SOURCE_PARALLELISM);
       return inputReader;
     }
   }
@@ -547,7 +546,7 @@ public final class TaskExecutorTest {
     @Override
     public OutputWriter answer(final InvocationOnMock invocationOnMock) throws Throwable {
       final Object[] args = invocationOnMock.getArguments();
-      final RuntimeEdge runtimeEdge = (RuntimeEdge) args[2];
+      final RuntimeEdge runtimeEdge = (RuntimeEdge) args[1];
       final OutputWriter outputWriter = mock(OutputWriter.class);
       doAnswer(new Answer() {
         @Override
@@ -689,7 +688,7 @@ public final class TaskExecutorTest {
   }
 
   private TaskExecutor getTaskExecutor(final Task task, final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag) {
-    return new TaskExecutor(task, taskDag, taskStateManager, dataTransferFactory, broadcastManagerWorker,
+    return new TaskExecutor(task, taskDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
       metricMessageSender, persistentConnectionToMasterMap);
   }
 }
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/PipeManagerMaster.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PipeManagerMaster.java
new file mode 100644
index 0000000..f2c5d18
--- /dev/null
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/PipeManagerMaster.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.master;
+
+import org.apache.nemo.common.Pair;
+import org.apache.nemo.common.exception.IllegalMessageException;
+import org.apache.nemo.runtime.common.RuntimeIdManager;
+import org.apache.nemo.runtime.common.comm.ControlMessage;
+import org.apache.nemo.runtime.common.message.MessageContext;
+import org.apache.nemo.runtime.common.message.MessageEnvironment;
+import org.apache.nemo.runtime.common.message.MessageListener;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Master-side pipe manager.
+ * Tracks which executor each pipe was initialized on, keyed by (runtime edge id, source task index),
+ * and answers pipe location requests, waiting (off the networking thread) until the location is known.
+ */
+@ThreadSafe
+@DriverSide
+public final class PipeManagerMaster {
+  private static final Logger LOG = LoggerFactory.getLogger(PipeManagerMaster.class.getName());
+  // (runtime edge id, source task index) -> id of the executor that sent the PipeInit message.
+  private final Map<Pair<String, Long>, String> runtimeEdgeSrcIndexToExecutor;
+  // Per-key lock and condition registered by onTaskScheduled; the condition is signalled on PipeInit.
+  private final Map<Pair<String, Long>, Lock> runtimeEdgeSrcIndexToLock;
+  private final Map<Pair<String, Long>, Condition> runtimeEdgeSrcIndexToCondition;
+  // Runs location requests, which may block until the pipe is initialized.
+  private final ExecutorService waitForPipe;
+
+  /**
+   * Constructor.
+   * @param masterMessageEnvironment the message environment.
+   */
+  @Inject
+  private PipeManagerMaster(final MessageEnvironment masterMessageEnvironment) {
+    this.runtimeEdgeSrcIndexToExecutor = new ConcurrentHashMap<>();
+    this.runtimeEdgeSrcIndexToLock = new ConcurrentHashMap<>();
+    this.runtimeEdgeSrcIndexToCondition = new ConcurrentHashMap<>();
+    this.waitForPipe = Executors.newCachedThreadPool();
+    // Register the listener last, so that no message is handled before the fields above are initialized.
+    masterMessageEnvironment.setupListener(MessageEnvironment.PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID,
+      new PipeManagerMasterControlMessageReceiver());
+  }
+
+  /**
+   * Registers the lock and condition of the pipe for the given runtime edge and source task index.
+   * Messages about a pipe can only be handled after its key has been registered here.
+   *
+   * @param edgeId   the id of the runtime edge.
+   * @param srcIndex the index of the source task.
+   * @throws IllegalStateException if the key has already been registered.
+   */
+  public void onTaskScheduled(final String edgeId, final long srcIndex) {
+    final Pair<String, Long> keyPair = Pair.of(edgeId, srcIndex);
+    final Lock lock = new ReentrantLock();
+    // Publish the condition before the lock: handlers look up the lock first,
+    // so once the lock is visible, its condition is visible as well.
+    if (null != runtimeEdgeSrcIndexToCondition.putIfAbsent(keyPair, lock.newCondition())) {
+      throw new IllegalStateException(keyPair.toString());
+    }
+    if (null != runtimeEdgeSrcIndexToLock.putIfAbsent(keyPair, lock)) {
+      throw new IllegalStateException(keyPair.toString());
+    }
+  }
+
+  /**
+   * Looks up the lock of the given pipe.
+   *
+   * @param keyPair (runtime edge id, source task index) of the pipe.
+   * @return the lock registered by {@link #onTaskScheduled(String, long)}.
+   * @throws IllegalStateException if no lock has been registered for the key.
+   */
+  private Lock getLock(final Pair<String, Long> keyPair) {
+    final Lock lock = runtimeEdgeSrcIndexToLock.get(keyPair);
+    if (lock == null) {
+      throw new IllegalStateException("No pipe registered for " + keyPair);
+    }
+    return lock;
+  }
+
+  /**
+   * Waits until the executor of the given pipe is known, then replies its id to the requester.
+   *
+   * @param keyPair        (runtime edge id, source task index) of the requested pipe.
+   * @param request        the RequestPipeLoc message being answered.
+   * @param messageContext the context to reply through.
+   * @throws InterruptedException if interrupted while waiting for the pipe to be initialized.
+   */
+  private void replyPipeLocation(final Pair<String, Long> keyPair,
+                                 final ControlMessage.Message request,
+                                 final MessageContext messageContext) throws InterruptedException {
+    final Lock lock = getLock(keyPair);
+    final String location;
+    lock.lock();
+    try {
+      // Loop rather than a single check: Condition#await may return spuriously.
+      while (!runtimeEdgeSrcIndexToExecutor.containsKey(keyPair)) {
+        runtimeEdgeSrcIndexToCondition.get(keyPair).await();
+      }
+      location = runtimeEdgeSrcIndexToExecutor.get(keyPair);
+    } finally {
+      lock.unlock();
+    }
+
+    // The lock is only needed to wait for the location, so reply without holding it.
+    messageContext.reply(
+      ControlMessage.Message.newBuilder()
+        .setId(RuntimeIdManager.generateMessageId())
+        .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+        .setType(ControlMessage.MessageType.PipeLocInfo)
+        .setPipeLocInfoMsg(ControlMessage.PipeLocationInfoMessage.newBuilder()
+          .setRequestId(request.getId())
+          .setExecutorId(location)
+          .build())
+        .build());
+  }
+
+  /**
+   * Handler for control messages received.
+   */
+  public final class PipeManagerMasterControlMessageReceiver implements MessageListener<ControlMessage.Message> {
+    @Override
+    public void onMessage(final ControlMessage.Message message) {
+      switch (message.getType()) {
+        case PipeInit:
+          final ControlMessage.PipeInitMessage pipeInitMessage = message.getPipeInitMsg();
+          final Pair<String, Long> keyPair =
+            Pair.of(pipeInitMessage.getRuntimeEdgeId(), pipeInitMessage.getSrcTaskIndex());
+
+          final Lock lock = getLock(keyPair);
+          lock.lock();
+          try {
+            // Allow to put at most once: putIfAbsent keeps the first location instead of overwriting it.
+            if (null != runtimeEdgeSrcIndexToExecutor.putIfAbsent(keyPair, pipeInitMessage.getExecutorId())) {
+              throw new RuntimeException(keyPair.toString());
+            }
+            // Wake up every request waiting for the location of this pipe.
+            runtimeEdgeSrcIndexToCondition.get(keyPair).signalAll();
+          } finally {
+            lock.unlock();
+          }
+          break;
+        default:
+          throw new IllegalMessageException(new Exception(message.toString()));
+      }
+    }
+
+    @Override
+    public void onMessageWithContext(final ControlMessage.Message message, final MessageContext messageContext) {
+      switch (message.getType()) {
+        case RequestPipeLoc:
+          final ControlMessage.RequestPipeLocationMessage pipeLocRequest = message.getRequestPipeLocMsg();
+          final Pair<String, Long> keyPair =
+            Pair.of(pipeLocRequest.getRuntimeEdgeId(), pipeLocRequest.getSrcTaskIndex());
+
+          // Use the executor service to avoid blocking the networking thread.
+          waitForPipe.execute(() -> {
+            try {
+              replyPipeLocation(keyPair, message, messageContext);
+            } catch (final InterruptedException e) {
+              Thread.currentThread().interrupt();
+              LOG.warn("Interrupted while waiting for the location of pipe {}", keyPair);
+            } catch (final RuntimeException e) {
+              // Log failures here: this asynchronous task has no caller to propagate them to.
+              LOG.error("Failed to reply the location of pipe {}", keyPair, e);
+            }
+          });
+          break;
+        default:
+          throw new IllegalMessageException(new Exception(message.toString()));
+      }
+    }
+  }
+}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/Scheduler.java
index 94289e8..b8d00b3 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/Scheduler.java
@@ -22,7 +22,6 @@ import org.apache.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.nemo.runtime.common.state.TaskState;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.tang.annotations.DefaultImplementation;
 
 import javax.annotation.Nullable;
 
@@ -33,7 +32,6 @@ import javax.annotation.Nullable;
  * Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
  */
 @DriverSide
-@DefaultImplementation(BatchScheduler.class)
 public interface Scheduler {
 
   /**
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
index 48f5e8c..71f1da2 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/StreamingScheduler.java
@@ -18,7 +18,6 @@
  */
 package org.apache.nemo.runtime.master.scheduler;
 
-import com.google.common.collect.Lists;
 import org.apache.nemo.common.exception.UnknownExecutionStateException;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
@@ -35,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
+import javax.inject.Inject;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -42,7 +42,7 @@ import java.util.stream.Collectors;
 /**
  * A simple scheduler for streaming workloads.
  * - Keeps track of new executors
- * - Schedules all tasks in a reverse topological order.
+ * - Schedules all tasks in the plan at once.
  * - Crashes the system upon any other events (should be fixed in the future)
  * - Never stops running.
  */
@@ -55,15 +55,19 @@ public final class StreamingScheduler implements Scheduler {
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorRegistry executorRegistry;
   private final PlanStateManager planStateManager;
+  private final PipeManagerMaster pipeManagerMaster;
 
+  @Inject
   StreamingScheduler(final TaskDispatcher taskDispatcher,
                      final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                      final ExecutorRegistry executorRegistry,
-                     final PlanStateManager planStateManager) {
+                     final PlanStateManager planStateManager,
+                     final PipeManagerMaster pipeManagerMaster) {
     this.taskDispatcher = taskDispatcher;
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.executorRegistry = executorRegistry;
     this.planStateManager = planStateManager;
+    this.pipeManagerMaster = pipeManagerMaster;
   }
 
   @Override
@@ -75,8 +79,8 @@ public final class StreamingScheduler implements Scheduler {
     planStateManager.storeJSON("submitted");
 
     // Prepare tasks
-    final List<Stage> reverseTopoStages = Lists.reverse(submittedPhysicalPlan.getStageDAG().getTopologicalSort());
-    final List<Task> reverseTopoTasks = reverseTopoStages.stream().flatMap(stageToSchedule -> {
+    final List<Stage> allStages = submittedPhysicalPlan.getStageDAG().getTopologicalSort();
+    final List<Task> allTasks = allStages.stream().flatMap(stageToSchedule -> {
       // Helper variables for this stage
       final List<StageEdge> stageIncomingEdges =
         submittedPhysicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
@@ -85,6 +89,11 @@ public final class StreamingScheduler implements Scheduler {
       final List<Map<String, Readable>> vertexIdToReadables = stageToSchedule.getVertexIdToReadables();
       final List<String> taskIdsToSchedule = planStateManager.getTaskAttemptsToSchedule(stageToSchedule.getId());
 
+      taskIdsToSchedule.forEach(taskId -> {
+        final int index = RuntimeIdManager.getIndexFromTaskId(taskId);
+        stageOutgoingEdges.forEach(outEdge -> pipeManagerMaster.onTaskScheduled(outEdge.getId(), index));
+      });
+
       // Create tasks of this stage
       return taskIdsToSchedule.stream().map(taskId -> new Task(
         submittedPhysicalPlan.getPlanId(),
@@ -97,7 +106,8 @@ public final class StreamingScheduler implements Scheduler {
     }).collect(Collectors.toList());
 
     // Schedule everything at once
-    pendingTaskCollectionPointer.setToOverwrite(reverseTopoTasks);
+    pendingTaskCollectionPointer.setToOverwrite(allTasks);
+    taskDispatcher.onNewPendingTaskCollectionAvailable();
   }
 
   @Override
@@ -113,11 +123,15 @@ public final class StreamingScheduler implements Scheduler {
                                             final TaskState.State newState,
                                             @Nullable final String vertexPutOnHold,
                                             final TaskState.RecoverableTaskFailureCause failureCause) {
+    planStateManager.onTaskStateChanged(taskId, newState);
+
     switch (newState) {
       case COMPLETE:
-      case SHOULD_RETRY:
+        // Do nothing.
+        break;
       case ON_HOLD:
       case FAILED:
+      case SHOULD_RETRY:
         // TODO #226: StreamingScheduler Fault Tolerance
         throw new UnsupportedOperationException();
       case READY:
@@ -137,6 +151,7 @@ public final class StreamingScheduler implements Scheduler {
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
     LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
     executorRegistry.registerExecutor(executorRepresenter);
+    taskDispatcher.onExecutorSlotAvailable(); // Signal after registering, so the woken dispatcher sees it.
   }
 
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
index fa0caf5..c50f1e9 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -50,7 +50,7 @@ import javax.inject.Inject;
 final class TaskDispatcher {
   private static final Logger LOG = LoggerFactory.getLogger(TaskDispatcher.class.getName());
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
-  private final ExecutorService schedulerThread;
+  private final ExecutorService dispatcherThread;
   private final PlanStateManager planStateManager;
   private boolean isSchedulerRunning;
   private boolean isTerminated;
@@ -67,7 +67,7 @@ final class TaskDispatcher {
                          final ExecutorRegistry executorRegistry,
                          final PlanStateManager planStateManager) {
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
-    this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
+    this.dispatcherThread = Executors.newSingleThreadExecutor(runnable ->
         new Thread(runnable, "TaskDispatcher thread"));
     this.planStateManager = planStateManager;
     this.isSchedulerRunning = false;
@@ -81,7 +81,7 @@ final class TaskDispatcher {
    * A separate thread is run to dispatch tasks to executors.
    * See comments in the {@link Scheduler} for avoiding race conditions.
    */
-  private final class SchedulerThread implements Runnable {
+  private final class TaskDispatcherThread implements Runnable {
     @Override
     public void run() {
       while (!isTerminated) {
@@ -167,8 +167,8 @@ final class TaskDispatcher {
    */
   void run() {
     if (!isTerminated && !isSchedulerRunning) {
-      schedulerThread.execute(new SchedulerThread());
-      schedulerThread.shutdown();
+      dispatcherThread.execute(new TaskDispatcherThread());
+      dispatcherThread.shutdown();
       isSchedulerRunning = true;
     }
   }
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
index 56e69eb..34a1cff 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/StreamingSchedulerTest.java
@@ -23,6 +23,7 @@ import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.nemo.runtime.common.plan.TestPlanGenerator;
 import org.apache.nemo.runtime.master.BlockManagerMaster;
+import org.apache.nemo.runtime.master.PipeManagerMaster;
 import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import org.junit.Before;
@@ -46,7 +47,8 @@ import static org.mockito.Mockito.when;
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({BlockManagerMaster.class, PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class,
-  TaskDispatcher.class, PendingTaskCollectionPointer.class, ExecutorRegistry.class, PlanStateManager.class})
+  TaskDispatcher.class, PendingTaskCollectionPointer.class, ExecutorRegistry.class, PlanStateManager.class,
+  PipeManagerMaster.class})
 public final class StreamingSchedulerTest {
   private static final int ATTEMPTS_PER_STAGE = 2;
 
@@ -60,13 +62,15 @@ public final class StreamingSchedulerTest {
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     final ExecutorRegistry executorRegistry = mock(ExecutorRegistry.class);
     final PlanStateManager planStateManager = mock(PlanStateManager.class);
+    final PipeManagerMaster pipeManagerMaster = mock(PipeManagerMaster.class);
 
     when(planStateManager.getTaskAttemptsToSchedule(any())).thenAnswer(invocationOnMock -> {
       final String stageId = invocationOnMock.getArgument(0);
       return generateAttempts(stageId);
     });
 
-    scheduler = new StreamingScheduler(taskDispatcher, pendingTaskCollectionPointer, executorRegistry, planStateManager);
+    scheduler = new StreamingScheduler(
+      taskDispatcher, pendingTaskCollectionPointer, executorRegistry, planStateManager, pipeManagerMaster);
   }
 
   private List<String> generateAttempts(final String stageId) {
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index 11022b1..7af0cbe 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -246,7 +246,7 @@ public final class TaskRetryTest {
     injector.bindVolatileInstance(UpdatePhysicalPlanEventHandler.class, mock(UpdatePhysicalPlanEventHandler.class));
     injector.bindVolatileInstance(SchedulingConstraintRegistry.class, mock(SchedulingConstraintRegistry.class));
     planStateManager = injector.getInstance(PlanStateManager.class);
-    scheduler = injector.getInstance(Scheduler.class);
+    scheduler = injector.getInstance(BatchScheduler.class);
     blockManagerMaster = injector.getInstance(BlockManagerMaster.class);
 
     scheduler.schedulePlan(plan, MAX_SCHEDULE_ATTEMPT);