You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by je...@apache.org on 2018/06/05 09:39:13 UTC
[incubator-nemo] branch master updated: [NEMO-95] Rename ExecutableTask to Task (#29)
This is an automated email from the ASF dual-hosted git repository.
jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new a387c4c [NEMO-95] Rename ExecutableTask to Task (#29)
a387c4c is described below
commit a387c4cd0f6285d72b1adef1e5e80b9b99a104cd
Author: John Yang <jo...@gmail.com>
AuthorDate: Tue Jun 5 18:39:02 2018 +0900
[NEMO-95] Rename ExecutableTask to Task (#29)
JIRA: NEMO-95: Rename ExecutableTask to Task
Major changes:
* Rename ExecutableTask to Task
Minor changes to note:
N/A
Tests for the changes:
N/A
Other comments:
N/A
resolves NEMO-95
---
.../common/plan/{ExecutableTask.java => Task.java} | 38 +++++++++++-----------
.../edu/snu/nemo/runtime/executor/Executor.java | 26 +++++++--------
.../snu/nemo/runtime/executor/TaskExecutor.java | 20 ++++++------
.../nemo/runtime/executor/TaskStateManager.java | 8 ++---
.../master/resource/ExecutorRepresenter.java | 14 ++++----
.../master/scheduler/BatchSingleJobScheduler.java | 2 +-
.../scheduler/CompositeSchedulingPolicy.java | 6 ++--
.../ContainerTypeAwareSchedulingPolicy.java | 10 +++---
.../master/scheduler/FreeSlotSchedulingPolicy.java | 6 ++--
.../master/scheduler/PendingTaskCollection.java | 10 +++---
.../scheduler/RoundRobinSchedulingPolicy.java | 6 ++--
.../runtime/master/scheduler/SchedulerRunner.java | 22 ++++++-------
.../runtime/master/scheduler/SchedulingPolicy.java | 4 +--
.../master/scheduler/SingleJobTaskCollection.java | 26 +++++++--------
.../SourceLocationAwareSchedulingPolicy.java | 8 ++---
.../ContainerTypeAwareSchedulingPolicyTest.java | 16 ++++-----
.../scheduler/FreeSlotSchedulingPolicyTest.java | 8 ++---
.../scheduler/RoundRobinSchedulingPolicyTest.java | 8 ++---
.../master/scheduler/SingleTaskQueueTest.java | 18 +++++-----
.../SourceLocationAwareSchedulingPolicyTest.java | 34 +++++++++----------
.../tests/runtime/executor/TaskExecutorTest.java | 14 ++++----
21 files changed, 152 insertions(+), 152 deletions(-)
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
similarity index 72%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
rename to runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 1da8f97..5cd8b16 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/ExecutableTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -23,9 +23,9 @@ import java.util.List;
import java.util.Map;
/**
- * An ExecutableTask is a self-contained executable that can be executed in specific types of containers.
+ * A Task is a self-contained executable that can be executed on a machine.
*/
-public final class ExecutableTask implements Serializable {
+public final class Task implements Serializable {
private final String jobId;
private final String taskId;
private final int taskIdx;
@@ -33,35 +33,35 @@ public final class ExecutableTask implements Serializable {
private final List<StageEdge> taskOutgoingEdges;
private final int attemptIdx;
private final String containerType;
- private final byte[] serializedTaskDag;
+ private final byte[] serializedIRDag;
private final Map<String, Readable> irVertexIdToReadable;
/**
* Constructor.
*
* @param jobId the id of the job.
- * @param taskId the ID of the scheduled task.
+ * @param taskId the ID of the task.
* @param attemptIdx the attempt index.
* @param containerType the type of container to execute the task on.
* @param serializedIRDag the serialized DAG of the task.
* @param taskIncomingEdges the incoming edges of the task.
* @param taskOutgoingEdges the outgoing edges of the task.
- * @param irVertexIdToReadable the map between logical task ID and readable.
+ * @param irVertexIdToReadable the map between IRVertex id and readable.
*/
- public ExecutableTask(final String jobId,
- final String taskId,
- final int attemptIdx,
- final String containerType,
- final byte[] serializedIRDag,
- final List<StageEdge> taskIncomingEdges,
- final List<StageEdge> taskOutgoingEdges,
- final Map<String, Readable> irVertexIdToReadable) {
+ public Task(final String jobId,
+ final String taskId,
+ final int attemptIdx,
+ final String containerType,
+ final byte[] serializedIRDag,
+ final List<StageEdge> taskIncomingEdges,
+ final List<StageEdge> taskOutgoingEdges,
+ final Map<String, Readable> irVertexIdToReadable) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
this.attemptIdx = attemptIdx;
this.containerType = containerType;
- this.serializedTaskDag = serializedIRDag;
+ this.serializedIRDag = serializedIRDag;
this.taskIncomingEdges = taskIncomingEdges;
this.taskOutgoingEdges = taskOutgoingEdges;
this.irVertexIdToReadable = irVertexIdToReadable;
@@ -75,21 +75,21 @@ public final class ExecutableTask implements Serializable {
}
/**
- * @return the serialized DAG of the task.
+ * @return the serialized IR DAG of the task.
*/
public byte[] getSerializedIRDag() {
- return serializedTaskDag;
+ return serializedIRDag;
}
/**
- * @return the ID of the scheduled task.
+ * @return the ID of the task.
*/
public String getTaskId() {
return taskId;
}
/**
- * @return the idx of the scheduled task.
+ * @return the idx of the task.
*/
public int getTaskIdx() {
return taskIdx;
@@ -124,7 +124,7 @@ public final class ExecutableTask implements Serializable {
}
/**
- * @return the map between logical task ID and readable.
+ * @return the map between IRVertex id and readable.
*/
public Map<String, Readable> getIrVertexIdToReadable() {
return irVertexIdToReadable;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 8872990..287ca01 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -28,7 +28,7 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageListener;
import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.executor.data.SerializerManager;
import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
import org.apache.commons.lang3.SerializationUtils;
@@ -87,26 +87,26 @@ public final class Executor {
return executorId;
}
- private synchronized void onTaskReceived(final ExecutableTask executableTask) {
+ private synchronized void onTaskReceived(final Task task) {
LOG.debug("Executor [{}] received Task [{}] to execute.",
- new Object[]{executorId, executableTask.getTaskId()});
- executorService.execute(() -> launchTask(executableTask));
+ new Object[]{executorId, task.getTaskId()});
+ executorService.execute(() -> launchTask(task));
}
/**
* Launches the Task, and keeps track of the execution state with taskStateManager.
- * @param executableTask to launch.
+ * @param task to launch.
*/
- private void launchTask(final ExecutableTask executableTask) {
+ private void launchTask(final Task task) {
try {
final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
- SerializationUtils.deserialize(executableTask.getSerializedIRDag());
+ SerializationUtils.deserialize(task.getSerializedIRDag());
final TaskStateManager taskStateManager =
- new TaskStateManager(executableTask, executorId, persistentConnectionToMasterMap, metricMessageSender);
+ new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
- executableTask.getTaskIncomingEdges()
+ task.getTaskIncomingEdges()
.forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
- executableTask.getTaskOutgoingEdges()
+ task.getTaskOutgoingEdges()
.forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
irDag.getVertices().forEach(v -> {
irDag.getOutgoingEdgesOf(v)
@@ -114,7 +114,7 @@ public final class Executor {
});
new TaskExecutor(
- executableTask, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
+ task, irDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
} catch (final Exception e) {
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
ControlMessage.Message.newBuilder()
@@ -149,9 +149,9 @@ public final class Executor {
switch (message.getType()) {
case ScheduleTask:
final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
- final ExecutableTask executableTask =
+ final Task task =
SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
- onTaskReceived(executableTask);
+ onTaskReceived(task);
break;
default:
throw new IllegalMessageException(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index 09a43cc..4a9b1ee 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.exception.BlockWriteException;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.common.ir.vertex.*;
import edu.snu.nemo.common.ir.vertex.transform.Transform;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.state.TaskState;
@@ -46,7 +46,7 @@ public final class TaskExecutor {
private static final String ITERATORID_PREFIX = "ITERATOR_";
private static final AtomicInteger ITERATORID_GENERATOR = new AtomicInteger(0);
- // From ExecutableTask
+ // From Task
private final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag;
private final String taskId;
private final int taskIdx;
@@ -81,24 +81,24 @@ public final class TaskExecutor {
/**
* Constructor.
- * @param executableTask Task with information needed during execution.
+ * @param task Task with information needed during execution.
* @param irVertexDag A DAG of vertices.
* @param taskStateManager State manager for this Task.
* @param channelFactory For reading from/writing to data to other Stages.
* @param metricMessageSender For sending metric with execution stats to Master.
*/
- public TaskExecutor(final ExecutableTask executableTask,
+ public TaskExecutor(final Task task,
final DAG<IRVertex, RuntimeEdge<IRVertex>> irVertexDag,
final TaskStateManager taskStateManager,
final DataTransferFactory channelFactory,
final MetricMessageSender metricMessageSender) {
- // Information from the ExecutableTask.
+ // Information from the Task.
this.irVertexDag = irVertexDag;
- this.taskId = executableTask.getTaskId();
- this.taskIdx = executableTask.getTaskIdx();
- this.stageIncomingEdges = executableTask.getTaskIncomingEdges();
- this.stageOutgoingEdges = executableTask.getTaskOutgoingEdges();
- this.irVertexIdToReadable = executableTask.getIrVertexIdToReadable();
+ this.taskId = task.getTaskId();
+ this.taskIdx = task.getTaskIdx();
+ this.stageIncomingEdges = task.getTaskIncomingEdges();
+ this.stageOutgoingEdges = task.getTaskOutgoingEdges();
+ this.irVertexIdToReadable = task.getIrVertexIdToReadable();
// Other parameters.
this.taskStateManager = taskStateManager;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 7b1bc52..c06ab5a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -21,7 +21,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import java.util.*;
@@ -44,12 +44,12 @@ public final class TaskStateManager {
private final MetricCollector metricCollector;
private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
- public TaskStateManager(final ExecutableTask executableTask,
+ public TaskStateManager(final Task task,
final String executorId,
final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
final MetricMessageSender metricMessageSender) {
- this.taskId = executableTask.getTaskId();
- this.attemptIdx = executableTask.getAttemptIdx();
+ this.taskId = task.getTaskId();
+ this.attemptIdx = task.getAttemptIdx();
this.executorId = executorId;
this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
this.metricCollector = new MetricCollector(metricMessageSender);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index f73ecdb..62ab5c6 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.reef.driver.context.ActiveContext;
@@ -95,17 +95,17 @@ public final class ExecutorRepresenter {
/**
* Marks the Task as running, and sends scheduling message to the executor.
- * @param executableTask
+ * @param task
*/
- public void onTaskScheduled(final ExecutableTask executableTask) {
- runningTasks.add(executableTask.getTaskId());
- runningTaskToAttempt.put(executableTask.getTaskId(), executableTask.getAttemptIdx());
- failedTasks.remove(executableTask.getTaskId());
+ public void onTaskScheduled(final Task task) {
+ runningTasks.add(task.getTaskId());
+ runningTaskToAttempt.put(task.getTaskId(), task.getAttemptIdx());
+ failedTasks.remove(task.getTaskId());
serializationExecutorService.submit(new Runnable() {
@Override
public void run() {
- final byte[] serialized = SerializationUtils.serialize(executableTask);
+ final byte[] serialized = SerializationUtils.serialize(task);
sendControlMessage(
ControlMessage.Message.newBuilder()
.setId(RuntimeIdGenerator.generateMessageId())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index c51d1ec..273f108 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -400,7 +400,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
final int attemptIdx = jobStateManager.getCurrentAttemptIndexForTask(taskId);
LOG.debug("Enqueueing {}", taskId);
- pendingTaskCollection.add(new ExecutableTask(
+ pendingTaskCollection.add(new Task(
physicalPlan.getId(),
taskId,
attemptIdx,
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index 66fa986..e15e401 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
@@ -46,10 +46,10 @@ public final class CompositeSchedulingPolicy implements SchedulingPolicy {
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ExecutableTask executableTask) {
+ final Task task) {
Set<ExecutorRepresenter> candidates = executorRepresenterSet;
for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
- candidates = schedulingPolicy.filterExecutorRepresenters(candidates, executableTask);
+ candidates = schedulingPolicy.filterExecutorRepresenters(candidates, task);
}
return candidates;
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index 46b3c6b..b8f7d74 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
@@ -37,20 +37,20 @@ public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolic
/**
* @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
* If the container type of target Task is NONE, it will return the original set.
- * @param executableTask {@link ExecutableTask} to be scheduled.
+ * @param task {@link Task} to be scheduled.
* @return filtered Set of {@link ExecutorRepresenter}.
*/
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ExecutableTask executableTask) {
+ final Task task) {
- if (executableTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+ if (task.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
return executorRepresenterSet;
}
final Set<ExecutorRepresenter> candidateExecutors =
executorRepresenterSet.stream()
- .filter(executor -> executor.getContainerType().equals(executableTask.getContainerType()))
+ .filter(executor -> executor.getContainerType().equals(task.getContainerType()))
.collect(Collectors.toSet());
return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index 5d19f52..d92d9d4 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import javax.inject.Inject;
@@ -35,12 +35,12 @@ public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
/**
* @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
* Executors that do not have any free slots will be filtered by this policy.
- * @param executableTask {@link ExecutableTask} to be scheduled.
+ * @param task {@link Task} to be scheduled.
* @return filtered Set of {@link ExecutorRepresenter}.
*/
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ExecutableTask executableTask) {
+ final Task task) {
final Set<ExecutorRepresenter> candidateExecutors =
executorRepresenterSet.stream()
.filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index 1d8c8a2..8d954e3 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
@@ -38,9 +38,9 @@ public interface PendingTaskCollection {
/**
* Adds a Task to this collection.
- * @param executableTask to add.
+ * @param task to add.
*/
- void add(final ExecutableTask executableTask);
+ void add(final Task task);
/**
* Removes the specified Task to be scheduled.
@@ -49,14 +49,14 @@ public interface PendingTaskCollection {
* @throws NoSuchElementException if the specified Task is not in the queue,
* or removing this Task breaks scheduling order
*/
- ExecutableTask remove(final String taskId) throws NoSuchElementException;
+ Task remove(final String taskId) throws NoSuchElementException;
/**
* Peeks stage that can be scheduled according to job dependency priority.
* Changes to the queue must not be reflected to the returned collection to avoid concurrent modification.
* @return stage that can be scheduled, or {@link Optional#empty()} if the queue is empty
*/
- Optional<Collection<ExecutableTask>> peekSchedulableStage();
+ Optional<Collection<Task>> peekSchedulableStage();
/**
* Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index afdc671..5c6e65c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
@@ -47,12 +47,12 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
/**
* @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by round robin behaviour.
- * @param executableTask {@link ExecutableTask} to be scheduled.
+ * @param task {@link Task} to be scheduled.
* @return filtered Set of {@link ExecutorRepresenter}.
*/
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ExecutableTask executableTask) {
+ final Task task) {
final OptionalInt minOccupancy =
executorRepresenterSet.stream()
.map(executor -> executor.getRunningTasks().size())
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index aa53780..a329c4b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.state.JobState;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
@@ -112,7 +112,7 @@ public final class SchedulerRunner {
}
void doScheduleStage() {
- final Collection<ExecutableTask> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
+ final Collection<Task> stageToSchedule = pendingTaskCollection.peekSchedulableStage().orElse(null);
if (stageToSchedule == null) {
// Task queue is empty
LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
@@ -120,27 +120,27 @@ public final class SchedulerRunner {
}
final AtomicInteger numScheduledTasks = new AtomicInteger(0); // to be incremented in lambda
- for (final ExecutableTask schedulableTask : stageToSchedule) {
- final JobStateManager jobStateManager = jobStateManagers.get(schedulableTask.getJobId());
- LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
+ for (final Task task : stageToSchedule) {
+ final JobStateManager jobStateManager = jobStateManagers.get(task.getJobId());
+ LOG.debug("Trying to schedule {}...", task.getTaskId());
executorRegistry.viewExecutors(executors -> {
final Set<ExecutorRepresenter> candidateExecutors =
- schedulingPolicy.filterExecutorRepresenters(executors, schedulableTask);
+ schedulingPolicy.filterExecutorRepresenters(executors, task);
final Optional<ExecutorRepresenter> firstCandidate = candidateExecutors.stream().findFirst();
if (firstCandidate.isPresent()) {
// update metadata first
- jobStateManager.onTaskStateChanged(schedulableTask.getTaskId(), TaskState.State.EXECUTING);
- pendingTaskCollection.remove(schedulableTask.getTaskId());
+ jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
+ pendingTaskCollection.remove(task.getTaskId());
numScheduledTasks.incrementAndGet();
// send the task
final ExecutorRepresenter selectedExecutor = firstCandidate.get();
- selectedExecutor.onTaskScheduled(schedulableTask);
- LOG.debug("Successfully scheduled {}", schedulableTask.getTaskId());
+ selectedExecutor.onTaskScheduled(task);
+ LOG.debug("Successfully scheduled {}", task.getTaskId());
} else {
- LOG.debug("Failed to schedule {}", schedulableTask.getTaskId());
+ LOG.debug("Failed to schedule {}", task.getTaskId());
}
});
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 7ea05fb..0064310 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -32,5 +32,5 @@ import java.util.Set;
@DefaultImplementation(CompositeSchedulingPolicy.class)
public interface SchedulingPolicy {
Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ExecutableTask executableTask);
+ final Task task);
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index fae4565..b1318b1 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import net.jcip.annotations.ThreadSafe;
import org.apache.reef.annotations.audience.DriverSide;
@@ -41,7 +41,7 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
/**
* Pending Tasks awaiting to be scheduled for each stage.
*/
- private final ConcurrentMap<String, Map<String, ExecutableTask>> stageIdToPendingTasks;
+ private final ConcurrentMap<String, Map<String, Task>> stageIdToPendingTasks;
/**
* Stages with Tasks that have not yet been scheduled.
@@ -55,17 +55,17 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
}
@Override
- public synchronized void add(final ExecutableTask executableTask) {
- final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
+ public synchronized void add(final Task task) {
+ final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
if (taskIdToTask == null) {
- final Map<String, ExecutableTask> taskIdToTaskMap = new HashMap<>();
- taskIdToTaskMap.put(executableTask.getTaskId(), executableTask);
- updateSchedulableStages(stageId, executableTask.getContainerType());
+ final Map<String, Task> taskIdToTaskMap = new HashMap<>();
+ taskIdToTaskMap.put(task.getTaskId(), task);
+ updateSchedulableStages(stageId, task.getContainerType());
return taskIdToTaskMap;
} else {
- taskIdToTask.put(executableTask.getTaskId(), executableTask);
+ taskIdToTask.put(task.getTaskId(), task);
return taskIdToTask;
}
});
@@ -81,18 +81,18 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
* (i.e. does not belong to the collection from {@link #peekSchedulableStage()}).
*/
@Override
- public synchronized ExecutableTask remove(final String taskId) throws NoSuchElementException {
+ public synchronized Task remove(final String taskId) throws NoSuchElementException {
final String stageId = schedulableStages.peekFirst();
if (stageId == null) {
throw new NoSuchElementException("No schedulable stage in Task queue");
}
- final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+ final Map<String, Task> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
if (pendingTasksForStage == null) {
throw new RuntimeException(String.format("Stage %s not found in Task queue", stageId));
}
- final ExecutableTask taskToSchedule = pendingTasksForStage.remove(taskId);
+ final Task taskToSchedule = pendingTasksForStage.remove(taskId);
if (taskToSchedule == null) {
throw new NoSuchElementException(String.format("Task %s not found in Task queue", taskId));
}
@@ -115,13 +115,13 @@ public final class SingleJobTaskCollection implements PendingTaskCollection {
* or {@link Optional#empty} if the queue is empty
*/
@Override
- public synchronized Optional<Collection<ExecutableTask>> peekSchedulableStage() {
+ public synchronized Optional<Collection<Task>> peekSchedulableStage() {
final String stageId = schedulableStages.peekFirst();
if (stageId == null) {
return Optional.empty();
}
- final Map<String, ExecutableTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+ final Map<String, Task> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
if (pendingTasksForStage == null) {
throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTasks map", stageId));
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index bee0a36..35dbb07 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
import com.google.common.annotations.VisibleForTesting;
import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
@@ -59,15 +59,15 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
/**
* @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
* If there are no source locations, the original set is returned.
- * @param executableTask {@link ExecutableTask} to be scheduled.
+ * @param task {@link Task} to be scheduled.
* @return filtered Set of {@link ExecutorRepresenter}.
*/
@Override
public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
- final ExecutableTask executableTask) {
+ final Task task) {
final Set<String> sourceLocations;
try {
- sourceLocations = getSourceLocations(executableTask.getIrVertexIdToReadable().values());
+ sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values());
} catch (final UnsupportedOperationException e) {
return executorRepresenterSet;
} catch (final Exception e) {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index 1b7768f..55808c9 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
* Tests {@link ContainerTypeAwareSchedulingPolicy}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
public final class ContainerTypeAwareSchedulingPolicyTest {
private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
@@ -48,24 +48,24 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
- final ExecutableTask executableTask1 = mock(ExecutableTask.class);
- when(executableTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+ final Task task1 = mock(Task.class);
+ when(task1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
final Set<ExecutorRepresenter> candidateExecutors1 =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, executableTask1);
+ schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, task1);
final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
assertEquals(expectedExecutors1, candidateExecutors1);
- final ExecutableTask executableTask2 = mock(ExecutableTask.class);
- when(executableTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+ final Task task2 = mock(Task.class);
+ when(task2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
final Set<ExecutorRepresenter> candidateExecutors2 =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, executableTask2);
+ schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, task2);
final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
index ac77c46..c60eeb3 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
* Tests {@link FreeSlotSchedulingPolicy}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
public final class FreeSlotSchedulingPolicyTest {
private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
@@ -51,12 +51,12 @@ public final class FreeSlotSchedulingPolicyTest {
final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
- final ExecutableTask executableTask = mock(ExecutableTask.class);
+ final Task task = mock(Task.class);
final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
final Set<ExecutorRepresenter> candidateExecutors =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
+ schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index d40859b..57aa8cb 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -32,7 +32,7 @@ import static org.mockito.Mockito.*;
* Tests {@link RoundRobinSchedulingPolicy}
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class})
public final class RoundRobinSchedulingPolicyTest {
private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks) {
@@ -50,12 +50,12 @@ public final class RoundRobinSchedulingPolicyTest {
final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
- final ExecutableTask executableTask = mock(ExecutableTask.class);
+ final Task task = mock(Task.class);
final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
final Set<ExecutorRepresenter> candidateExecutors =
- schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, executableTask);
+ schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task);
final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
index 921dfe8..852ebbc 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SingleTaskQueueTest.java
@@ -16,7 +16,7 @@
package edu.snu.nemo.runtime.master.scheduler;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.plan.Stage;
import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator;
@@ -85,7 +85,7 @@ public final class SingleTaskQueueTest {
executorService.submit(() -> {
try {
assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
- final ExecutableTask dequeuedTask = dequeue();
+ final Task dequeuedTask = dequeue();
assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
dagOf2Stages.get(1).getId());
@@ -137,7 +137,7 @@ public final class SingleTaskQueueTest {
executorService.submit(() -> {
try {
assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
- final ExecutableTask dequeuedTask = dequeue();
+ final Task dequeuedTask = dequeue();
assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
dagOf2Stages.get(0).getId());
@@ -213,7 +213,7 @@ public final class SingleTaskQueueTest {
*/
private void scheduleStage(final Stage stage) {
stage.getTaskIds().forEach(taskId ->
- pendingTaskPriorityQueue.add(new ExecutableTask(
+ pendingTaskPriorityQueue.add(new Task(
"TestPlan",
taskId,
0,
@@ -229,17 +229,17 @@ public final class SingleTaskQueueTest {
* @return the stage name of the dequeued task.
*/
private String dequeueAndGetStageId() {
- final ExecutableTask executableTask = dequeue();
- return RuntimeIdGenerator.getStageIdFromTaskId(executableTask.getTaskId());
+ final Task task = dequeue();
+ return RuntimeIdGenerator.getStageIdFromTaskId(task.getTaskId());
}
/**
* Dequeues a scheduled task from the task priority queue.
* @return the Task dequeued
*/
- private ExecutableTask dequeue() {
- final Collection<ExecutableTask> executableTasks
+ private Task dequeue() {
+ final Collection<Task> tasks
= pendingTaskPriorityQueue.peekSchedulableStage().get();
- return pendingTaskPriorityQueue.remove(executableTasks.iterator().next().getTaskId());
+ return pendingTaskPriorityQueue.remove(tasks.iterator().next().getTaskId());
}
}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 4263e7f..2f538ee 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.common.ir.Readable;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.junit.Test;
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
* Test cases for {@link SourceLocationAwareSchedulingPolicy}.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ExecutableTask.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class})
public final class SourceLocationAwareSchedulingPolicyTest {
private static final String SITE_0 = "SEOUL";
private static final String SITE_1 = "JINJU";
@@ -46,7 +46,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
/**
- * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ExecutableTask} when
+ * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link Task} when
* there are no executors in appropriate location(s).
*/
@Test
@@ -54,7 +54,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
// Prepare test scenario
- final ExecutableTask task = CreateExecutableTask.withReadablesWithSourceLocations(
+ final Task task = CreateTask.withReadablesWithSourceLocations(
Collections.singletonList(Collections.singletonList(SITE_0)));
final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
@@ -70,19 +70,19 @@ public final class SourceLocationAwareSchedulingPolicyTest {
public void testSourceLocationAwareSchedulingWithMultiSource() {
final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
// Prepare test scenario
- final ExecutableTask task0 = CreateExecutableTask.withReadablesWithSourceLocations(
+ final Task task0 = CreateTask.withReadablesWithSourceLocations(
Collections.singletonList(Collections.singletonList(SITE_1)));
- final ExecutableTask task1 = CreateExecutableTask.withReadablesWithSourceLocations(
+ final Task task1 = CreateTask.withReadablesWithSourceLocations(
Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
- final ExecutableTask task2 = CreateExecutableTask.withReadablesWithSourceLocations(
+ final Task task2 = CreateTask.withReadablesWithSourceLocations(
Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
Arrays.asList(SITE_1, SITE_2)));
- final ExecutableTask task3 = CreateExecutableTask.withReadablesWithSourceLocations(
+ final Task task3 = CreateTask.withReadablesWithSourceLocations(
Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
Arrays.asList(SITE_0, SITE_2)));
final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
- for (final ExecutableTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
+ for (final Task task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
new HashSet<>(Collections.singletonList(e)), task));
}
@@ -90,14 +90,14 @@ public final class SourceLocationAwareSchedulingPolicyTest {
/**
- * Utility for creating {@link ExecutableTask}.
+ * Utility for creating {@link Task}.
*/
- private static final class CreateExecutableTask {
+ private static final class CreateTask {
private static final AtomicInteger taskIndex = new AtomicInteger(0);
private static final AtomicInteger intraTaskIndex = new AtomicInteger(0);
- private static ExecutableTask doCreate(final Collection<Readable> readables) {
- final ExecutableTask mockInstance = mock(ExecutableTask.class);
+ private static Task doCreate(final Collection<Readable> readables) {
+ final Task mockInstance = mock(Task.class);
final Map<String, Readable> readableMap = new HashMap<>();
readables.forEach(readable -> readableMap.put(String.format("TASK-%d", intraTaskIndex.getAndIncrement()),
readable));
@@ -106,7 +106,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
return mockInstance;
}
- static ExecutableTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
+ static Task withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
try {
final List<Readable> readables = new ArrayList<>();
for (final List<String> locations : sourceLocation) {
@@ -120,7 +120,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
}
- static ExecutableTask withReadablesWithoutSourceLocations(final int numReadables) {
+ static Task withReadablesWithoutSourceLocations(final int numReadables) {
try {
final List<Readable> readables = new ArrayList<>();
for (int i = 0; i < numReadables; i++) {
@@ -134,7 +134,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
}
- static ExecutableTask withReadablesWhichThrowException(final int numReadables) {
+ static Task withReadablesWhichThrowException(final int numReadables) {
try {
final List<Readable> readables = new ArrayList<>();
for (int i = 0; i < numReadables; i++) {
@@ -148,7 +148,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
}
}
- static ExecutableTask withoutReadables() {
+ static Task withoutReadables() {
return doCreate(Collections.emptyList());
}
}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index a946b4b..ffbfef6 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -27,7 +27,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import edu.snu.nemo.common.ir.vertex.IRVertex;
import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.ExecutableTask;
+import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.common.plan.StageEdge;
import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.executor.MetricMessageSender;
@@ -116,8 +116,8 @@ public final class TaskExecutorTest {
final StageEdge stageOutEdge = mock(StageEdge.class);
when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
- final ExecutableTask executableTask =
- new ExecutableTask(
+ final Task task =
+ new Task(
"testSourceVertex",
taskId,
0,
@@ -129,7 +129,7 @@ public final class TaskExecutorTest {
// Execute the task.
final TaskExecutor taskExecutor = new TaskExecutor(
- executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+ task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
taskExecutor.execute();
// Check the output.
@@ -170,8 +170,8 @@ public final class TaskExecutorTest {
when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
final StageEdge stageOutEdge = mock(StageEdge.class);
when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
- final ExecutableTask executableTask =
- new ExecutableTask(
+ final Task task =
+ new Task(
"testSourceVertex",
taskId,
0,
@@ -183,7 +183,7 @@ public final class TaskExecutorTest {
// Execute the task.
final TaskExecutor taskExecutor = new TaskExecutor(
- executableTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+ task, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
taskExecutor.execute();
// Check the output.
--
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.