You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by ja...@apache.org on 2018/08/08 02:37:38 UTC
[incubator-nemo] 01/02: remove completeHeldTask
This is an automated email from the ASF dual-hosted git repository.
jangho pushed a commit to branch nemo-175
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
commit 1a43e79180de615923f88fdac67bc567cfd39812
Author: Jangho Seo <ja...@jangho.io>
AuthorDate: Wed Aug 8 11:25:01 2018 +0900
remove completeHeldTask
---
.../UpdatePhysicalPlanEventHandler.java | 1 -
.../master/scheduler/BatchSingleJobScheduler.java | 25 ++++++----------------
.../nemo/runtime/master/scheduler/Scheduler.java | 8 -------
3 files changed, 7 insertions(+), 27 deletions(-)
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
index a81ebeb..4047cbb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/eventhandler/UpdatePhysicalPlanEventHandler.java
@@ -49,6 +49,5 @@ public final class UpdatePhysicalPlanEventHandler implements CompilerEventHandle
final PhysicalPlan newPlan = updatePhysicalPlanEvent.getNewPhysicalPlan();
this.scheduler.updateJob(newPlan.getId(), newPlan);
- this.scheduler.completeHeldTask(updatePhysicalPlanEvent.getTaskId(), updatePhysicalPlanEvent.getExecutorId());
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index fb29918..57db43b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -130,12 +130,6 @@ public final class BatchSingleJobScheduler implements Scheduler {
this.physicalPlan = newPhysicalPlan;
}
- @Override
- public void completeHeldTask(final String taskId, final String executorId) {
- onTaskExecutionComplete(executorId, taskId, true);
- doSchedule();
- }
-
/**
* Handles task state transition notifications sent from executors.
* Note that we can receive notifications for previous task attempts, due to the nature of asynchronous events.
@@ -161,7 +155,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
jobStateManager.onTaskStateChanged(taskId, newState);
switch (newState) {
case COMPLETE:
- onTaskExecutionComplete(executorId, taskId, false);
+ onTaskExecutionComplete(executorId, taskId);
break;
case SHOULD_RETRY:
// SHOULD_RETRY from an executor means that the task ran into a recoverable failure
@@ -361,20 +355,17 @@ public final class BatchSingleJobScheduler implements Scheduler {
/**
* Action after task execution has been completed.
+ * Note this method should not be invoked when the previous state of the task is ON_HOLD.
* @param executorId id of the executor.
* @param taskId the ID of the task completed.
- * @param isOnHoldToComplete whether or not if it is switched to complete after it has been on hold.
*/
private void onTaskExecutionComplete(final String executorId,
- final String taskId,
- final boolean isOnHoldToComplete) {
+ final String taskId) {
LOG.debug("{} completed in {}", new Object[]{taskId, executorId});
- if (!isOnHoldToComplete) {
- executorRegistry.updateExecutor(executorId, (executor, state) -> {
- executor.onTaskExecutionComplete(taskId);
- return Pair.of(executor, state);
- });
- }
+ executorRegistry.updateExecutor(executorId, (executor, state) -> {
+ executor.onTaskExecutionComplete(taskId);
+ return Pair.of(executor, state);
+ });
}
/**
@@ -411,8 +402,6 @@ public final class BatchSingleJobScheduler implements Scheduler {
pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, taskId, executorId));
- } else {
- onTaskExecutionComplete(executorId, taskId, true);
}
}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 83f5641..aaa82e0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -50,14 +50,6 @@ public interface Scheduler {
void updateJob(String jobId, PhysicalPlan newPhysicalPlan);
/**
- * Completes a task which is in ON_HOLD state.
- *
- * @param taskId id of the task
- * @param executorId id of the executor which executes the task
- */
- void completeHeldTask(String taskId, String executorId);
-
- /**
* Called when an executor is added to Runtime, so that the extra resource can be used to execute the job.
* @param executorRepresenter a representation of the added executor.
*/