You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/08 07:45:05 UTC

[incubator-nemo] branch master updated: [NEMO-175] Remove 'taskInfo', add Scheduler#completeHeldTask (#89)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 661497e  [NEMO-175] Remove 'taskInfo', add Scheduler#completeHeldTask (#89)
661497e is described below

commit 661497ed694856a1157f3e362724e765639840f6
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Wed Aug 8 16:45:02 2018 +0900

    [NEMO-175] Remove 'taskInfo', add Scheduler#completeHeldTask (#89)
    
    JIRA: [NEMO-175](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-175)
    
    **Major changes:**
    - N/A
    
    **Minor changes to note:**
    - Remove taskInfo, which is a pair of taskId and executorId. We'd better simply put two strings in separate arguments.
    - In Scheduler, 'updatePhysicalPlan' optionally have compelted an ON_HOLD task if 'taskInfo' is not null. This PR adds Scheduler#completeHeldTask for this operation.
    
    **Tests for the changes:**
    - Existing tasks should work.
    
    **Other comments:**
    - N/A
    
    resolves [NEMO-175](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-175)
---
 .../eventhandler/DynamicOptimizationEvent.java     | 26 ++++++++++++++-------
 .../DynamicOptimizationEventHandler.java           |  6 ++---
 .../eventhandler/UpdatePhysicalPlanEvent.java      | 26 ++++++++++++++-------
 .../UpdatePhysicalPlanEventHandler.java            |  4 +---
 .../master/scheduler/BatchSingleJobScheduler.java  | 27 ++++++++--------------
 .../nemo/runtime/master/scheduler/Scheduler.java   |  5 +---
 6 files changed, 49 insertions(+), 45 deletions(-)

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index e9bd6eb..50df799 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -18,7 +18,6 @@
  */
 package edu.snu.nemo.runtime.common.eventhandler;
 
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.RuntimeEvent;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
@@ -29,20 +28,24 @@ import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 public final class DynamicOptimizationEvent implements RuntimeEvent {
   private final PhysicalPlan physicalPlan;
   private final MetricCollectionBarrierVertex metricCollectionBarrierVertex;
-  private final Pair<String, String> taskInfo;
+  private final String taskId;
+  private final String executorId;
 
   /**
    * Default constructor.
    * @param physicalPlan physical plan to be optimized.
    * @param metricCollectionBarrierVertex metric collection barrier vertex to retrieve metric data from.
-   * @param taskInfo information of the task.
+   * @param taskId id of the task which triggered the dynamic optimization.
+   * @param executorId the id of executor which executes {@code taskId}
    */
   public DynamicOptimizationEvent(final PhysicalPlan physicalPlan,
                                   final MetricCollectionBarrierVertex metricCollectionBarrierVertex,
-                                  final Pair<String, String> taskInfo) {
+                                  final String taskId,
+                                  final String executorId) {
     this.physicalPlan = physicalPlan;
     this.metricCollectionBarrierVertex = metricCollectionBarrierVertex;
-    this.taskInfo = taskInfo;
+    this.taskId = taskId;
+    this.executorId = executorId;
   }
 
   /**
@@ -60,9 +63,16 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
   }
 
   /**
-   * @return the information of the task at which this optimization occurs: its name and its task ID.
+   * @return id of the task which triggered the dynamic optimization
    */
-  public Pair<String, String> getTaskInfo() {
-    return this.taskInfo;
+  public String getTaskId() {
+    return taskId;
+  }
+
+  /**
+   * @return id of the executor which triggered the dynamic optimization
+   */
+  public String getExecutorId() {
+    return executorId;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index f4f7d22..56ce7c1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -18,7 +18,6 @@
  */
 package edu.snu.nemo.runtime.common.eventhandler;
 
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
@@ -54,11 +53,10 @@ public final class DynamicOptimizationEventHandler implements RuntimeEventHandle
     final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
             dynamicOptimizationEvent.getMetricCollectionBarrierVertex();
 
-    final Pair<String, String> taskInfo = dynamicOptimizationEvent.getTaskInfo();
-
     final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan,
         metricCollectionBarrierVertex);
 
-    pubSubEventHandler.onNext(new UpdatePhysicalPlanEvent(newPlan, taskInfo));
+    pubSubEventHandler.onNext(new UpdatePhysicalPlanEvent(newPlan, dynamicOptimizationEvent.getTaskId(),
+        dynamicOptimizationEvent.getExecutorId()));
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
index 1333f98..a8f701b 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.common.eventhandler;
 
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.CompilerEvent;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
@@ -24,17 +23,21 @@ import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
  */
 public final class UpdatePhysicalPlanEvent implements CompilerEvent {
   private final PhysicalPlan newPhysicalPlan;
-  private final Pair<String, String> taskInfo;
+  private final String taskId;
+  private final String executorId;
 
   /**
    * Constructor.
    * @param newPhysicalPlan the newly optimized physical plan.
-   * @param taskInfo information of the task at which this optimization occurs: its name and its task ID.
+   * @param taskId id of the task which triggered the dynamic optimization.
+   * @param executorId the id of executor which executes {@code taskId}
    */
   UpdatePhysicalPlanEvent(final PhysicalPlan newPhysicalPlan,
-                          final Pair<String, String> taskInfo) {
+                          final String taskId,
+                          final String executorId) {
     this.newPhysicalPlan = newPhysicalPlan;
-    this.taskInfo = taskInfo;
+    this.taskId = taskId;
+    this.executorId = executorId;
   }
 
   /**
@@ -45,9 +48,16 @@ public final class UpdatePhysicalPlanEvent implements CompilerEvent {
   }
 
   /**
-   * @return the information of the task at which this optimization occurs: its name and its task ID.
+   * @return id of the task which triggered the dynamic optimization
    */
-  public Pair<String, String> getTaskInfo() {
-    return this.taskInfo;
+  public String getTaskId() {
+    return taskId;
+  }
+
+  /**
+   * @return id of the executor which triggered the dynamic optimization
+   */
+  public String getExecutorId() {
+    return executorId;
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 2ddb818..4047cbb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -18,7 +18,6 @@
  */
 package edu.snu.nemo.runtime.master.eventhandler;
 
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.CompilerEventHandler;
 import edu.snu.nemo.runtime.common.eventhandler.UpdatePhysicalPlanEvent;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
@@ -48,8 +47,7 @@ public final class UpdatePhysicalPlanEventHandler implements CompilerEventHandle
   @Override
   public void onNext(final UpdatePhysicalPlanEvent updatePhysicalPlanEvent) {
     final PhysicalPlan newPlan = updatePhysicalPlanEvent.getNewPhysicalPlan();
-    final Pair<String, String> taskInfo = updatePhysicalPlanEvent.getTaskInfo();
 
-    this.scheduler.updateJob(newPlan.getId(), newPlan, taskInfo);
+    this.scheduler.updateJob(newPlan.getId(), newPlan);
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 3780902..57db43b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -124,14 +124,10 @@ public final class BatchSingleJobScheduler implements Scheduler {
   }
 
   @Override
-  public void updateJob(final String jobId, final PhysicalPlan newPhysicalPlan, final Pair<String, String> taskInfo) {
+  public void updateJob(final String jobId, final PhysicalPlan newPhysicalPlan) {
     // update the job in the scheduler.
     // NOTE: what's already been executed is not modified in the new physical plan.
     this.physicalPlan = newPhysicalPlan;
-    if (taskInfo != null) {
-      onTaskExecutionComplete(taskInfo.left(), taskInfo.right(), true);
-      doSchedule();
-    }
   }
 
   /**
@@ -159,7 +155,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
       jobStateManager.onTaskStateChanged(taskId, newState);
       switch (newState) {
         case COMPLETE:
-          onTaskExecutionComplete(executorId, taskId, false);
+          onTaskExecutionComplete(executorId, taskId);
           break;
         case SHOULD_RETRY:
           // SHOULD_RETRY from an executor means that the task ran into a recoverable failure
@@ -359,20 +355,17 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   /**
    * Action after task execution has been completed.
+   * Note this method should not be invoked when the previous state of the task is ON_HOLD.
    * @param executorId id of the executor.
    * @param taskId the ID of the task completed.
-   * @param isOnHoldToComplete whether or not if it is switched to complete after it has been on hold.
    */
   private void onTaskExecutionComplete(final String executorId,
-                                       final String taskId,
-                                       final boolean isOnHoldToComplete) {
+                                       final String taskId) {
     LOG.debug("{} completed in {}", new Object[]{taskId, executorId});
-    if (!isOnHoldToComplete) {
-      executorRegistry.updateExecutor(executorId, (executor, state) -> {
-        executor.onTaskExecutionComplete(taskId);
-        return Pair.of(executor, state);
-      });
-    }
+    executorRegistry.updateExecutor(executorId, (executor, state) -> {
+      executor.onTaskExecutionComplete(taskId);
+      return Pair.of(executor, state);
+    });
   }
 
   /**
@@ -408,9 +401,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
       // and we will use this vertex to perform metric collection and dynamic optimization.
 
       pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
-          new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, Pair.of(executorId, taskId)));
-    } else {
-      onTaskExecutionComplete(executorId, taskId, true);
+          new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, taskId, executorId));
     }
   }
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 28b52db..aaa82e0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -47,10 +46,8 @@ public interface Scheduler {
    * Receives and updates the scheduler with a new physical plan for a job.
    * @param jobId the ID of the job to change the physical plan.
    * @param newPhysicalPlan new physical plan for the job.
-   * @param taskInfo pair containing the information of the executor id and task id to mark as complete after the
-   *                 update.
    */
-  void updateJob(String jobId, PhysicalPlan newPhysicalPlan, Pair<String, String> taskInfo);
+  void updateJob(String jobId, PhysicalPlan newPhysicalPlan);
 
   /**
    * Called when an executor is added to Runtime, so that the extra resource can be used to execute the job.