You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2019/08/11 07:08:50 UTC

[incubator-nemo] branch master updated: [NEMO-398] ExecutorRepresenter interface and DefaultExecutorRepresenter (#227)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fe9e64  [NEMO-398] ExecutorRepresenter interface and DefaultExecutorRepresenter (#227)
6fe9e64 is described below

commit 6fe9e6403f98ba4638b772bfc8e50af6d5db6633
Author: Gao Zhiyuan <al...@gmail.com>
AuthorDate: Sun Aug 11 16:08:45 2019 +0900

    [NEMO-398] ExecutorRepresenter interface and DefaultExecutorRepresenter (#227)
    
    JIRA: [NEMO-398: ExecutorRepresenter interface and LambdaExecutorRepresenter ](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-398)
    
    **Major Changes:**
    `DefaultExecutorRepresenter` has the same function as previous `ExecutorRepresenter` does.
    Now `DefaultExecutorRepresenter` implements `ExecutorRepresenter` interface.
    
    **Tests for the changes:**
    - Not any
---
 .../runtime/master/resource/ContainerManager.java  |   3 +-
 ...senter.java => DefaultExecutorRepresenter.java} |  42 ++--
 .../master/resource/ExecutorRepresenter.java       | 218 +++------------------
 .../master/scheduler/BatchSchedulerTest.java       |   5 +-
 .../FreeSlotSchedulingConstraintTest.java          |   3 +-
 .../runtime/master/scheduler/TaskRetryTest.java    |   3 +-
 6 files changed, 65 insertions(+), 209 deletions(-)

diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
index 0522b02..e1c21dc 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
@@ -193,7 +193,8 @@ public final class ContainerManager {
 
     // Create the executor representation.
     final ExecutorRepresenter executorRepresenter =
-      new ExecutorRepresenter(executorId, resourceSpec, messageSender, activeContext, serializationExecutorService,
+      new DefaultExecutorRepresenter(executorId, resourceSpec, messageSender,
+        activeContext, serializationExecutorService,
         activeContext.getEvaluatorDescriptor().getNodeDescriptor().getName());
 
     requestLatchByResourceSpecId.get(resourceSpec.getResourceSpecId()).countDown();
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
similarity index 88%
copy from runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
copy to runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
index dd7031b..04e9051 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
@@ -44,17 +44,12 @@ import java.util.stream.Stream;
 /**
  * (WARNING) This class is not thread-safe, and thus should only be accessed through ExecutorRegistry.
  * <p>
- * Contains information/state regarding an executor.
- * Such information may include:
- * a) The executor's resource type.
- * b) The executor's capacity (ex. number of cores).
- * c) Tasks scheduled/launched for the executor.
- * d) Name of the physical node which hosts this executor.
- * e) (Please add other information as we implement more features).
+ * Implements ExecutorRepresenter that communicates with Executors running on traditional resources
+ * (e.g., virtual machines or cluster resources).
  */
 @NotThreadSafe
-public final class ExecutorRepresenter {
-  private static final Logger LOG = LoggerFactory.getLogger(ExecutorRepresenter.class.getName());
+public final class DefaultExecutorRepresenter implements ExecutorRepresenter {
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorRepresenter.class.getName());
 
   private final String executorId;
   private final ResourceSpecification resourceSpecification;
@@ -78,12 +73,12 @@ public final class ExecutorRepresenter {
    * @param serializationExecutorService provides threads for message serialization
    * @param nodeName                     physical name of the node where this executor resides
    */
-  public ExecutorRepresenter(final String executorId,
-                             final ResourceSpecification resourceSpecification,
-                             final MessageSender<ControlMessage.Message> messageSender,
-                             final ActiveContext activeContext,
-                             final ExecutorService serializationExecutorService,
-                             final String nodeName) {
+  public DefaultExecutorRepresenter(final String executorId,
+                                    final ResourceSpecification resourceSpecification,
+                                    final MessageSender<ControlMessage.Message> messageSender,
+                                    final ActiveContext activeContext,
+                                    final ExecutorService serializationExecutorService,
+                                    final String nodeName) {
     this.executorId = executorId;
     this.resourceSpecification = resourceSpecification;
     this.messageSender = messageSender;
@@ -102,6 +97,7 @@ public final class ExecutorRepresenter {
    *
    * @return set of identifiers of tasks that were running in this executor.
    */
+  @Override
   public Set<String> onExecutorFailed() {
     failedTasks.addAll(runningComplyingTasks.values());
     failedTasks.addAll(runningNonComplyingTasks.values());
@@ -117,6 +113,7 @@ public final class ExecutorRepresenter {
    *
    * @param task the task to run
    */
+  @Override
   public void onTaskScheduled(final Task task) {
     (task.getPropertyValue(ResourceSlotProperty.class).orElse(true)
       ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
@@ -144,6 +141,7 @@ public final class ExecutorRepresenter {
    *
    * @param message Message object to send
    */
+  @Override
   public void sendControlMessage(final ControlMessage.Message message) {
     messageSender.send(message);
   }
@@ -153,6 +151,7 @@ public final class ExecutorRepresenter {
    *
    * @param taskId id of the completed task
    */
+  @Override
   public void onTaskExecutionComplete(final String taskId) {
     final Task completedTask = removeFromRunningTasks(taskId);
     runningTaskToAttempt.remove(completedTask);
@@ -164,6 +163,7 @@ public final class ExecutorRepresenter {
    *
    * @param taskId id of the Task
    */
+  @Override
   public void onTaskExecutionFailed(final String taskId) {
     final Task failedTask = removeFromRunningTasks(taskId);
     runningTaskToAttempt.remove(failedTask);
@@ -173,6 +173,7 @@ public final class ExecutorRepresenter {
   /**
    * @return how many Tasks can this executor simultaneously run
    */
+  @Override
   public int getExecutorCapacity() {
     return resourceSpecification.getCapacity();
   }
@@ -180,6 +181,7 @@ public final class ExecutorRepresenter {
   /**
    * @return the current snapshot of set of Tasks that are running in this executor.
    */
+  @Override
   public Set<Task> getRunningTasks() {
     return Stream.concat(runningComplyingTasks.values().stream(),
       runningNonComplyingTasks.values().stream()).collect(Collectors.toSet());
@@ -188,6 +190,7 @@ public final class ExecutorRepresenter {
   /**
    * @return the number of running {@link Task}s.
    */
+  @Override
   public int getNumOfRunningTasks() {
     return getNumOfComplyingRunningTasks() + getNumOfNonComplyingRunningTasks();
   }
@@ -195,6 +198,7 @@ public final class ExecutorRepresenter {
   /**
    * @return the number of running {@link Task}s that complies to the executor slot restriction.
    */
+  @Override
   public int getNumOfComplyingRunningTasks() {
     return runningComplyingTasks.size();
   }
@@ -202,13 +206,14 @@ public final class ExecutorRepresenter {
   /**
    * @return the number of running {@link Task}s that does not comply to the executor slot restriction.
    */
-  public int getNumOfNonComplyingRunningTasks() {
+  private int getNumOfNonComplyingRunningTasks() {
     return runningNonComplyingTasks.size();
   }
 
   /**
    * @return the executor id
    */
+  @Override
   public String getExecutorId() {
     return executorId;
   }
@@ -216,6 +221,7 @@ public final class ExecutorRepresenter {
   /**
    * @return the container type
    */
+  @Override
   public String getContainerType() {
     return resourceSpecification.getContainerType();
   }
@@ -223,6 +229,7 @@ public final class ExecutorRepresenter {
   /**
    * @return physical name of the node where this executor resides
    */
+  @Override
   public String getNodeName() {
     return nodeName;
   }
@@ -230,6 +237,7 @@ public final class ExecutorRepresenter {
   /**
    * Shuts down this executor.
    */
+  @Override
   public void shutDown() {
     activeContext.close();
   }
@@ -257,7 +265,7 @@ public final class ExecutorRepresenter {
     } else if (runningNonComplyingTasks.containsKey(taskId)) {
       task = runningNonComplyingTasks.remove(taskId);
     } else {
-      throw new RuntimeException(String.format("Task %s not found in its ExecutorRepresenter", taskId));
+      throw new RuntimeException(String.format("Task %s not found in its DefaultExecutorRepresenter", taskId));
     }
     return task;
   }
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
index dd7031b..26649a8 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -18,248 +18,94 @@
  */
 package org.apache.nemo.runtime.master.resource;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.protobuf.ByteString;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
-import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
-import org.apache.nemo.runtime.common.message.MessageEnvironment;
-import org.apache.nemo.runtime.common.message.MessageSender;
 import org.apache.nemo.runtime.common.plan.Task;
-import org.apache.reef.driver.context.ActiveContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
- * (WARNING) This class is not thread-safe, and thus should only be accessed through ExecutorRegistry.
- * <p>
- * Contains information/state regarding an executor.
- * Such information may include:
+ * Contains information/state regarding an executor
+ * Such information may include
  * a) The executor's resource type.
  * b) The executor's capacity (ex. number of cores).
  * c) Tasks scheduled/launched for the executor.
  * d) Name of the physical node which hosts this executor.
  * e) (Please add other information as we implement more features).
  */
-@NotThreadSafe
-public final class ExecutorRepresenter {
-  private static final Logger LOG = LoggerFactory.getLogger(ExecutorRepresenter.class.getName());
-
-  private final String executorId;
-  private final ResourceSpecification resourceSpecification;
-  private final Map<String, Task> runningComplyingTasks;
-  private final Map<String, Task> runningNonComplyingTasks;
-  private final Map<Task, Integer> runningTaskToAttempt;
-  private final Set<Task> completeTasks;
-  private final Set<Task> failedTasks;
-  private final MessageSender<ControlMessage.Message> messageSender;
-  private final ActiveContext activeContext;
-  private final ExecutorService serializationExecutorService;
-  private final String nodeName;
-
-  /**
-   * Creates a reference to the specified executor.
-   *
-   * @param executorId                   the executor id
-   * @param resourceSpecification        specification for the executor
-   * @param messageSender                provides communication context for this executor
-   * @param activeContext                context on the corresponding REEF evaluator
-   * @param serializationExecutorService provides threads for message serialization
-   * @param nodeName                     physical name of the node where this executor resides
-   */
-  public ExecutorRepresenter(final String executorId,
-                             final ResourceSpecification resourceSpecification,
-                             final MessageSender<ControlMessage.Message> messageSender,
-                             final ActiveContext activeContext,
-                             final ExecutorService serializationExecutorService,
-                             final String nodeName) {
-    this.executorId = executorId;
-    this.resourceSpecification = resourceSpecification;
-    this.messageSender = messageSender;
-    this.runningComplyingTasks = new HashMap<>();
-    this.runningNonComplyingTasks = new HashMap<>();
-    this.runningTaskToAttempt = new HashMap<>();
-    this.completeTasks = new HashSet<>();
-    this.failedTasks = new HashSet<>();
-    this.activeContext = activeContext;
-    this.serializationExecutorService = serializationExecutorService;
-    this.nodeName = nodeName;
-  }
+public interface ExecutorRepresenter {
 
   /**
    * Marks all Tasks which were running in this executor as failed.
    *
    * @return set of identifiers of tasks that were running in this executor.
    */
-  public Set<String> onExecutorFailed() {
-    failedTasks.addAll(runningComplyingTasks.values());
-    failedTasks.addAll(runningNonComplyingTasks.values());
-    final Set<String> taskIds = Stream.concat(runningComplyingTasks.keySet().stream(),
-      runningNonComplyingTasks.keySet().stream()).collect(Collectors.toSet());
-    runningComplyingTasks.clear();
-    runningNonComplyingTasks.clear();
-    return taskIds;
-  }
-
-  /**
-   * Marks the Task as running, and sends scheduling message to the executor.
-   *
-   * @param task the task to run
-   */
-  public void onTaskScheduled(final Task task) {
-    (task.getPropertyValue(ResourceSlotProperty.class).orElse(true)
-      ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
-    runningTaskToAttempt.put(task, task.getAttemptIdx());
-    failedTasks.remove(task);
-
-
-    serializationExecutorService.execute(() -> {
-      final byte[] serialized = SerializationUtils.serialize(task);
-      sendControlMessage(
-        ControlMessage.Message.newBuilder()
-          .setId(RuntimeIdManager.generateMessageId())
-          .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
-          .setType(ControlMessage.MessageType.ScheduleTask)
-          .setScheduleTaskMsg(
-            ControlMessage.ScheduleTaskMsg.newBuilder()
-              .setTask(ByteString.copyFrom(serialized))
-              .build())
-          .build());
-    });
-  }
+  Set<String> onExecutorFailed();
 
   /**
-   * Sends control message to the executor.
-   *
-   * @param message Message object to send
+   * @return how many Tasks can this executor simultaneously run
    */
-  public void sendControlMessage(final ControlMessage.Message message) {
-    messageSender.send(message);
-  }
+  int getExecutorCapacity();
 
   /**
-   * Marks the specified Task as completed.
-   *
-   * @param taskId id of the completed task
+   * @return the current snapshot of set of Tasks that are running in this executor.
    */
-  public void onTaskExecutionComplete(final String taskId) {
-    final Task completedTask = removeFromRunningTasks(taskId);
-    runningTaskToAttempt.remove(completedTask);
-    completeTasks.add(completedTask);
-  }
+  Set<Task> getRunningTasks();
 
   /**
-   * Marks the specified Task as failed.
-   *
-   * @param taskId id of the Task
+   * @return the number of running {@link Task}s.
    */
-  public void onTaskExecutionFailed(final String taskId) {
-    final Task failedTask = removeFromRunningTasks(taskId);
-    runningTaskToAttempt.remove(failedTask);
-    failedTasks.add(failedTask);
-  }
+  int getNumOfRunningTasks();
 
   /**
-   * @return how many Tasks can this executor simultaneously run
+   * @return the number of running {@link Task}s that complies to the executor slot restriction.
    */
-  public int getExecutorCapacity() {
-    return resourceSpecification.getCapacity();
-  }
+  int getNumOfComplyingRunningTasks();
 
   /**
-   * @return the current snapshot of set of Tasks that are running in this executor.
+   * Marks the Task as running, and sends scheduling message to the executor.
+   *
+   * @param task the task to run
    */
-  public Set<Task> getRunningTasks() {
-    return Stream.concat(runningComplyingTasks.values().stream(),
-      runningNonComplyingTasks.values().stream()).collect(Collectors.toSet());
-  }
+   void onTaskScheduled(Task task);
 
   /**
-   * @return the number of running {@link Task}s.
+   * Sends control message to the executor.
+   *
+   * @param message Message object to send
    */
-  public int getNumOfRunningTasks() {
-    return getNumOfComplyingRunningTasks() + getNumOfNonComplyingRunningTasks();
-  }
+   void sendControlMessage(ControlMessage.Message message);
 
   /**
-   * @return the number of running {@link Task}s that complies to the executor slot restriction.
+   * Marks the specified Task as completed.
+   *
+   * @param taskId id of the completed task
    */
-  public int getNumOfComplyingRunningTasks() {
-    return runningComplyingTasks.size();
-  }
+  void onTaskExecutionComplete(String taskId);
 
   /**
-   * @return the number of running {@link Task}s that does not comply to the executor slot restriction.
+   * @return physical name of the node where this executor resides
    */
-  public int getNumOfNonComplyingRunningTasks() {
-    return runningNonComplyingTasks.size();
-  }
+  String getNodeName();
 
   /**
    * @return the executor id
    */
-  public String getExecutorId() {
-    return executorId;
-  }
+  String getExecutorId();
 
   /**
    * @return the container type
    */
-  public String getContainerType() {
-    return resourceSpecification.getContainerType();
-  }
-
-  /**
-   * @return physical name of the node where this executor resides
-   */
-  public String getNodeName() {
-    return nodeName;
-  }
+  String getContainerType();
 
   /**
    * Shuts down this executor.
    */
-  public void shutDown() {
-    activeContext.close();
-  }
-
-  @Override
-  public String toString() {
-    final ObjectMapper mapper = new ObjectMapper();
-    final ObjectNode node = mapper.createObjectNode();
-    node.put("executorId", executorId);
-    node.put("runningTasks", getRunningTasks().toString());
-    node.put("failedTasks", failedTasks.toString());
-    return node.toString();
-  }
+  void shutDown();
 
   /**
-   * Removes the specified {@link Task} from the map of running tasks.
+   * Marks the specified Task as failed.
    *
-   * @param taskId id of the task to remove
-   * @return the removed {@link Task}
+   * @param taskId id of the Task
    */
-  private Task removeFromRunningTasks(final String taskId) {
-    final Task task;
-    if (runningComplyingTasks.containsKey(taskId)) {
-      task = runningComplyingTasks.remove(taskId);
-    } else if (runningNonComplyingTasks.containsKey(taskId)) {
-      task = runningNonComplyingTasks.remove(taskId);
-    } else {
-      throw new RuntimeException(String.format("Task %s not found in its ExecutorRepresenter", taskId));
-    }
-    return task;
-  }
+  void onTaskExecutionFailed(String taskId);
 }
-
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index 85db0ea..0384932 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -28,6 +28,7 @@ import org.apache.nemo.runtime.common.plan.*;
 import org.apache.nemo.runtime.master.BlockManagerMaster;
 import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
+import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ResourceSpecification;
 import org.apache.reef.driver.context.ActiveContext;
@@ -92,7 +93,7 @@ public final class BatchSchedulerTest {
     final ResourceSpecification computeSpec =
       new ResourceSpecification(ResourcePriorityProperty.COMPUTE, EXECUTOR_CAPACITY, 0);
     final Function<String, ExecutorRepresenter> computeSpecExecutorRepresenterGenerator = executorId ->
-      new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serializationExecutorService,
+      new DefaultExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serializationExecutorService,
         executorId);
     final ExecutorRepresenter a3 = computeSpecExecutorRepresenterGenerator.apply("a3");
     final ExecutorRepresenter a2 = computeSpecExecutorRepresenterGenerator.apply("a2");
@@ -101,7 +102,7 @@ public final class BatchSchedulerTest {
     final ResourceSpecification storageSpec =
       new ResourceSpecification(ResourcePriorityProperty.TRANSIENT, EXECUTOR_CAPACITY, 0);
     final Function<String, ExecutorRepresenter> storageSpecExecutorRepresenterGenerator = executorId ->
-      new ExecutorRepresenter(executorId, storageSpec, mockMsgSender, activeContext, serializationExecutorService,
+      new DefaultExecutorRepresenter(executorId, storageSpec, mockMsgSender, activeContext, serializationExecutorService,
         executorId);
     final ExecutorRepresenter b2 = storageSpecExecutorRepresenterGenerator.apply("b2");
     final ExecutorRepresenter b1 = storageSpecExecutorRepresenterGenerator.apply("b1");
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index f6793a9..f89fd92 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -59,8 +59,7 @@ public final class FreeSlotSchedulingConstraintTest {
    * @param capacity          the capacity of the executor.
    * @return the mocked executor.
    */
-  private static ExecutorRepresenter mockExecutorRepresenter(final int numComplyingTasks,
-                                                             final int capacity) {
+  private static ExecutorRepresenter mockExecutorRepresenter(final int numComplyingTasks, final int capacity) {
     final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
     when(executorRepresenter.getNumOfComplyingRunningTasks()).thenReturn(numComplyingTasks);
     when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index bedf7d6..2ee2d2d 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -35,6 +35,7 @@ import org.apache.nemo.runtime.common.state.TaskState;
 import org.apache.nemo.runtime.master.BlockManagerMaster;
 import org.apache.nemo.runtime.master.PlanStateManager;
 import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
+import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.nemo.runtime.master.resource.ResourceSpecification;
 import org.apache.reef.driver.context.ActiveContext;
@@ -164,7 +165,7 @@ public final class TaskRetryTest {
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
     final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
     final ResourceSpecification computeSpec = new ResourceSpecification(ResourcePriorityProperty.COMPUTE, 2, 0);
-    final ExecutorRepresenter executor = new ExecutorRepresenter("EXECUTOR" + ID_OFFSET.getAndIncrement(),
+    final ExecutorRepresenter executor = new DefaultExecutorRepresenter("EXECUTOR" + ID_OFFSET.getAndIncrement(),
       computeSpec, mockMsgSender, activeContext, serExecutorService, "NODE" + ID_OFFSET.getAndIncrement());
     scheduler.onExecutorAdded(executor);
   }