You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ta...@apache.org on 2019/08/11 07:08:50 UTC
[incubator-nemo] branch master updated: [NEMO-398]
ExecutorRepresenter interface and DefaultExecutorRepresenter (#227)
This is an automated email from the ASF dual-hosted git repository.
taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 6fe9e64 [NEMO-398] ExecutorRepresenter interface and DefaultExecutorRepresenter (#227)
6fe9e64 is described below
commit 6fe9e6403f98ba4638b772bfc8e50af6d5db6633
Author: Gao Zhiyuan <al...@gmail.com>
AuthorDate: Sun Aug 11 16:08:45 2019 +0900
[NEMO-398] ExecutorRepresenter interface and DefaultExecutorRepresenter (#227)
JIRA: [NEMO-398: ExecutorRepresenter interface and LambdaExecutorRepresenter ](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-398)
**Major Changes:**
`DefaultExecutorRepresenter` provides the same functionality as the previous `ExecutorRepresenter` class.
`ExecutorRepresenter` is now an interface, which `DefaultExecutorRepresenter` implements.
**Tests for the changes:**
- None added; existing tests were updated to construct `DefaultExecutorRepresenter`.
---
.../runtime/master/resource/ContainerManager.java | 3 +-
...senter.java => DefaultExecutorRepresenter.java} | 42 ++--
.../master/resource/ExecutorRepresenter.java | 218 +++------------------
.../master/scheduler/BatchSchedulerTest.java | 5 +-
.../FreeSlotSchedulingConstraintTest.java | 3 +-
.../runtime/master/scheduler/TaskRetryTest.java | 3 +-
6 files changed, 65 insertions(+), 209 deletions(-)
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
index 0522b02..e1c21dc 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ContainerManager.java
@@ -193,7 +193,8 @@ public final class ContainerManager {
// Create the executor representation.
final ExecutorRepresenter executorRepresenter =
- new ExecutorRepresenter(executorId, resourceSpec, messageSender, activeContext, serializationExecutorService,
+ new DefaultExecutorRepresenter(executorId, resourceSpec, messageSender,
+ activeContext, serializationExecutorService,
activeContext.getEvaluatorDescriptor().getNodeDescriptor().getName());
requestLatchByResourceSpecId.get(resourceSpec.getResourceSpecId()).countDown();
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
similarity index 88%
copy from runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
copy to runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
index dd7031b..04e9051 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/DefaultExecutorRepresenter.java
@@ -44,17 +44,12 @@ import java.util.stream.Stream;
/**
* (WARNING) This class is not thread-safe, and thus should only be accessed through ExecutorRegistry.
* <p>
- * Contains information/state regarding an executor.
- * Such information may include:
- * a) The executor's resource type.
- * b) The executor's capacity (ex. number of cores).
- * c) Tasks scheduled/launched for the executor.
- * d) Name of the physical node which hosts this executor.
- * e) (Please add other information as we implement more features).
+ * An ExecutorRepresenter implementation that communicates with executors running on traditional resources
+ * (e.g., virtual machines or cluster resources).
*/
@NotThreadSafe
-public final class ExecutorRepresenter {
- private static final Logger LOG = LoggerFactory.getLogger(ExecutorRepresenter.class.getName());
+public final class DefaultExecutorRepresenter implements ExecutorRepresenter {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorRepresenter.class.getName());
private final String executorId;
private final ResourceSpecification resourceSpecification;
@@ -78,12 +73,12 @@ public final class ExecutorRepresenter {
* @param serializationExecutorService provides threads for message serialization
* @param nodeName physical name of the node where this executor resides
*/
- public ExecutorRepresenter(final String executorId,
- final ResourceSpecification resourceSpecification,
- final MessageSender<ControlMessage.Message> messageSender,
- final ActiveContext activeContext,
- final ExecutorService serializationExecutorService,
- final String nodeName) {
+ public DefaultExecutorRepresenter(final String executorId,
+ final ResourceSpecification resourceSpecification,
+ final MessageSender<ControlMessage.Message> messageSender,
+ final ActiveContext activeContext,
+ final ExecutorService serializationExecutorService,
+ final String nodeName) {
this.executorId = executorId;
this.resourceSpecification = resourceSpecification;
this.messageSender = messageSender;
@@ -102,6 +97,7 @@ public final class ExecutorRepresenter {
*
* @return set of identifiers of tasks that were running in this executor.
*/
+ @Override
public Set<String> onExecutorFailed() {
failedTasks.addAll(runningComplyingTasks.values());
failedTasks.addAll(runningNonComplyingTasks.values());
@@ -117,6 +113,7 @@ public final class ExecutorRepresenter {
*
* @param task the task to run
*/
+ @Override
public void onTaskScheduled(final Task task) {
(task.getPropertyValue(ResourceSlotProperty.class).orElse(true)
? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
@@ -144,6 +141,7 @@ public final class ExecutorRepresenter {
*
* @param message Message object to send
*/
+ @Override
public void sendControlMessage(final ControlMessage.Message message) {
messageSender.send(message);
}
@@ -153,6 +151,7 @@ public final class ExecutorRepresenter {
*
* @param taskId id of the completed task
*/
+ @Override
public void onTaskExecutionComplete(final String taskId) {
final Task completedTask = removeFromRunningTasks(taskId);
runningTaskToAttempt.remove(completedTask);
@@ -164,6 +163,7 @@ public final class ExecutorRepresenter {
*
* @param taskId id of the Task
*/
+ @Override
public void onTaskExecutionFailed(final String taskId) {
final Task failedTask = removeFromRunningTasks(taskId);
runningTaskToAttempt.remove(failedTask);
@@ -173,6 +173,7 @@ public final class ExecutorRepresenter {
/**
* @return how many Tasks can this executor simultaneously run
*/
+ @Override
public int getExecutorCapacity() {
return resourceSpecification.getCapacity();
}
@@ -180,6 +181,7 @@ public final class ExecutorRepresenter {
/**
* @return the current snapshot of set of Tasks that are running in this executor.
*/
+ @Override
public Set<Task> getRunningTasks() {
return Stream.concat(runningComplyingTasks.values().stream(),
runningNonComplyingTasks.values().stream()).collect(Collectors.toSet());
@@ -188,6 +190,7 @@ public final class ExecutorRepresenter {
/**
* @return the number of running {@link Task}s.
*/
+ @Override
public int getNumOfRunningTasks() {
return getNumOfComplyingRunningTasks() + getNumOfNonComplyingRunningTasks();
}
@@ -195,6 +198,7 @@ public final class ExecutorRepresenter {
/**
* @return the number of running {@link Task}s that complies to the executor slot restriction.
*/
+ @Override
public int getNumOfComplyingRunningTasks() {
return runningComplyingTasks.size();
}
@@ -202,13 +206,14 @@ public final class ExecutorRepresenter {
/**
* @return the number of running {@link Task}s that does not comply to the executor slot restriction.
*/
- public int getNumOfNonComplyingRunningTasks() {
+ private int getNumOfNonComplyingRunningTasks() {
return runningNonComplyingTasks.size();
}
/**
* @return the executor id
*/
+ @Override
public String getExecutorId() {
return executorId;
}
@@ -216,6 +221,7 @@ public final class ExecutorRepresenter {
/**
* @return the container type
*/
+ @Override
public String getContainerType() {
return resourceSpecification.getContainerType();
}
@@ -223,6 +229,7 @@ public final class ExecutorRepresenter {
/**
* @return physical name of the node where this executor resides
*/
+ @Override
public String getNodeName() {
return nodeName;
}
@@ -230,6 +237,7 @@ public final class ExecutorRepresenter {
/**
* Shuts down this executor.
*/
+ @Override
public void shutDown() {
activeContext.close();
}
@@ -257,7 +265,7 @@ public final class ExecutorRepresenter {
} else if (runningNonComplyingTasks.containsKey(taskId)) {
task = runningNonComplyingTasks.remove(taskId);
} else {
- throw new RuntimeException(String.format("Task %s not found in its ExecutorRepresenter", taskId));
+ throw new RuntimeException(String.format("Task %s not found in its DefaultExecutorRepresenter", taskId));
}
return task;
}
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
index dd7031b..26649a8 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -18,248 +18,94 @@
*/
package org.apache.nemo.runtime.master.resource;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.protobuf.ByteString;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
-import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
-import org.apache.nemo.runtime.common.message.MessageEnvironment;
-import org.apache.nemo.runtime.common.message.MessageSender;
import org.apache.nemo.runtime.common.plan.Task;
-import org.apache.reef.driver.context.ActiveContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
/**
- * (WARNING) This class is not thread-safe, and thus should only be accessed through ExecutorRegistry.
- * <p>
- * Contains information/state regarding an executor.
- * Such information may include:
+ * Contains information/state regarding an executor.
+ * Such information may include:
* a) The executor's resource type.
* b) The executor's capacity (ex. number of cores).
* c) Tasks scheduled/launched for the executor.
* d) Name of the physical node which hosts this executor.
* e) (Please add other information as we implement more features).
*/
-@NotThreadSafe
-public final class ExecutorRepresenter {
- private static final Logger LOG = LoggerFactory.getLogger(ExecutorRepresenter.class.getName());
-
- private final String executorId;
- private final ResourceSpecification resourceSpecification;
- private final Map<String, Task> runningComplyingTasks;
- private final Map<String, Task> runningNonComplyingTasks;
- private final Map<Task, Integer> runningTaskToAttempt;
- private final Set<Task> completeTasks;
- private final Set<Task> failedTasks;
- private final MessageSender<ControlMessage.Message> messageSender;
- private final ActiveContext activeContext;
- private final ExecutorService serializationExecutorService;
- private final String nodeName;
-
- /**
- * Creates a reference to the specified executor.
- *
- * @param executorId the executor id
- * @param resourceSpecification specification for the executor
- * @param messageSender provides communication context for this executor
- * @param activeContext context on the corresponding REEF evaluator
- * @param serializationExecutorService provides threads for message serialization
- * @param nodeName physical name of the node where this executor resides
- */
- public ExecutorRepresenter(final String executorId,
- final ResourceSpecification resourceSpecification,
- final MessageSender<ControlMessage.Message> messageSender,
- final ActiveContext activeContext,
- final ExecutorService serializationExecutorService,
- final String nodeName) {
- this.executorId = executorId;
- this.resourceSpecification = resourceSpecification;
- this.messageSender = messageSender;
- this.runningComplyingTasks = new HashMap<>();
- this.runningNonComplyingTasks = new HashMap<>();
- this.runningTaskToAttempt = new HashMap<>();
- this.completeTasks = new HashSet<>();
- this.failedTasks = new HashSet<>();
- this.activeContext = activeContext;
- this.serializationExecutorService = serializationExecutorService;
- this.nodeName = nodeName;
- }
+public interface ExecutorRepresenter {
/**
* Marks all Tasks which were running in this executor as failed.
*
* @return set of identifiers of tasks that were running in this executor.
*/
- public Set<String> onExecutorFailed() {
- failedTasks.addAll(runningComplyingTasks.values());
- failedTasks.addAll(runningNonComplyingTasks.values());
- final Set<String> taskIds = Stream.concat(runningComplyingTasks.keySet().stream(),
- runningNonComplyingTasks.keySet().stream()).collect(Collectors.toSet());
- runningComplyingTasks.clear();
- runningNonComplyingTasks.clear();
- return taskIds;
- }
-
- /**
- * Marks the Task as running, and sends scheduling message to the executor.
- *
- * @param task the task to run
- */
- public void onTaskScheduled(final Task task) {
- (task.getPropertyValue(ResourceSlotProperty.class).orElse(true)
- ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
- runningTaskToAttempt.put(task, task.getAttemptIdx());
- failedTasks.remove(task);
-
-
- serializationExecutorService.execute(() -> {
- final byte[] serialized = SerializationUtils.serialize(task);
- sendControlMessage(
- ControlMessage.Message.newBuilder()
- .setId(RuntimeIdManager.generateMessageId())
- .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
- .setType(ControlMessage.MessageType.ScheduleTask)
- .setScheduleTaskMsg(
- ControlMessage.ScheduleTaskMsg.newBuilder()
- .setTask(ByteString.copyFrom(serialized))
- .build())
- .build());
- });
- }
+ Set<String> onExecutorFailed();
/**
- * Sends control message to the executor.
- *
- * @param message Message object to send
+ * @return how many Tasks can this executor simultaneously run
*/
- public void sendControlMessage(final ControlMessage.Message message) {
- messageSender.send(message);
- }
+ int getExecutorCapacity();
/**
- * Marks the specified Task as completed.
- *
- * @param taskId id of the completed task
+ * @return the current snapshot of set of Tasks that are running in this executor.
*/
- public void onTaskExecutionComplete(final String taskId) {
- final Task completedTask = removeFromRunningTasks(taskId);
- runningTaskToAttempt.remove(completedTask);
- completeTasks.add(completedTask);
- }
+ Set<Task> getRunningTasks();
/**
- * Marks the specified Task as failed.
- *
- * @param taskId id of the Task
+ * @return the number of running {@link Task}s.
*/
- public void onTaskExecutionFailed(final String taskId) {
- final Task failedTask = removeFromRunningTasks(taskId);
- runningTaskToAttempt.remove(failedTask);
- failedTasks.add(failedTask);
- }
+ int getNumOfRunningTasks();
/**
- * @return how many Tasks can this executor simultaneously run
+ * @return the number of running {@link Task}s that complies to the executor slot restriction.
*/
- public int getExecutorCapacity() {
- return resourceSpecification.getCapacity();
- }
+ int getNumOfComplyingRunningTasks();
/**
- * @return the current snapshot of set of Tasks that are running in this executor.
+ * Marks the Task as running, and sends scheduling message to the executor.
+ *
+ * @param task the task to run
*/
- public Set<Task> getRunningTasks() {
- return Stream.concat(runningComplyingTasks.values().stream(),
- runningNonComplyingTasks.values().stream()).collect(Collectors.toSet());
- }
+ void onTaskScheduled(Task task);
/**
- * @return the number of running {@link Task}s.
+ * Sends control message to the executor.
+ *
+ * @param message Message object to send
*/
- public int getNumOfRunningTasks() {
- return getNumOfComplyingRunningTasks() + getNumOfNonComplyingRunningTasks();
- }
+ void sendControlMessage(ControlMessage.Message message);
/**
- * @return the number of running {@link Task}s that complies to the executor slot restriction.
+ * Marks the specified Task as completed.
+ *
+ * @param taskId id of the completed task
*/
- public int getNumOfComplyingRunningTasks() {
- return runningComplyingTasks.size();
- }
+ void onTaskExecutionComplete(String taskId);
/**
- * @return the number of running {@link Task}s that does not comply to the executor slot restriction.
+ * @return physical name of the node where this executor resides
*/
- public int getNumOfNonComplyingRunningTasks() {
- return runningNonComplyingTasks.size();
- }
+ String getNodeName();
/**
* @return the executor id
*/
- public String getExecutorId() {
- return executorId;
- }
+ String getExecutorId();
/**
* @return the container type
*/
- public String getContainerType() {
- return resourceSpecification.getContainerType();
- }
-
- /**
- * @return physical name of the node where this executor resides
- */
- public String getNodeName() {
- return nodeName;
- }
+ String getContainerType();
/**
* Shuts down this executor.
*/
- public void shutDown() {
- activeContext.close();
- }
-
- @Override
- public String toString() {
- final ObjectMapper mapper = new ObjectMapper();
- final ObjectNode node = mapper.createObjectNode();
- node.put("executorId", executorId);
- node.put("runningTasks", getRunningTasks().toString());
- node.put("failedTasks", failedTasks.toString());
- return node.toString();
- }
+ void shutDown();
/**
- * Removes the specified {@link Task} from the map of running tasks.
+ * Marks the specified Task as failed.
*
- * @param taskId id of the task to remove
- * @return the removed {@link Task}
+ * @param taskId id of the Task
*/
- private Task removeFromRunningTasks(final String taskId) {
- final Task task;
- if (runningComplyingTasks.containsKey(taskId)) {
- task = runningComplyingTasks.remove(taskId);
- } else if (runningNonComplyingTasks.containsKey(taskId)) {
- task = runningNonComplyingTasks.remove(taskId);
- } else {
- throw new RuntimeException(String.format("Task %s not found in its ExecutorRepresenter", taskId));
- }
- return task;
- }
+ void onTaskExecutionFailed(String taskId);
}
-
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
index 85db0ea..0384932 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerTest.java
@@ -28,6 +28,7 @@ import org.apache.nemo.runtime.common.plan.*;
import org.apache.nemo.runtime.master.BlockManagerMaster;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
+import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.nemo.runtime.master.resource.ResourceSpecification;
import org.apache.reef.driver.context.ActiveContext;
@@ -92,7 +93,7 @@ public final class BatchSchedulerTest {
final ResourceSpecification computeSpec =
new ResourceSpecification(ResourcePriorityProperty.COMPUTE, EXECUTOR_CAPACITY, 0);
final Function<String, ExecutorRepresenter> computeSpecExecutorRepresenterGenerator = executorId ->
- new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serializationExecutorService,
+ new DefaultExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serializationExecutorService,
executorId);
final ExecutorRepresenter a3 = computeSpecExecutorRepresenterGenerator.apply("a3");
final ExecutorRepresenter a2 = computeSpecExecutorRepresenterGenerator.apply("a2");
@@ -101,7 +102,7 @@ public final class BatchSchedulerTest {
final ResourceSpecification storageSpec =
new ResourceSpecification(ResourcePriorityProperty.TRANSIENT, EXECUTOR_CAPACITY, 0);
final Function<String, ExecutorRepresenter> storageSpecExecutorRepresenterGenerator = executorId ->
- new ExecutorRepresenter(executorId, storageSpec, mockMsgSender, activeContext, serializationExecutorService,
+ new DefaultExecutorRepresenter(executorId, storageSpec, mockMsgSender, activeContext, serializationExecutorService,
executorId);
final ExecutorRepresenter b2 = storageSpecExecutorRepresenterGenerator.apply("b2");
final ExecutorRepresenter b1 = storageSpecExecutorRepresenterGenerator.apply("b1");
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index f6793a9..f89fd92 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -59,8 +59,7 @@ public final class FreeSlotSchedulingConstraintTest {
* @param capacity the capacity of the executor.
* @return the mocked executor.
*/
- private static ExecutorRepresenter mockExecutorRepresenter(final int numComplyingTasks,
- final int capacity) {
+ private static ExecutorRepresenter mockExecutorRepresenter(final int numComplyingTasks, final int capacity) {
final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
when(executorRepresenter.getNumOfComplyingRunningTasks()).thenReturn(numComplyingTasks);
when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
diff --git a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
index bedf7d6..2ee2d2d 100644
--- a/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/org/apache/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -35,6 +35,7 @@ import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.BlockManagerMaster;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
+import org.apache.nemo.runtime.master.resource.DefaultExecutorRepresenter;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.nemo.runtime.master.resource.ResourceSpecification;
import org.apache.reef.driver.context.ActiveContext;
@@ -164,7 +165,7 @@ public final class TaskRetryTest {
Mockito.doThrow(new RuntimeException()).when(activeContext).close();
final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
final ResourceSpecification computeSpec = new ResourceSpecification(ResourcePriorityProperty.COMPUTE, 2, 0);
- final ExecutorRepresenter executor = new ExecutorRepresenter("EXECUTOR" + ID_OFFSET.getAndIncrement(),
+ final ExecutorRepresenter executor = new DefaultExecutorRepresenter("EXECUTOR" + ID_OFFSET.getAndIncrement(),
computeSpec, mockMsgSender, activeContext, serExecutorService, "NODE" + ID_OFFSET.getAndIncrement());
scheduler.onExecutorAdded(executor);
}