You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/08/09 06:24:08 UTC
[incubator-nemo] branch master updated: [NEMO-136] Rename SchedulerRunner to TaskDispatcher (#97)
This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 04d88fe [NEMO-136] Rename SchedulerRunner to TaskDispatcher (#97)
04d88fe is described below
commit 04d88fe3236e033cba4f08237f9a3c0b42753998
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Aug 9 15:24:06 2018 +0900
[NEMO-136] Rename SchedulerRunner to TaskDispatcher (#97)
JIRA: [NEMO-136: Rename SchedulerRunner to TaskDispatcher](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-136)
**Major changes:**
- Renames SchedulerRunner to TaskDispatcher
**Minor changes to note:**
- N/A
**Tests for the changes:**
- N/A (just renaming)
**Other comments:**
- N/A
resolves [NEMO-136](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-136)
---
.../runtime/master/scheduler/BatchScheduler.java | 22 +++++++++---------
.../{SchedulerRunner.java => TaskDispatcher.java} | 26 +++++++++++-----------
.../runtime/master/scheduler/TaskRetryTest.java | 2 +-
3 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index ffee479..24c0e43 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -59,7 +59,7 @@ public final class BatchScheduler implements Scheduler {
/**
* Components related to scheduling the given plan.
*/
- private final SchedulerRunner schedulerRunner;
+ private final TaskDispatcher taskDispatcher;
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
private final ExecutorRegistry executorRegistry;
@@ -77,13 +77,13 @@ public final class BatchScheduler implements Scheduler {
private List<List<Stage>> sortedScheduleGroups;
@Inject
- private BatchScheduler(final SchedulerRunner schedulerRunner,
+ private BatchScheduler(final TaskDispatcher taskDispatcher,
final PendingTaskCollectionPointer pendingTaskCollectionPointer,
final BlockManagerMaster blockManagerMaster,
final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
final ExecutorRegistry executorRegistry) {
- this.schedulerRunner = schedulerRunner;
+ this.taskDispatcher = taskDispatcher;
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.blockManagerMaster = blockManagerMaster;
this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
@@ -106,7 +106,7 @@ public final class BatchScheduler implements Scheduler {
this.physicalPlan = submittedPhysicalPlan;
this.planStateManager = submittedPlanStateManager;
- schedulerRunner.run(this.planStateManager);
+ taskDispatcher.run(this.planStateManager);
LOG.info("Plan to schedule: {}", this.physicalPlan.getId());
this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices().stream()
@@ -191,13 +191,13 @@ public final class BatchScheduler implements Scheduler {
break;
}
- // Invoke schedulerRunner.onExecutorSlotAvailable()
+ // Invoke taskDispatcher.onExecutorSlotAvailable()
switch (newState) {
// These three states mean that a slot is made available.
case COMPLETE:
case ON_HOLD:
case SHOULD_RETRY:
- schedulerRunner.onExecutorSlotAvailable();
+ taskDispatcher.onExecutorSlotAvailable();
break;
default:
break;
@@ -218,7 +218,7 @@ public final class BatchScheduler implements Scheduler {
public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
executorRegistry.registerExecutor(executorRepresenter);
- schedulerRunner.onExecutorSlotAvailable();
+ taskDispatcher.onExecutorSlotAvailable();
}
@Override
@@ -242,7 +242,7 @@ public final class BatchScheduler implements Scheduler {
@Override
public void terminate() {
- this.schedulerRunner.terminate();
+ this.taskDispatcher.terminate();
this.executorRegistry.terminate();
}
@@ -254,7 +254,7 @@ public final class BatchScheduler implements Scheduler {
*
* These are the reasons why.
* - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
- * - We make {@link SchedulerRunner} run only tasks that are READY.
+ * - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
*/
private void doSchedule() {
final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
@@ -277,8 +277,8 @@ public final class BatchScheduler implements Scheduler {
// Set the pointer to the schedulable tasks.
pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
- // Notify the runner that a new collection is available.
- schedulerRunner.onNewPendingTaskCollectionAvailable();
+ // Notify the dispatcher that a new collection is available.
+ taskDispatcher.onNewPendingTaskCollectionAvailable();
} else {
LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
similarity index 90%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
index 750c0fc..6c0222c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -37,15 +37,15 @@ import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
/**
- * Schedules tasks in discrete batches (scheduling iterations).
- * A scheduling iteration occurs under one of the following conditions
+ * Dispatches tasks to executors in discrete batches (dispatch iterations).
+ * A dispatch iteration occurs under one of the following conditions
* - An executor slot becomes available (for reasons such as task completion/failure, or executor addition)
* - A new list of tasks become available (for reasons such as stage completion, task failure, or executor removal)
*/
@DriverSide
@NotThreadSafe
-public final class SchedulerRunner {
- private static final Logger LOG = LoggerFactory.getLogger(SchedulerRunner.class.getName());
+final class TaskDispatcher {
+ private static final Logger LOG = LoggerFactory.getLogger(TaskDispatcher.class.getName());
private final Map<String, PlanStateManager> planStateManagers;
private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
private final ExecutorService schedulerThread;
@@ -58,14 +58,14 @@ public final class SchedulerRunner {
private final SchedulingPolicy schedulingPolicy;
@Inject
- private SchedulerRunner(final SchedulingConstraintRegistry schedulingConstraintRegistry,
- final SchedulingPolicy schedulingPolicy,
- final PendingTaskCollectionPointer pendingTaskCollectionPointer,
- final ExecutorRegistry executorRegistry) {
+ private TaskDispatcher(final SchedulingConstraintRegistry schedulingConstraintRegistry,
+ final SchedulingPolicy schedulingPolicy,
+ final PendingTaskCollectionPointer pendingTaskCollectionPointer,
+ final ExecutorRegistry executorRegistry) {
this.planStateManagers = new HashMap<>();
this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
- new Thread(runnable, "SchedulerRunner thread"));
+ new Thread(runnable, "TaskDispatcher thread"));
this.isSchedulerRunning = false;
this.isTerminated = false;
this.executorRegistry = executorRegistry;
@@ -74,7 +74,7 @@ public final class SchedulerRunner {
}
/**
- * A separate thread is run to schedule tasks to executors.
+ * A separate thread is run to dispatch tasks to executors.
* See comments in the {@link Scheduler} for avoiding race conditions.
*/
private final class SchedulerThread implements Runnable {
@@ -91,11 +91,11 @@ public final class SchedulerRunner {
LOG.info("{} is incomplete.", planStateManager.getPlanId());
}
});
- LOG.info("SchedulerRunner Terminated!");
+ LOG.info("TaskDispatcher Terminated!");
}
}
- void doScheduleTaskList() {
+ private void doScheduleTaskList() {
final Optional<Collection<Task>> taskListOptional = pendingTaskCollectionPointer.getAndSetNull();
if (!taskListOptional.isPresent()) {
// Task list is empty
@@ -162,7 +162,7 @@ public final class SchedulerRunner {
}
/**
- * Run the scheduler thread.
+ * Run the dispatcher thread.
*/
void run(final PlanStateManager planStateManager) {
planStateManagers.put(planStateManager.getPlanId(), planStateManager);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index e1ab449..2746bf5 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -57,7 +57,7 @@ import static org.mockito.Mockito.mock;
* Tests fault tolerance.
*/
@RunWith(PowerMockRunner.class)
-@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class, SchedulingConstraintRegistry.class,
+@PrepareForTest({BlockManagerMaster.class, TaskDispatcher.class, SchedulingConstraintRegistry.class,
PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
public final class TaskRetryTest {
@Rule public TestName testName = new TestName();