You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/08/08 02:37:38 UTC

[incubator-nemo] 01/02: remove completeHeldTask

This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch nemo-175
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 1a43e79180de615923f88fdac67bc567cfd39812
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Wed Aug 8 11:25:01 2018 +0900

    remove completeHeldTask
---
 .../UpdatePhysicalPlanEventHandler.java            |  1 -
 .../master/scheduler/BatchSingleJobScheduler.java  | 25 ++++++----------------
 .../nemo/runtime/master/scheduler/Scheduler.java   |  8 -------
 3 files changed, 7 insertions(+), 27 deletions(-)

diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index a81ebeb..4047cbb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -49,6 +49,5 @@ public final class UpdatePhysicalPlanEventHandler implements CompilerEventHandle
     final PhysicalPlan newPlan = updatePhysicalPlanEvent.getNewPhysicalPlan();
 
     this.scheduler.updateJob(newPlan.getId(), newPlan);
-    this.scheduler.completeHeldTask(updatePhysicalPlanEvent.getTaskId(), updatePhysicalPlanEvent.getExecutorId());
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index fb29918..57db43b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -130,12 +130,6 @@ public final class BatchSingleJobScheduler implements Scheduler {
     this.physicalPlan = newPhysicalPlan;
   }
 
-  @Override
-  public void completeHeldTask(final String taskId, final String executorId) {
-    onTaskExecutionComplete(executorId, taskId, true);
-    doSchedule();
-  }
-
   /**
    * Handles task state transition notifications sent from executors.
    * Note that we can receive notifications for previous task attempts, due to the nature of asynchronous events.
@@ -161,7 +155,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
       jobStateManager.onTaskStateChanged(taskId, newState);
       switch (newState) {
         case COMPLETE:
-          onTaskExecutionComplete(executorId, taskId, false);
+          onTaskExecutionComplete(executorId, taskId);
           break;
         case SHOULD_RETRY:
           // SHOULD_RETRY from an executor means that the task ran into a recoverable failure
@@ -361,20 +355,17 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   /**
    * Action after task execution has been completed.
+   * Note this method should not be invoked when the previous state of the task is ON_HOLD.
    * @param executorId id of the executor.
    * @param taskId the ID of the task completed.
-   * @param isOnHoldToComplete whether or not if it is switched to complete after it has been on hold.
    */
   private void onTaskExecutionComplete(final String executorId,
-                                       final String taskId,
-                                       final boolean isOnHoldToComplete) {
+                                       final String taskId) {
     LOG.debug("{} completed in {}", new Object[]{taskId, executorId});
-    if (!isOnHoldToComplete) {
-      executorRegistry.updateExecutor(executorId, (executor, state) -> {
-        executor.onTaskExecutionComplete(taskId);
-        return Pair.of(executor, state);
-      });
-    }
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      executor.onTaskExecutionComplete(taskId);
+      return Pair.of(executor, state);
+    });
   }
 
   /**
@@ -411,8 +402,6 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
       pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
           new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, taskId, executorId));
-    } else {
-      onTaskExecutionComplete(executorId, taskId, true);
     }
   }
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 83f5641..aaa82e0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -50,14 +50,6 @@ public interface Scheduler {
   void updateJob(String jobId, PhysicalPlan newPhysicalPlan);
 
   /**
-   * Completes a task which is in ON_HOLD state.
-   *
-   * @param taskId id of the task
-   * @param executorId id of the executor which executes the task
-   */
-  void completeHeldTask(String taskId, String executorId);
-
-  /**
    * Called when an executor is added to Runtime, so that the extra resource can be used to execute the job.
    * @param executorRepresenter a representation of the added executor.
    */