You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/08/09 06:24:08 UTC

[incubator-nemo] branch master updated: [NEMO-136] Rename SchedulerRunner to TaskDispatcher (#97)

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 04d88fe  [NEMO-136] Rename SchedulerRunner to TaskDispatcher (#97)
04d88fe is described below

commit 04d88fe3236e033cba4f08237f9a3c0b42753998
Author: John Yang <jo...@gmail.com>
AuthorDate: Thu Aug 9 15:24:06 2018 +0900

    [NEMO-136] Rename SchedulerRunner to TaskDispatcher (#97)
    
    JIRA: [NEMO-136: Rename SchedulerRunner to TaskDispatcher](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-136)
    
    **Major changes:**
    - Renames SchedulerRunner to TaskDispatcher
    
    **Minor changes to note:**
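    - Narrows the visibility of TaskDispatcher from public to package-private, and of doScheduleTaskList() from package-private to private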
    
    **Tests for the changes:**
    - N/A (just renaming)
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-136](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-136)
---
 .../runtime/master/scheduler/BatchScheduler.java   | 22 +++++++++---------
 .../{SchedulerRunner.java => TaskDispatcher.java}  | 26 +++++++++++-----------
 .../runtime/master/scheduler/TaskRetryTest.java    |  2 +-
 3 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index ffee479..24c0e43 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -59,7 +59,7 @@ public final class BatchScheduler implements Scheduler {
   /**
    * Components related to scheduling the given plan.
    */
-  private final SchedulerRunner schedulerRunner;
+  private final TaskDispatcher taskDispatcher;
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorRegistry executorRegistry;
 
@@ -77,13 +77,13 @@ public final class BatchScheduler implements Scheduler {
   private List<List<Stage>> sortedScheduleGroups;
 
   @Inject
-  private BatchScheduler(final SchedulerRunner schedulerRunner,
+  private BatchScheduler(final TaskDispatcher taskDispatcher,
                          final PendingTaskCollectionPointer pendingTaskCollectionPointer,
                          final BlockManagerMaster blockManagerMaster,
                          final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
                          final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
                          final ExecutorRegistry executorRegistry) {
-    this.schedulerRunner = schedulerRunner;
+    this.taskDispatcher = taskDispatcher;
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.blockManagerMaster = blockManagerMaster;
     this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
@@ -106,7 +106,7 @@ public final class BatchScheduler implements Scheduler {
     this.physicalPlan = submittedPhysicalPlan;
     this.planStateManager = submittedPlanStateManager;
 
-    schedulerRunner.run(this.planStateManager);
+    taskDispatcher.run(this.planStateManager);
     LOG.info("Plan to schedule: {}", this.physicalPlan.getId());
 
     this.sortedScheduleGroups = this.physicalPlan.getStageDAG().getVertices().stream()
@@ -191,13 +191,13 @@ public final class BatchScheduler implements Scheduler {
           break;
       }
 
-      // Invoke schedulerRunner.onExecutorSlotAvailable()
+      // Invoke taskDispatcher.onExecutorSlotAvailable()
       switch (newState) {
         // These three states mean that a slot is made available.
         case COMPLETE:
         case ON_HOLD:
         case SHOULD_RETRY:
-          schedulerRunner.onExecutorSlotAvailable();
+          taskDispatcher.onExecutorSlotAvailable();
           break;
         default:
           break;
@@ -218,7 +218,7 @@ public final class BatchScheduler implements Scheduler {
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
     LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
     executorRegistry.registerExecutor(executorRepresenter);
-    schedulerRunner.onExecutorSlotAvailable();
+    taskDispatcher.onExecutorSlotAvailable();
   }
 
   @Override
@@ -242,7 +242,7 @@ public final class BatchScheduler implements Scheduler {
 
   @Override
   public void terminate() {
-    this.schedulerRunner.terminate();
+    this.taskDispatcher.terminate();
     this.executorRegistry.terminate();
   }
 
@@ -254,7 +254,7 @@ public final class BatchScheduler implements Scheduler {
    *
    * These are the reasons why.
    * - We 'reset' {@link PendingTaskCollectionPointer}, and not 'add' new tasks to it
-   * - We make {@link SchedulerRunner} run only tasks that are READY.
+   * - We make {@link TaskDispatcher} dispatch only the tasks that are READY.
    */
   private void doSchedule() {
     final Optional<List<Stage>> earliest = selectEarliestSchedulableGroup();
@@ -277,8 +277,8 @@ public final class BatchScheduler implements Scheduler {
       // Set the pointer to the schedulable tasks.
       pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
 
-      // Notify the runner that a new collection is available.
-      schedulerRunner.onNewPendingTaskCollectionAvailable();
+      // Notify the dispatcher that a new collection is available.
+      taskDispatcher.onNewPendingTaskCollectionAvailable();
     } else {
       LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
similarity index 90%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
index 750c0fc..6c0222c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -37,15 +37,15 @@ import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 
 /**
- * Schedules tasks in discrete batches (scheduling iterations).
- * A scheduling iteration occurs under one of the following conditions
+ * Dispatches tasks to executors in discrete batches (dispatch iterations).
+ * A dispatch iteration occurs under one of the following conditions
  * - An executor slot becomes available (for reasons such as task completion/failure, or executor addition)
  * - A new list of tasks become available (for reasons such as stage completion, task failure, or executor removal)
  */
 @DriverSide
 @NotThreadSafe
-public final class SchedulerRunner {
-  private static final Logger LOG = LoggerFactory.getLogger(SchedulerRunner.class.getName());
+final class TaskDispatcher {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskDispatcher.class.getName());
   private final Map<String, PlanStateManager> planStateManagers;
   private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
   private final ExecutorService schedulerThread;
@@ -58,14 +58,14 @@ public final class SchedulerRunner {
   private final SchedulingPolicy schedulingPolicy;
 
   @Inject
-  private SchedulerRunner(final SchedulingConstraintRegistry schedulingConstraintRegistry,
-                          final SchedulingPolicy schedulingPolicy,
-                          final PendingTaskCollectionPointer pendingTaskCollectionPointer,
-                          final ExecutorRegistry executorRegistry) {
+  private TaskDispatcher(final SchedulingConstraintRegistry schedulingConstraintRegistry,
+                         final SchedulingPolicy schedulingPolicy,
+                         final PendingTaskCollectionPointer pendingTaskCollectionPointer,
+                         final ExecutorRegistry executorRegistry) {
     this.planStateManagers = new HashMap<>();
     this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable ->
-        new Thread(runnable, "SchedulerRunner thread"));
+        new Thread(runnable, "TaskDispatcher thread"));
     this.isSchedulerRunning = false;
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
@@ -74,7 +74,7 @@ public final class SchedulerRunner {
   }
 
   /**
-   * A separate thread is run to schedule tasks to executors.
+   * A separate thread is run to dispatch tasks to executors.
    * See comments in the {@link Scheduler} for avoiding race conditions.
    */
   private final class SchedulerThread implements Runnable {
@@ -91,11 +91,11 @@ public final class SchedulerRunner {
           LOG.info("{} is incomplete.", planStateManager.getPlanId());
         }
       });
-      LOG.info("SchedulerRunner Terminated!");
+      LOG.info("TaskDispatcher Terminated!");
     }
   }
 
-  void doScheduleTaskList() {
+  private void doScheduleTaskList() {
     final Optional<Collection<Task>> taskListOptional = pendingTaskCollectionPointer.getAndSetNull();
     if (!taskListOptional.isPresent()) {
       // Task list is empty
@@ -162,7 +162,7 @@ public final class SchedulerRunner {
   }
 
   /**
-   * Run the scheduler thread.
+   * Run the dispatcher thread.
    */
   void run(final PlanStateManager planStateManager) {
     planStateManagers.put(planStateManager.getPlanId(), planStateManager);
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
index e1ab449..2746bf5 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/TaskRetryTest.java
@@ -57,7 +57,7 @@ import static org.mockito.Mockito.mock;
  * Tests fault tolerance.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class, SchedulingConstraintRegistry.class,
+@PrepareForTest({BlockManagerMaster.class, TaskDispatcher.class, SchedulingConstraintRegistry.class,
     PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
 public final class TaskRetryTest {
   @Rule public TestName testName = new TestName();