You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2018/08/08 07:45:05 UTC
[incubator-nemo] branch master updated: [NEMO-175] Remove
'taskInfo', add Scheduler#completeHeldTask (#89)
This is an automated email from the ASF dual-hosted git repository.
johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push:
new 661497e [NEMO-175] Remove 'taskInfo', add Scheduler#completeHeldTask (#89)
661497e is described below
commit 661497ed694856a1157f3e362724e765639840f6
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Wed Aug 8 16:45:02 2018 +0900
[NEMO-175] Remove 'taskInfo', add Scheduler#completeHeldTask (#89)
JIRA: [NEMO-175](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-175)
**Major changes:**
- N/A
**Minor changes to note:**
- Remove taskInfo, which is a pair of taskId and executorId. It is simpler to pass the two strings as separate arguments.
- In Scheduler, 'updatePhysicalPlan' optionally completed an ON_HOLD task if 'taskInfo' was not null. This PR adds Scheduler#completeHeldTask for this operation.
**Tests for the changes:**
- Existing tests should still pass.
**Other comments:**
- N/A
resolves [NEMO-175](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-175)
---
.../eventhandler/DynamicOptimizationEvent.java | 26 ++++++++++++++-------
.../DynamicOptimizationEventHandler.java | 6 ++---
.../eventhandler/UpdatePhysicalPlanEvent.java | 26 ++++++++++++++-------
.../UpdatePhysicalPlanEventHandler.java | 4 +---
.../master/scheduler/BatchSingleJobScheduler.java | 27 ++++++++--------------
.../nemo/runtime/master/scheduler/Scheduler.java | 5 +---
6 files changed, 49 insertions(+), 45 deletions(-)
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index e9bd6eb..50df799 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -18,7 +18,6 @@
*/
package edu.snu.nemo.runtime.common.eventhandler;
-import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.eventhandler.RuntimeEvent;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
@@ -29,20 +28,24 @@ import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
public final class DynamicOptimizationEvent implements RuntimeEvent {
private final PhysicalPlan physicalPlan;
private final MetricCollectionBarrierVertex metricCollectionBarrierVertex;
- private final Pair<String, String> taskInfo;
+ private final String taskId;
+ private final String executorId;
/**
* Default constructor.
* @param physicalPlan physical plan to be optimized.
* @param metricCollectionBarrierVertex metric collection barrier vertex to retrieve metric data from.
- * @param taskInfo information of the task.
+ * @param taskId id of the task which triggered the dynamic optimization.
+ * @param executorId the id of executor which executes {@code taskId}
*/
public DynamicOptimizationEvent(final PhysicalPlan physicalPlan,
final MetricCollectionBarrierVertex metricCollectionBarrierVertex,
- final Pair<String, String> taskInfo) {
+ final String taskId,
+ final String executorId) {
this.physicalPlan = physicalPlan;
this.metricCollectionBarrierVertex = metricCollectionBarrierVertex;
- this.taskInfo = taskInfo;
+ this.taskId = taskId;
+ this.executorId = executorId;
}
/**
@@ -60,9 +63,16 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
}
/**
- * @return the information of the task at which this optimization occurs: its name and its task ID.
+ * @return id of the task which triggered the dynamic optimization
*/
- public Pair<String, String> getTaskInfo() {
- return this.taskInfo;
+ public String getTaskId() {
+ return taskId;
+ }
+
+ /**
+ * @return id of the executor which triggered the dynamic optimization
+ */
+ public String getExecutorId() {
+ return executorId;
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index f4f7d22..56ce7c1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -18,7 +18,6 @@
*/
package edu.snu.nemo.runtime.common.eventhandler;
-import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
@@ -54,11 +53,10 @@ public final class DynamicOptimizationEventHandler implements RuntimeEventHandle
final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
dynamicOptimizationEvent.getMetricCollectionBarrierVertex();
- final Pair<String, String> taskInfo = dynamicOptimizationEvent.getTaskInfo();
-
final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan,
metricCollectionBarrierVertex);
- pubSubEventHandler.onNext(new UpdatePhysicalPlanEvent(newPlan, taskInfo));
+ pubSubEventHandler.onNext(new UpdatePhysicalPlanEvent(newPlan, dynamicOptimizationEvent.getTaskId(),
+ dynamicOptimizationEvent.getExecutorId()));
}
}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
index 1333f98..a8f701b 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.common.eventhandler;
-import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.eventhandler.CompilerEvent;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
@@ -24,17 +23,21 @@ import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
*/
public final class UpdatePhysicalPlanEvent implements CompilerEvent {
private final PhysicalPlan newPhysicalPlan;
- private final Pair<String, String> taskInfo;
+ private final String taskId;
+ private final String executorId;
/**
* Constructor.
* @param newPhysicalPlan the newly optimized physical plan.
- * @param taskInfo information of the task at which this optimization occurs: its name and its task ID.
+ * @param taskId id of the task which triggered the dynamic optimization.
+ * @param executorId the id of executor which executes {@code taskId}
*/
UpdatePhysicalPlanEvent(final PhysicalPlan newPhysicalPlan,
- final Pair<String, String> taskInfo) {
+ final String taskId,
+ final String executorId) {
this.newPhysicalPlan = newPhysicalPlan;
- this.taskInfo = taskInfo;
+ this.taskId = taskId;
+ this.executorId = executorId;
}
/**
@@ -45,9 +48,16 @@ public final class UpdatePhysicalPlanEvent implements CompilerEvent {
}
/**
- * @return the information of the task at which this optimization occurs: its name and its task ID.
+ * @return id of the task which triggered the dynamic optimization
*/
- public Pair<String, String> getTaskInfo() {
- return this.taskInfo;
+ public String getTaskId() {
+ return taskId;
+ }
+
+ /**
+ * @return id of the executor which triggered the dynamic optimization
+ */
+ public String getExecutorId() {
+ return executorId;
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index 2ddb818..4047cbb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -18,7 +18,6 @@
*/
package edu.snu.nemo.runtime.master.eventhandler;
-import edu.snu.nemo.common.Pair;
import edu.snu.nemo.common.eventhandler.CompilerEventHandler;
import edu.snu.nemo.runtime.common.eventhandler.UpdatePhysicalPlanEvent;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
@@ -48,8 +47,7 @@ public final class UpdatePhysicalPlanEventHandler implements CompilerEventHandle
@Override
public void onNext(final UpdatePhysicalPlanEvent updatePhysicalPlanEvent) {
final PhysicalPlan newPlan = updatePhysicalPlanEvent.getNewPhysicalPlan();
- final Pair<String, String> taskInfo = updatePhysicalPlanEvent.getTaskInfo();
- this.scheduler.updateJob(newPlan.getId(), newPlan, taskInfo);
+ this.scheduler.updateJob(newPlan.getId(), newPlan);
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 3780902..57db43b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -124,14 +124,10 @@ public final class BatchSingleJobScheduler implements Scheduler {
}
@Override
- public void updateJob(final String jobId, final PhysicalPlan newPhysicalPlan, final Pair<String, String> taskInfo) {
+ public void updateJob(final String jobId, final PhysicalPlan newPhysicalPlan) {
// update the job in the scheduler.
// NOTE: what's already been executed is not modified in the new physical plan.
this.physicalPlan = newPhysicalPlan;
- if (taskInfo != null) {
- onTaskExecutionComplete(taskInfo.left(), taskInfo.right(), true);
- doSchedule();
- }
}
/**
@@ -159,7 +155,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
jobStateManager.onTaskStateChanged(taskId, newState);
switch (newState) {
case COMPLETE:
- onTaskExecutionComplete(executorId, taskId, false);
+ onTaskExecutionComplete(executorId, taskId);
break;
case SHOULD_RETRY:
// SHOULD_RETRY from an executor means that the task ran into a recoverable failure
@@ -359,20 +355,17 @@ public final class BatchSingleJobScheduler implements Scheduler {
/**
* Action after task execution has been completed.
+ * Note this method should not be invoked when the previous state of the task is ON_HOLD.
* @param executorId id of the executor.
* @param taskId the ID of the task completed.
- * @param isOnHoldToComplete whether or not if it is switched to complete after it has been on hold.
*/
private void onTaskExecutionComplete(final String executorId,
- final String taskId,
- final boolean isOnHoldToComplete) {
+ final String taskId) {
LOG.debug("{} completed in {}", new Object[]{taskId, executorId});
- if (!isOnHoldToComplete) {
- executorRegistry.updateExecutor(executorId, (executor, state) -> {
- executor.onTaskExecutionComplete(taskId);
- return Pair.of(executor, state);
- });
- }
+ executorRegistry.updateExecutor(executorId, (executor, state) -> {
+ executor.onTaskExecutionComplete(taskId);
+ return Pair.of(executor, state);
+ });
}
/**
@@ -408,9 +401,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
// and we will use this vertex to perform metric collection and dynamic optimization.
pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
- new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, Pair.of(executorId, taskId)));
- } else {
- onTaskExecutionComplete(executorId, taskId, true);
+ new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, taskId, executorId));
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 28b52db..aaa82e0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -15,7 +15,6 @@
*/
package edu.snu.nemo.runtime.master.scheduler;
-import edu.snu.nemo.common.Pair;
import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
import edu.snu.nemo.runtime.common.state.TaskState;
import edu.snu.nemo.runtime.master.JobStateManager;
@@ -47,10 +46,8 @@ public interface Scheduler {
* Receives and updates the scheduler with a new physical plan for a job.
* @param jobId the ID of the job to change the physical plan.
* @param newPhysicalPlan new physical plan for the job.
- * @param taskInfo pair containing the information of the executor id and task id to mark as complete after the
- * update.
*/
- void updateJob(String jobId, PhysicalPlan newPhysicalPlan, Pair<String, String> taskInfo);
+ void updateJob(String jobId, PhysicalPlan newPhysicalPlan);
/**
* Called when an executor is added to Runtime, so that the extra resource can be used to execute the job.