You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by wo...@apache.org on 2019/09/24 08:27:38 UTC

[incubator-nemo] branch master updated: [NEMO-418] BlockFetchFailureProperty (#238)

This is an automated email from the ASF dual-hosted git repository.

wonook pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f11840  [NEMO-418] BlockFetchFailureProperty (#238)
7f11840 is described below

commit 7f11840dd6492efb71892db9760ca426993f34aa
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Sep 24 17:27:33 2019 +0900

    [NEMO-418] BlockFetchFailureProperty (#238)
    
    JIRA: NEMO-418: BlockFetchFailureProperty
    
    Major changes:
    
    Introduces BlockFetchFailureProperty that is useful for IREdges such as those from transient to reserved resources
    In BlockManagerWorker, provides an option to wait until fetching "all elements" of each block
    In ParentTaskDataFetcher, provides an option to retry fetching blocks
    Minor changes to note:
    
    Implements BlockInputReader#retry
    Uses a guava cache with a 2-second eviction policy for block location responses
    TransientResourcePolicy uses BlockFetchFailureProperty
    Tests for the changes:
    
    ParentTaskDataFetcherTest#testErrorWhenFutureWithRetry
---
 .../BlockFetchFailureProperty.java                 |  66 +++++++++++++
 .../annotating/TransientResourceDataFlowPass.java  |  58 ------------
 ...java => TransientResourceDataTransferPass.java} |  48 +++++-----
 .../composite/TransientResourceCompositePass.java  |   6 +-
 .../optimizer/policy/PolicyBuilderTest.java        |   2 +-
 .../compiler/optimizer/policy/PolicyImplTest.java  |  17 ----
 .../TransientResourceCompositePassTest.java        |   6 +-
 .../runtime/executor/data/BlockManagerWorker.java  | 104 ++++++++++++++-------
 .../executor/datatransfer/BlockInputReader.java    |  66 +++++++++----
 .../runtime/executor/datatransfer/InputReader.java |  14 ++-
 .../executor/datatransfer/PipeInputReader.java     |  12 +++
 .../executor/task/ParentTaskDataFetcher.java       |  51 +++++++---
 .../executor/task/ParentTaskDataFetcherTest.java   |  52 ++++++++---
 .../runtime/executor/task/TaskExecutorTest.java    |   1 +
 14 files changed, 321 insertions(+), 182 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BlockFetchFailureProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BlockFetchFailureProperty.java
new file mode 100644
index 0000000..a58354f
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/BlockFetchFailureProperty.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.ir.edge.executionproperty;
+
+import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * Decides how to react to a data block fetch failure.
+ */
+public final class BlockFetchFailureProperty extends EdgeExecutionProperty<BlockFetchFailureProperty.Value> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private BlockFetchFailureProperty(final Value value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static BlockFetchFailureProperty of(final Value value) {
+    return new BlockFetchFailureProperty(value);
+  }
+
+  /**
+   * Possible values of BlockFetchFailure ExecutionProperty.
+   */
+  public enum Value {
+    /**
+     * (DEFAULT BEHAVIOR)
+     * The task will be cancelled and retried by the scheduler.
+     */
+    CANCEL_TASK,
+
+    /**
+     * Do not cancel the running task.
+     * Instead, retry fetching the data block two seconds after the fetch failure.
+     *
+     * We wait two seconds in the hope that the parent task will be re-scheduled by the master,
+     * and make the block "available" again.
+     * Upon the failure of the retry, we retry again. (i.e., we retry forever)
+     */
+    RETRY_AFTER_TWO_SECONDS_FOREVER,
+  }
+}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
deleted file mode 100644
index 0dca02c..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataFlowPass.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-import java.util.List;
-
-import static org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.TransientResourceDataStorePass.fromTransientToReserved;
-
-/**
- * Push from transient resources to reserved resources.
- */
-@Annotates(DataFlowProperty.class)
-@Requires(ResourcePriorityProperty.class)
-public final class TransientResourceDataFlowPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public TransientResourceDataFlowPass() {
-    super(TransientResourceDataFlowPass.class);
-  }
-
-  @Override
-  public IRDAG apply(final IRDAG dag) {
-    dag.getVertices().forEach(vertex -> {
-      final List<IREdge> inEdges = dag.getIncomingEdgesOf(vertex);
-      if (!inEdges.isEmpty()) {
-        inEdges.forEach(edge -> {
-          if (fromTransientToReserved(edge)) {
-            edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.PUSH));
-          }
-        });
-      }
-    });
-    return dag;
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataTransferPass.java
similarity index 55%
rename from compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
rename to compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataTransferPass.java
index 97642b8..becb92d 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataStorePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/TransientResourceDataTransferPass.java
@@ -20,24 +20,25 @@ package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import org.apache.nemo.common.ir.IRDAG;
 import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.BlockFetchFailureProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
 
 import java.util.List;
-import java.util.Optional;
 
 /**
- * Transient resource pass for tagging edges with DataStore ExecutionProperty.
+ * Optimizes IREdges between transient resources and reserved resources.
  */
-@Annotates(DataStoreProperty.class)
+@Annotates({DataFlowProperty.class, BlockFetchFailureProperty.class})
 @Requires(ResourcePriorityProperty.class)
-public final class TransientResourceDataStorePass extends AnnotatingPass {
+public final class TransientResourceDataTransferPass extends AnnotatingPass {
   /**
    * Default constructor.
    */
-  public TransientResourceDataStorePass() {
-    super(TransientResourceDataStorePass.class);
+  public TransientResourceDataTransferPass() {
+    super(TransientResourceDataTransferPass.class);
   }
 
   @Override
@@ -47,12 +48,9 @@ public final class TransientResourceDataStorePass extends AnnotatingPass {
       if (!inEdges.isEmpty()) {
         inEdges.forEach(edge -> {
           if (fromTransientToReserved(edge)) {
-            if (!Optional.of(DataStoreProperty.Value.SERIALIZED_MEMORY_STORE)
-              .equals(edge.getPropertyValue(DataStoreProperty.class))) {
-              edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.MEMORY_STORE));
-            }
-          } else if (fromReservedToTransient(edge)) {
-            edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LOCAL_FILE_STORE));
+            edge.setPropertyPermanently(DataFlowProperty.of(DataFlowProperty.Value.PUSH));
+            edge.setPropertyPermanently(BlockFetchFailureProperty.of(
+              BlockFetchFailureProperty.Value.RETRY_AFTER_TWO_SECONDS_FOREVER));
           }
         });
       }
@@ -66,11 +64,9 @@ public final class TransientResourceDataStorePass extends AnnotatingPass {
    * @param irEdge edge to check.
    * @return whether or not the edge satisfies the condition.
    */
-  static boolean fromTransientToReserved(final IREdge irEdge) {
-    return ResourcePriorityProperty.TRANSIENT
-      .equals(irEdge.getSrc().getPropertyValue(ResourcePriorityProperty.class).get())
-      && ResourcePriorityProperty.RESERVED
-      .equals(irEdge.getDst().getPropertyValue(ResourcePriorityProperty.class).get());
+  private boolean fromTransientToReserved(final IREdge irEdge) {
+    return ResourcePriorityProperty.TRANSIENT.equals(getResourcePriority(irEdge.getSrc()))
+      && ResourcePriorityProperty.RESERVED.equals(getResourcePriority(irEdge.getDst()));
   }
 
   /**
@@ -79,10 +75,16 @@ public final class TransientResourceDataStorePass extends AnnotatingPass {
    * @param irEdge edge to check.
    * @return whether or not the edge satisfies the condition.
    */
-  static boolean fromReservedToTransient(final IREdge irEdge) {
-    return ResourcePriorityProperty.RESERVED
-      .equals(irEdge.getSrc().getPropertyValue(ResourcePriorityProperty.class).get())
-      && ResourcePriorityProperty.TRANSIENT
-      .equals(irEdge.getDst().getPropertyValue(ResourcePriorityProperty.class).get());
+  private boolean fromReservedToTransient(final IREdge irEdge) {
+    return ResourcePriorityProperty.RESERVED.equals(getResourcePriority(irEdge.getSrc()))
+      && ResourcePriorityProperty.TRANSIENT.equals(getResourcePriority(irEdge.getDst()));
+  }
+
+  /**
+   * @param irVertex that is assigned with a resource priority.
+   * @return the resource priority string.
+   */
+  private String getResourcePriority(final IRVertex irVertex) {
+    return irVertex.getPropertyValue(ResourcePriorityProperty.class).orElseThrow(IllegalStateException::new);
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePass.java
index a848488..6c11c98 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePass.java
@@ -18,8 +18,7 @@
  */
 package org.apache.nemo.compiler.optimizer.pass.compiletime.composite;
 
-import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.TransientResourceDataFlowPass;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.TransientResourceDataStorePass;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.TransientResourceDataTransferPass;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.TransientResourcePriorityPass;
 
 import java.util.Arrays;
@@ -35,8 +34,7 @@ public final class TransientResourceCompositePass extends CompositePass {
   public TransientResourceCompositePass() {
     super(Arrays.asList(
       new TransientResourcePriorityPass(),
-      new TransientResourceDataStorePass(),
-      new TransientResourceDataFlowPass()
+      new TransientResourceDataTransferPass()
     ));
   }
 }
diff --git a/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilderTest.java b/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
index 1e1721a..606b985 100644
--- a/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -35,7 +35,7 @@ public final class PolicyBuilderTest {
 
   @Test
   public void testTransientResourcePolicy() {
-    assertEquals(17, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(16, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(0, TransientResourcePolicy.BUILDER.getRunTimePasses().size());
   }
 
diff --git a/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyImplTest.java b/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyImplTest.java
index 6089766..dbbd134 100644
--- a/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyImplTest.java
+++ b/compiler/optimizer/src/test/java/org/apache/nemo/compiler/optimizer/policy/PolicyImplTest.java
@@ -99,23 +99,6 @@ public final class PolicyImplTest {
   }
 
   @Test
-  public void testTransientAndDisaggregationCombination() throws Exception {
-    final List<CompileTimePass> compileTimePasses = new ArrayList<>();
-    final Set<RunTimePass<?>> runTimePasses = new HashSet<>();
-    compileTimePasses.addAll(TransientResourcePolicy.BUILDER.getCompileTimePasses());
-    runTimePasses.addAll(TransientResourcePolicy.BUILDER.getRunTimePasses());
-    compileTimePasses.addAll(DisaggregationPolicy.BUILDER.getCompileTimePasses());
-    runTimePasses.addAll(DisaggregationPolicy.BUILDER.getRunTimePasses());
-
-    final Policy combinedPolicy = new PolicyImpl(compileTimePasses, runTimePasses);
-
-    // This should throw an exception.
-    // Not all data store should be transferred from and to the GFS.
-    expectedException.expect(CompileTimeOptimizationException.class);
-    combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
-  }
-
-  @Test
   public void testDataSkewAndLargeShuffleCombination() throws Exception {
     final List<CompileTimePass> compileTimePasses = new ArrayList<>();
     final Set<RunTimePass<?>> runTimePasses = new HashSet<>();
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
index 23dc5b8..ae8d30b 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/TransientResourceCompositePassTest.java
@@ -25,7 +25,7 @@ import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ResourcePriorityProperty;
 import org.apache.nemo.compiler.CompilerTestUtil;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.TransientResourceDataStorePass;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.TransientResourceDataTransferPass;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.TransientResourcePriorityPass;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,7 +36,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Test {@link TransientResourcePriorityPass} and {@link TransientResourceDataStorePass}.
+ * Test {@link TransientResourcePriorityPass} and {@link TransientResourceDataTransferPass}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobLauncher.class)
@@ -49,7 +49,7 @@ public class TransientResourceCompositePassTest {
   }
 
   @Test
-  public void testTransientResourcePass() throws Exception {
+  public void testTransientResourcePass() {
     final IRDAG processedDAG = new TransientResourceCompositePass().apply(compiledDAG);
 
     final IRVertex vertexX = processedDAG.getTopologicalSort().get(0);
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
index f8861df..8814121 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -18,6 +18,9 @@
  */
 package org.apache.nemo.runtime.executor.data;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.lang3.SerializationUtils;
@@ -26,8 +29,11 @@ import org.apache.nemo.common.exception.BlockFetchException;
 import org.apache.nemo.common.exception.BlockWriteException;
 import org.apache.nemo.common.exception.UnsupportedBlockStoreException;
 import org.apache.nemo.common.exception.UnsupportedExecutionPropertyException;
+import org.apache.nemo.common.ir.edge.executionproperty.BlockFetchFailureProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
+import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -53,10 +59,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -78,7 +81,7 @@ public final class BlockManagerWorker {
 
   // To-Master connections
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
-  private final Map<String, CompletableFuture<ControlMessage.Message>> pendingBlockLocationRequest;
+  private final LoadingCache<String, CompletableFuture<ControlMessage.Message>> blockLocationResponseCache;
 
   // To-Executor connections
   private final ByteTransfer byteTransfer;
@@ -121,7 +124,31 @@ public final class BlockManagerWorker {
     this.backgroundExecutorService = Executors.newFixedThreadPool(numThreads);
     this.blockToRemainingRead = new ConcurrentHashMap<>();
     this.serializerManager = serializerManager;
-    this.pendingBlockLocationRequest = new ConcurrentHashMap<>();
+    this.blockLocationResponseCache = CacheBuilder.newBuilder()
+      // 2 seconds might be enough for "concurrent pending" fetch requests to reuse the same location
+      .expireAfterWrite(2, TimeUnit.SECONDS)
+      // No other eviction policy such as maximum cache size (i.e., this cache is unbounded)
+      .build(new CacheLoader<String, CompletableFuture<ControlMessage.Message>>() {
+        @Override
+        public CompletableFuture<ControlMessage.Message> load(final String blockIdWildcard) {
+          // Ask Master for the location.
+          // (IMPORTANT): This 'request' effectively blocks the TaskExecutor thread if the block is IN_PROGRESS.
+          // We use this property to make the receiver task of a 'push' edge wait in an Executor for its input data
+          // to become available.
+          return persistentConnectionToMasterMap
+            .getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
+              ControlMessage.Message.newBuilder()
+                .setId(RuntimeIdManager.generateMessageId())
+                .setListenerId(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
+                .setType(ControlMessage.MessageType.RequestBlockLocation)
+                .setRequestBlockLocationMsg(
+                  ControlMessage.RequestBlockLocationMsg.newBuilder()
+                    .setExecutorId(executorId)
+                    .setBlockIdWildcard(blockIdWildcard)
+                    .build())
+                .build());
+        }
+      });
     this.blockTransferThrottler = blockTransferThrottler;
   }
 
@@ -148,39 +175,22 @@ public final class BlockManagerWorker {
    *
    * @param blockIdWildcard of the block.
    * @param runtimeEdgeId   id of the runtime edge that corresponds to the block.
-   * @param blockStore      for the data storage.
+   * @param edgeProperties  for the edge.
    * @param keyRange        the key range descriptor
    * @return the {@link CompletableFuture} of the block.
    */
   public CompletableFuture<DataUtil.IteratorWithNumBytes> readBlock(
     final String blockIdWildcard,
     final String runtimeEdgeId,
-    final DataStoreProperty.Value blockStore,
+    final ExecutionPropertyMap<EdgeExecutionProperty> edgeProperties,
     final KeyRange keyRange) {
     // Let's see if a remote worker has it
-    final CompletableFuture<ControlMessage.Message> blockLocationFuture =
-      pendingBlockLocationRequest.computeIfAbsent(blockIdWildcard, blockIdToRequest -> {
-        // Ask Master for the location.
-        // (IMPORTANT): This 'request' effectively blocks the TaskExecutor thread if the block is IN_PROGRESS.
-        // We use this property to make the receiver task of a 'push' edge to wait in an Executor for its input data
-        // to become available.
-        final CompletableFuture<ControlMessage.Message> responseFromMasterFuture = persistentConnectionToMasterMap
-          .getMessageSender(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID).request(
-            ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdManager.generateMessageId())
-              .setListenerId(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID)
-              .setType(ControlMessage.MessageType.RequestBlockLocation)
-              .setRequestBlockLocationMsg(
-                ControlMessage.RequestBlockLocationMsg.newBuilder()
-                  .setExecutorId(executorId)
-                  .setBlockIdWildcard(blockIdWildcard)
-                  .build())
-              .build());
-        return responseFromMasterFuture;
-      });
-    blockLocationFuture.whenComplete((message, throwable) -> {
-      pendingBlockLocationRequest.remove(blockIdWildcard);
-    });
+    final CompletableFuture<ControlMessage.Message> blockLocationFuture;
+    try {
+      blockLocationFuture = blockLocationResponseCache.get(blockIdWildcard);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(e); // This should never happen, since we're only getting a "future"
+    }
 
     // Using thenCompose so that fetching block data starts after getting response from master.
     return blockLocationFuture.thenCompose(responseFromMaster -> {
@@ -199,6 +209,7 @@ public final class BlockManagerWorker {
       // This is the executor id that we wanted to know
       final String blockId = blockLocationInfoMsg.getBlockId();
       final String targetExecutorId = blockLocationInfoMsg.getOwnerExecutorId();
+      final DataStoreProperty.Value blockStore = edgeProperties.get(DataStoreProperty.class).get();
       if (targetExecutorId.equals(executorId) || targetExecutorId.equals(REMOTE_FILE_STORE)) {
         // Block resides in the evaluator
         return getDataFromLocalBlock(blockId, blockStore, keyRange);
@@ -228,9 +239,34 @@ public final class BlockManagerWorker {
           }
         });
 
-        return contextFuture
-          .thenApply(context -> new DataUtil.InputStreamIterator(context.getInputStreams(),
-            serializerManager.getSerializer(runtimeEdgeId)));
+        final BlockFetchFailureProperty.Value fetchFailure = edgeProperties.get(BlockFetchFailureProperty.class)
+          .orElse(BlockFetchFailureProperty.Value.CANCEL_TASK); // the default behavior.
+        if (!fetchFailure.equals(BlockFetchFailureProperty.Value.CANCEL_TASK)) {
+          /**
+           * Wait until fetching "all elements" of each block.
+           *
+           * Problem: If the task won't be cancelled upon fetch failure, then the task can potentially
+           * process blocks partially or process the same elements more than once.
+           *
+           * Solution: With this waiting, a task that fetches a block either
+           * - Processes all elements of the block
+           * - Processes no element of the block (i.e., Runs into a block fetch exception while waiting)
+           */
+          return contextFuture
+            .thenCompose(ByteInputContext::getCompletedFuture)
+            // thenApply waits for the future.
+            .thenApply(streams -> new DataUtil.InputStreamIterator<>(
+              streams, serializerManager.getSerializer(runtimeEdgeId)));
+        } else {
+          /**
+           * Process "each element" of a block as soon as the element comes in.
+           * No worries about partial/duplicate processing here, as the task will be cancelled and restarted cleanly.
+           * Probably best performance when there is no failure.
+           */
+          return contextFuture
+            .thenApply(context -> new DataUtil.InputStreamIterator<>(context.getInputStreams(),
+              serializerManager.getSerializer(runtimeEdgeId)));
+        }
       }
     });
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
index a9ce06b..b03b395 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockInputReader.java
@@ -23,9 +23,10 @@ import org.apache.nemo.common.KeyRange;
 import org.apache.nemo.common.exception.BlockFetchException;
 import org.apache.nemo.common.exception.UnsupportedCommPatternException;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
+import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
@@ -38,6 +39,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
 
 /**
  * Represents the input data transfer to a task.
@@ -71,9 +73,25 @@ public final class BlockInputReader implements InputReader {
     if (comValue.get().equals(CommunicationPatternProperty.Value.ONE_TO_ONE)) {
       return Collections.singletonList(readOneToOne());
     } else if (comValue.get().equals(CommunicationPatternProperty.Value.BROADCAST)) {
-      return readBroadcast();
+      return readBroadcast(index -> true);
     } else if (comValue.get().equals(CommunicationPatternProperty.Value.SHUFFLE)) {
-      return readDataInRange();
+      return readDataInRange(index -> true);
+    } else {
+      throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported"));
+    }
+  }
+
+  @Override
+  public CompletableFuture<DataUtil.IteratorWithNumBytes> retry(final int desiredIndex) {
+    final Optional<CommunicationPatternProperty.Value> comValue =
+      runtimeEdge.getPropertyValue(CommunicationPatternProperty.class);
+
+    if (comValue.get().equals(CommunicationPatternProperty.Value.ONE_TO_ONE)) {
+      return readOneToOne();
+    } else if (comValue.get().equals(CommunicationPatternProperty.Value.BROADCAST)) {
+      return checkSingleElement(readBroadcast(index -> index == desiredIndex));
+    } else if (comValue.get().equals(CommunicationPatternProperty.Value.SHUFFLE)) {
+      return checkSingleElement(readDataInRange(index -> index == desiredIndex));
     } else {
       throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported"));
     }
@@ -84,6 +102,19 @@ public final class BlockInputReader implements InputReader {
     return srcVertex;
   }
 
+  @Override
+  public ExecutionPropertyMap<EdgeExecutionProperty> getProperties() {
+    return runtimeEdge.getExecutionProperties();
+  }
+
+  private CompletableFuture<DataUtil.IteratorWithNumBytes> checkSingleElement(
+    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> list) {
+    if (list.size() != 1) {
+      throw new IllegalArgumentException(list.toString());
+    }
+    return list.get(0);
+  }
+
   /**
    * See {@link RuntimeIdManager#generateBlockIdWildcard(String, int)} for information on block wildcards.
    *
@@ -102,21 +133,19 @@ public final class BlockInputReader implements InputReader {
 
   private CompletableFuture<DataUtil.IteratorWithNumBytes> readOneToOne() {
     final String blockIdWildcard = generateWildCardBlockId(dstTaskIndex);
-    final Optional<DataStoreProperty.Value> dataStoreProperty
-      = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-    return blockManagerWorker.readBlock(blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), HashRange.all());
+    return blockManagerWorker.readBlock(
+      blockIdWildcard, runtimeEdge.getId(), runtimeEdge.getExecutionProperties(), HashRange.all());
   }
 
-  private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast() {
+  private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readBroadcast(final Predicate<Integer> predicate) {
     final int numSrcTasks = InputReader.getSourceParallelism(this);
-    final Optional<DataStoreProperty.Value> dataStoreProperty
-      = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
-      final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
-      futures.add(blockManagerWorker.readBlock(
-        blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), HashRange.all()));
+      if (predicate.test(srcTaskIdx)) {
+        final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
+        futures.add(blockManagerWorker.readBlock(
+          blockIdWildcard, runtimeEdge.getId(), runtimeEdge.getExecutionProperties(), HashRange.all()));
+      }
     }
 
     return futures;
@@ -127,9 +156,8 @@ public final class BlockInputReader implements InputReader {
    *
    * @return the list of the completable future of the data.
    */
-  private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
+  private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange(final Predicate<Integer> predicate) {
     assert (runtimeEdge instanceof StageEdge);
-    final Optional<DataStoreProperty.Value> dataStoreProperty = runtimeEdge.getPropertyValue(DataStoreProperty.class);
     final List<KeyRange> keyRangeList = ((StageEdge) runtimeEdge).getKeyRanges();
     final KeyRange hashRangeToRead = keyRangeList.get(dstTaskIndex);
     if (hashRangeToRead == null) {
@@ -139,9 +167,11 @@ public final class BlockInputReader implements InputReader {
     final int numSrcTasks = InputReader.getSourceParallelism(this);
     final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = new ArrayList<>();
     for (int srcTaskIdx = 0; srcTaskIdx < numSrcTasks; srcTaskIdx++) {
-      final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
-      futures.add(
-        blockManagerWorker.readBlock(blockIdWildcard, runtimeEdge.getId(), dataStoreProperty.get(), hashRangeToRead));
+      if (predicate.test(srcTaskIdx)) {
+        final String blockIdWildcard = generateWildCardBlockId(srcTaskIdx);
+        futures.add(blockManagerWorker.readBlock(
+          blockIdWildcard, runtimeEdge.getId(), runtimeEdge.getExecutionProperties(), hashRangeToRead));
+      }
     }
 
     return futures;
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
index 356d7f3..9cd0195 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/InputReader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
+import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.runtime.executor.data.DataUtil;
@@ -32,12 +34,22 @@ public interface InputReader {
   /**
    * Reads input data depending on the communication pattern of the srcVertex.
    *
-   * @return the read data.
+   * @return the list of futures of the iterators.
    */
   List<CompletableFuture<DataUtil.IteratorWithNumBytes>> read();
 
+  /**
+   * Retries reading the input at the given index.
+   *
+   * @param index the index of the failed iterator in the list returned by read().
+   * @return a future of the retried iterator.
+   */
+  CompletableFuture<DataUtil.IteratorWithNumBytes> retry(int index);
+
   IRVertex getSrcIrVertex();
 
+  ExecutionPropertyMap<EdgeExecutionProperty> getProperties();
+
   static int getSourceParallelism(final InputReader inputReader) {
     return inputReader.getSrcIrVertex().getPropertyValue(ParallelismProperty.class)
       .orElseThrow(() -> new IllegalStateException(inputReader.getSrcIrVertex().getId()));
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java
index 329251d..9af48d1 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeInputReader.java
@@ -20,6 +20,8 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.exception.UnsupportedCommPatternException;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.executor.data.DataUtil;
@@ -76,6 +78,16 @@ public final class PipeInputReader implements InputReader {
   }
 
   @Override
+  public CompletableFuture<DataUtil.IteratorWithNumBytes> retry(final int index) {
+    throw new UnsupportedOperationException(String.valueOf(index));
+  }
+
+  @Override
+  public ExecutionPropertyMap<EdgeExecutionProperty> getProperties() {
+    return runtimeEdge.getExecutionProperties();
+  }
+
+  @Override
   public IRVertex getSrcIrVertex() {
     return srcVertex;
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index d3c223f..a8ae4a9 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.runtime.executor.task;
 
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.edge.executionproperty.BlockFetchFailureProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
@@ -39,7 +40,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 class ParentTaskDataFetcher extends DataFetcher {
   private static final Logger LOG = LoggerFactory.getLogger(ParentTaskDataFetcher.class);
 
-  private final InputReader readersForParentTask;
+  private final InputReader inputReader;
   private final LinkedBlockingQueue iteratorQueue;
 
   // Non-finals (lazy fetching)
@@ -51,10 +52,10 @@ class ParentTaskDataFetcher extends DataFetcher {
   private long encodedBytes = 0;
 
   ParentTaskDataFetcher(final IRVertex dataSource,
-                        final InputReader readerForParentTask,
+                        final InputReader inputReader,
                         final OutputCollector outputCollector) {
     super(dataSource, outputCollector);
-    this.readersForParentTask = readerForParentTask;
+    this.inputReader = inputReader;
     this.firstFetch = true;
     this.currentIteratorIndex = 0;
     this.iteratorQueue = new LinkedBlockingQueue<>();
@@ -119,22 +120,50 @@ class ParentTaskDataFetcher extends DataFetcher {
     }
   }
 
-  private void fetchDataLazily() throws IOException {
-    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
-    this.expectedNumOfIterators = futures.size();
-
-    futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
+  private void handleIncomingBlock(final int index,
+                                   final CompletableFuture<DataUtil.IteratorWithNumBytes> future) {
+    future.whenComplete((iterator, exception) -> {
       try {
         if (exception != null) {
-          iteratorQueue.put(exception);
+          final BlockFetchFailureProperty.Value fetchFailure = inputReader.getProperties()
+            .get(BlockFetchFailureProperty.class)
+            .orElse(BlockFetchFailureProperty.Value.CANCEL_TASK); // default behavior
+
+          if (fetchFailure.equals(BlockFetchFailureProperty.Value.RETRY_AFTER_TWO_SECONDS_FOREVER)) {
+            // Retry block fetch (keep the running task)
+            LOG.info("Retry src irvertex {} with index {} after two seconds",
+              inputReader.getSrcIrVertex().getId(), index);
+            final int twoSecondsInMs =  2 * 1000;
+            Thread.sleep(twoSecondsInMs);
+            final CompletableFuture<DataUtil.IteratorWithNumBytes> retryFuture = inputReader.retry(index);
+            handleIncomingBlock(index, retryFuture);
+          } else if (fetchFailure.equals(BlockFetchFailureProperty.Value.CANCEL_TASK)) {
+            // CANCEL_TASK: do not retry the fetch; enqueue the exception for the queue's consumer to handle
+            iteratorQueue.put(exception);
+          } else {
+            throw new UnsupportedOperationException(fetchFailure.toString());
+          }
         } else {
-          iteratorQueue.put(iterator);
+          // Process the iterator
+          iteratorQueue.put(iterator); // can block here
         }
       } catch (final InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e); // this should not happen
       }
-    }));
+    });
+  }
+
+  private void fetchDataLazily() {
+    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = inputReader.read();
+    this.expectedNumOfIterators = futures.size();
+    for (int i = 0; i < futures.size(); i++) {
+      final int index = i;
+      final CompletableFuture<DataUtil.IteratorWithNumBytes> future = futures.get(i);
+      future.whenComplete((iterator, exception) -> {
+        handleIncomingBlock(index, future);
+      });
+    }
   }
 
   final long getSerializedBytes() {
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index ae3b90f..ab774e9 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -19,6 +19,9 @@
 package org.apache.nemo.runtime.executor.task;
 
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.edge.executionproperty.BlockFetchFailureProperty;
+import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
@@ -40,6 +43,8 @@ import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -89,17 +94,9 @@ public final class ParentTaskDataFetcherTest {
   }
 
   @Test(timeout = 5000, expected = IOException.class)
-  public void testErrorWhenRPC() throws Exception {
+  public void testErrorWhenFuture() throws Exception {
     // Failing future
-    final CompletableFuture failingFuture = CompletableFuture.runAsync(() -> {
-      try {
-        Thread.sleep(2 * 1000); // Block the fetcher for 2 seconds
-        throw new RuntimeException(); // Fail this future
-      } catch (InterruptedException e) {
-        // This shouldn't happen.
-        // We don't throw anything here, so that IOException does not occur and the test fails
-      }
-    }, Executors.newSingleThreadExecutor());
+    final CompletableFuture failingFuture = generateFailingFuture();
     final InputReader inputReader = generateInputReader(failingFuture);
 
     // Fetcher
@@ -110,6 +107,24 @@ public final class ParentTaskDataFetcherTest {
     assertTrue(failingFuture.isCompletedExceptionally());
   }
 
+  @Test(timeout = 5000)
+  public void testErrorWhenFutureWithRetry() throws Exception {
+    // Failing future
+    final CompletableFuture failingFuture = generateFailingFuture();
+    final InputReader inputReader = generateInputReader(
+      failingFuture,
+      BlockFetchFailureProperty.of(BlockFetchFailureProperty.Value.RETRY_AFTER_TWO_SECONDS_FOREVER)); // retry
+
+    final List<String> empty = new ArrayList<>(0); // empty data
+    when(inputReader.retry(anyInt()))
+      .thenReturn(generateCompletableFuture(
+        empty.iterator())); // success upon retry
+
+    // Fetcher should work on retry
+    final ParentTaskDataFetcher fetcher = createFetcher(inputReader);
+    assertEquals(Finishmark.getInstance(), fetcher.fetchDataElement());
+  }
+
   @Test(timeout = 5000, expected = IOException.class)
   public void testErrorWhenReadingData() throws Exception {
     // Failed iterator
@@ -129,13 +144,26 @@ public final class ParentTaskDataFetcherTest {
       mock(OutputCollector.class));
   }
 
-  private InputReader generateInputReader(final CompletableFuture completableFuture) {
+  private InputReader generateInputReader(final CompletableFuture completableFuture,
+                                          final EdgeExecutionProperty... properties) {
     final InputReader inputReader = mock(InputReader.class, Mockito.CALLS_REAL_METHODS);
+    when(inputReader.getSrcIrVertex()).thenReturn(mock(IRVertex.class));
     when(inputReader.read()).thenReturn(Arrays.asList(completableFuture));
+    final ExecutionPropertyMap<EdgeExecutionProperty> propertyMap = new ExecutionPropertyMap<>("");
+    for (final EdgeExecutionProperty p : properties) {
+      propertyMap.put(p);
+    }
+    when(inputReader.getProperties()).thenReturn(propertyMap);
     return inputReader;
   }
 
-  private CompletableFuture generateCompletableFuture(final Iterator iterator) {
+  private CompletableFuture generateFailingFuture() {
+    return CompletableFuture.runAsync(() -> {
+      throw new RuntimeException(); // Fail this future
+    }, Executors.newSingleThreadExecutor());
+  }
+
+  private CompletableFuture<DataUtil.IteratorWithNumBytes> generateCompletableFuture(final Iterator iterator) {
     return CompletableFuture.completedFuture(DataUtil.IteratorWithNumBytes.of(iterator));
   }
 
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 721af5d..9aaf455 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -590,6 +590,7 @@ public final class TaskExecutorTest {
       srcVertex.setProperty(ParallelismProperty.of(SOURCE_PARALLELISM));
       when(inputReader.getSrcIrVertex()).thenReturn(srcVertex);
       when(inputReader.read()).thenReturn(inputFutures);
+      when(inputReader.getProperties()).thenReturn(new ExecutionPropertyMap<>(""));
       return inputReader;
     }
   }