You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/27 07:40:40 UTC
tez git commit: TEZ-2914. Ability to limit vertex concurrency (bikas)
(cherry picked from commit 34eb75d709bbe6e0417f9b4023f4fa1cec81bd8b)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 1bd2f63c4 -> 73d5ce6e2
TEZ-2914. Ability to limit vertex concurrency (bikas)
(cherry picked from commit 34eb75d709bbe6e0417f9b4023f4fa1cec81bd8b)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/73d5ce6e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/73d5ce6e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/73d5ce6e
Branch: refs/heads/branch-0.7
Commit: 73d5ce6e29167199dbdb616e599428390db09854
Parents: 1bd2f63
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Dec 25 19:39:04 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Sat Dec 26 22:40:15 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 11 +-
.../apache/tez/dag/app/dag/DAGScheduler.java | 68 +++++++++-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +-
.../app/dag/event/DAGEventSchedulerUpdate.java | 3 +-
.../DAGEventSchedulerUpdateTAAssigned.java | 36 ------
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 17 +--
.../app/dag/impl/DAGSchedulerNaturalOrder.java | 15 +--
.../DAGSchedulerNaturalOrderControlled.java | 15 +--
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 43 ++-----
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 6 +
.../dag/app/rm/TaskSchedulerEventHandler.java | 2 -
.../apache/tez/dag/app/MockDAGAppMaster.java | 13 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 48 +++++++
.../tez/dag/app/dag/impl/TestDAGImpl.java | 4 -
.../tez/dag/app/dag/impl/TestDAGScheduler.java | 127 ++++++++++++++++++-
.../TestDAGSchedulerNaturalOrderControlled.java | 38 +++---
.../app/rm/TestTaskSchedulerEventHandler.java | 6 +-
18 files changed, 303 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2489efe..3a5ac29 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES
TEZ-2949. Allow duplicate dag names within session for Tez.
ALL CHANGES
+ TEZ-2914. Ability to limit vertex concurrency
TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex
TEZ-2538. ADDITIONAL_SPILL_COUNT wrongly populated for DefaultSorter with multiple partitions.
TEZ-3006. Remove unused import in TestHistoryParser.
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index a001735..8997327 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -503,7 +503,16 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_MAX_APP_ATTEMPTS = TEZ_AM_PREFIX +
"max.app.attempts";
public static final int TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT = 2;
-
+
+ /**
+ * Int value. The maximum number of attempts that can run concurrently for a given vertex.
+ * Setting a value {@code <= 0} implies no limit.
+ */
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY =
+ TEZ_AM_PREFIX + "vertex.max-task-concurrency";
+ public static final int TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT = -1;
+
/**
* Int value. The maximum number of attempts that can fail for a particular task before the task is failed.
* This does not count killed attempts. Task failure results in DAG failure.
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
index 2d3b006..87a6261 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
@@ -18,16 +18,70 @@
package org.apache.tez.dag.app.dag;
+import java.util.Map;
+import java.util.Queue;
+
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.records.TezVertexID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
-public interface DAGScheduler {
+public abstract class DAGScheduler {
+ private static class VertexInfo {
+ int concurrencyLimit;
+ int concurrency;
+ Queue<DAGEventSchedulerUpdate> pendingAttempts = Lists.newLinkedList();
+
+ VertexInfo(int limit) {
+ this.concurrencyLimit = limit;
+ }
+ }
- public void vertexCompleted(Vertex vertex);
+ Map<TezVertexID, VertexInfo> vertexInfo = null;
- public void scheduleTask(DAGEventSchedulerUpdate event);
+ public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
+ if (vertexInfo == null) {
+ vertexInfo = Maps.newHashMap();
+ }
+ if (concurrency > 0) {
+ vertexInfo.put(vId, new VertexInfo(concurrency));
+ }
+ }
- public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event);
-
- public void taskSucceeded(DAGEventSchedulerUpdate event);
+ public void scheduleTask(DAGEventSchedulerUpdate event) {
+ VertexInfo vInfo = null;
+ if (vertexInfo != null) {
+ vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
+ }
+ scheduleTaskWithLimit(event, vInfo);
+ }
+
+ private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) {
+ if (vInfo != null) {
+ if (vInfo.concurrency >= vInfo.concurrencyLimit) {
+ vInfo.pendingAttempts.add(event);
+ return; // already at max concurrency
+ }
+ vInfo.concurrency++;
+ }
+ scheduleTaskEx(event);
+ }
+
+ public void taskCompleted(DAGEventSchedulerUpdate event) {
+ taskCompletedEx(event);
+ if (vertexInfo != null) {
+ VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
+ if (vInfo != null) {
+ vInfo.concurrency--;
+ if (!vInfo.pendingAttempts.isEmpty()) {
+ scheduleTaskWithLimit(vInfo.pendingAttempts.poll(), vInfo);
+ }
+ }
+ }
+ }
+
+ public abstract void scheduleTaskEx(DAGEventSchedulerUpdate event);
+
+ public abstract void taskCompletedEx(DAGEventSchedulerUpdate event);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 1c92314..e006822 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -86,7 +86,7 @@ public interface Vertex extends Comparable<Vertex> {
*/
TezCounters getCachedCounters();
-
+ int getMaxTaskConcurrency();
Map<TezTaskID, Task> getTasks();
Task getTask(TezTaskID taskID);
Task getTask(int taskIndex);
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
index a436a8c..eda02b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
@@ -24,8 +24,7 @@ public class DAGEventSchedulerUpdate extends DAGEvent {
public enum UpdateType {
TA_SCHEDULE,
- TA_SCHEDULED,
- TA_SUCCEEDED
+ TA_COMPLETED
}
private final TaskAttempt attempt;
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
deleted file mode 100644
index 8e27843..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-
-public class DAGEventSchedulerUpdateTAAssigned extends DAGEventSchedulerUpdate {
-
- final Container container;
-
- public DAGEventSchedulerUpdateTAAssigned(TaskAttempt attempt, Container container) {
- super(DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULED, attempt);
- this.container = container;
- }
-
- public Container getContainer() {
- return container;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index c04f9cb..bb6f474 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -103,7 +103,6 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
@@ -1622,6 +1621,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] {
DAG.class, EventHandler.class}, new Object[] {dag, dag.eventHandler});
+ for (Vertex v : dag.vertices.values()) {
+ dag.dagScheduler.addVertexConcurrencyLimit(v.getVertexId(), v.getMaxTaskConcurrency());
+ }
}
private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
@@ -1995,10 +1997,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
job.numCompletedVertices++;
if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
- if (!job.reRunningVertices.contains(vertex.getVertexId())) {
- // vertex succeeded for the first time
- job.dagScheduler.vertexCompleted(vertex);
- }
forceTransitionToKillWait = !(job.vertexSucceeded(vertex));
}
else if (vertexEvent.getVertexState() == VertexState.FAILED) {
@@ -2217,13 +2215,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
case TA_SCHEDULE:
dag.dagScheduler.scheduleTask(sEvent);
break;
- case TA_SCHEDULED:
- DAGEventSchedulerUpdateTAAssigned taEvent =
- (DAGEventSchedulerUpdateTAAssigned) sEvent;
- dag.dagScheduler.taskScheduled(taEvent);
- break;
- case TA_SUCCEEDED:
- dag.dagScheduler.taskSucceeded(sEvent);
+ case TA_COMPLETED:
+ dag.dagScheduler.taskCompleted(sEvent);
break;
default:
throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:"
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
index 8d42227..4246ad0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
@@ -26,11 +26,10 @@ import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
@SuppressWarnings("rawtypes")
-public class DAGSchedulerNaturalOrder implements DAGScheduler {
+public class DAGSchedulerNaturalOrder extends DAGScheduler {
private static final Logger LOG =
LoggerFactory.getLogger(DAGSchedulerNaturalOrder.class);
@@ -44,11 +43,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler {
}
@Override
- public void vertexCompleted(Vertex vertex) {
- }
-
- @Override
- public void scheduleTask(DAGEventSchedulerUpdate event) {
+ public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
TaskAttempt attempt = event.getAttempt();
Vertex vertex = dag.getVertex(attempt.getVertexID());
int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
@@ -69,11 +64,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler {
}
@Override
- public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
- }
-
- @Override
- public void taskSucceeded(DAGEventSchedulerUpdate event) {
+ public void taskCompletedEx(DAGEventSchedulerUpdate event) {
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
index 2469a2f..0802dce 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
@@ -36,7 +36,6 @@ import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -50,7 +49,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
* - generic slow start mechanism across all vertices - independent of the type of edges.
*/
@SuppressWarnings("rawtypes")
-public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
+public class DAGSchedulerNaturalOrderControlled extends DAGScheduler {
private static final Logger LOG =
LoggerFactory.getLogger(DAGSchedulerNaturalOrderControlled.class);
@@ -72,13 +71,9 @@ public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
this.handler = dispatcher;
}
- @Override
- public void vertexCompleted(Vertex vertex) {
- }
-
// TODO Does ordering matter - it currently depends on the order returned by vertex.getOutput*
@Override
- public void scheduleTask(DAGEventSchedulerUpdate event) {
+ public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
TaskAttempt attempt = event.getAttempt();
Vertex vertex = dag.getVertex(attempt.getVertexID());
int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
@@ -241,11 +236,7 @@ public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
}
@Override
- public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
- }
-
- @Override
- public void taskSucceeded(DAGEventSchedulerUpdate event) {
+ public void taskCompletedEx(DAGEventSchedulerUpdate event) {
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index cc4f046..e5e0a37 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -949,40 +949,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId,
TaskAttemptStateInternal attemptState) {
this.sendTaskAttemptCompletionEvent(attemptId, attemptState);
- }
-
- // TODO: Recovery
- /*
- private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
- TaskFinishedEvent tfe =
- new TaskFinishedEvent(task.taskId,
- task.successfulAttempt,
- task.getFinishTime(task.successfulAttempt),
- task.taskId.getTaskType(),
- taskState.toString(),
- task.getCounters());
- return tfe;
- }
-
- private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TezTaskAttemptID taId) {
- StringBuilder errorSb = new StringBuilder();
- if (diag != null) {
- for (String d : diag) {
- errorSb.append(", ").append(d);
- }
+ if (getInternalState() != TaskStateInternal.SUCCEEDED) {
+ sendDAGSchedulerFinishedEvent(attemptId); // not a retro active action
}
- TaskFailedEvent taskFailedEvent = new TaskFailedEvent(
- TypeConverter.fromYarn(task.taskId),
- // Hack since getFinishTime needs isFinished to be true and that doesn't happen till after the transition.
- task.getFinishTime(taId),
- TypeConverter.fromYarn(task.getType()),
- errorSb.toString(),
- taskState.toString(),
- taId == null ? null : TypeConverter.fromYarn(taId));
- return taskFailedEvent;
}
- */
+ private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) {
+ // send notification to DAG scheduler
+ eventHandler.handle(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, attempts.get(taId)));
+ }
+
private static void unSucceed(TaskImpl task) {
task.commitAttempt = null;
task.successfulAttempt = null;
@@ -1150,10 +1127,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.getID(), diagnostics, errCause));
}
}
- // send notification to DAG scheduler
- task.eventHandler.handle(new DAGEventSchedulerUpdate(
- DAGEventSchedulerUpdate.UpdateType.TA_SUCCEEDED, task.attempts
- .get(task.successfulAttempt)));
task.finished(TaskStateInternal.SUCCEEDED);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index cf24348..aec738c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1126,6 +1126,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
readLock.unlock();
}
}
+
+ @Override
+ public int getMaxTaskConcurrency() {
+ return vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY,
+ TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT);
+ }
public VertexStats getVertexStats() {
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 785caf7..ccb4e56 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -56,7 +56,6 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
@@ -437,7 +436,6 @@ public class TaskSchedulerEventHandler extends AbstractService
sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
event.getContainerContext()));
}
- sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
.getContainerContext().getCredentials(), event.getPriority()));
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index f970264..b059b32 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -85,7 +85,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-@SuppressWarnings("unchecked")
public class MockDAGAppMaster extends DAGAppMaster {
private static final Logger LOG = LoggerFactory.getLogger(MockDAGAppMaster.class);
@@ -96,6 +95,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
EventsDelegate eventsDelegate;
CountersDelegate countersDelegate;
StatisticsDelegate statsDelegate;
+ ContainerDelegate containerDelegate;
long launcherSleepTime = 1;
boolean doSleep = true;
int handlerConcurrency = 1;
@@ -116,6 +116,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
public static interface EventsDelegate {
public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time);
}
+
+ public static interface ContainerDelegate {
+ public void stop();
+ public void launch();
+ }
// mock container launcher does not launch real tasks.
// Upon, launch of a container is simulates the container asking for tasks
@@ -275,6 +280,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
void stop(NMCommunicatorStopRequestEvent event) {
// remove from simulated container list
containers.remove(event.getContainerId());
+ if (containerDelegate != null) {
+ containerDelegate.stop();
+ }
getContext().getEventHandler().handle(
new AMContainerEvent(event.getContainerId(), AMContainerEventType.C_NM_STOP_SENT));
}
@@ -285,6 +293,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
event.getContainerLaunchContext());
containers.put(event.getContainerId(), cData);
containersToProcess.add(cData);
+ if (containerDelegate != null) {
+ containerDelegate.launch();
+ }
getContext().getEventHandler().handle(new AMContainerEventLaunched(event.getContainerId()));
}
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 42d4b0b..9ba7f3c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
@@ -73,6 +74,7 @@ import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
+import org.apache.tez.dag.app.MockDAGAppMaster.ContainerDelegate;
import org.apache.tez.dag.app.MockDAGAppMaster.EventsDelegate;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
@@ -406,6 +408,52 @@ public class TestMockDAGAppMaster {
tezClient.stop();
}
+
+ @Test (timeout = 100000)
+ public void testConcurrencyLimit() throws Exception {
+ // the test relies on local mode behavior of launching a new container per task.
+ // so task concurrency == container concurrency
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+ final int concurrencyLimit = 5;
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+ null, false, false, concurrencyLimit*4, 1000);
+
+ tezClient.start();
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false);
+
+ final AtomicInteger concurrency = new AtomicInteger(0);
+ final AtomicBoolean exceededConcurrency = new AtomicBoolean(false);
+ mockApp.containerDelegate = new ContainerDelegate() {
+ @Override
+ public void stop() {
+ concurrency.decrementAndGet();
+ }
+ @Override
+ public void launch() {
+ int maxConc = concurrency.incrementAndGet();
+ if (maxConc > concurrencyLimit) {
+ exceededConcurrency.set(true);
+ }
+ System.out.println("Launched: " + maxConc);
+ }
+ };
+ DAG dag = DAG.create("testConcurrencyLimit");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 20).setConf(
+ TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, String.valueOf(concurrencyLimit));
+ dag.addVertex(vA);
+
+ mockLauncher.startScheduling(true);
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ Assert.assertFalse(exceededConcurrency.get());
+ tezClient.stop();
+ }
+
@Test (timeout = 10000)
public void testBasicCounters() throws Exception {
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index c7dec36..6d45433 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1604,7 +1604,6 @@ public class TestDAGImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
Assert.assertEquals(1, dag.numCompletedVertices);
- verify(dag.dagScheduler, times(1)).vertexCompleted(v);
dispatcher.getEventHandler().handle(
new VertexEventTaskReschedule(TezTaskID.getInstance(vId, 0)));
@@ -1621,9 +1620,6 @@ public class TestDAGImpl {
Assert.assertEquals(1, dag.getSuccessfulVertices());
Assert.assertEquals(1, dag.numCompletedVertices);
- // re-completion is not notified again
- verify(dag.dagScheduler, times(1)).vertexCompleted(v);
-
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index 913f5fa..a28f367 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -25,27 +25,34 @@ import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
import static org.mockito.Mockito.*;
+import java.util.List;
+
public class TestDAGScheduler {
class MockEventHandler implements EventHandler<TaskAttemptEventSchedule> {
TaskAttemptEventSchedule event;
+ List<TaskAttemptEventSchedule> events = Lists.newLinkedList();
@Override
public void handle(TaskAttemptEventSchedule event) {
this.event = event;
+ this.events.add(event);
}
-
}
- MockEventHandler mockEventHandler = new MockEventHandler();
@Test(timeout=5000)
public void testDAGSchedulerNaturalOrder() {
+ MockEventHandler mockEventHandler = new MockEventHandler();
DAG mockDag = mock(DAG.class);
Vertex mockVertex = mock(Vertex.class);
TaskAttempt mockAttempt = mock(TaskAttempt.class);
@@ -58,15 +65,125 @@ public class TestDAGScheduler {
DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
mockEventHandler);
- scheduler.scheduleTask(event);
+ scheduler.scheduleTaskEx(event);
Assert.assertEquals(1, mockEventHandler.event.getPriorityHighLimit());
Assert.assertEquals(3, mockEventHandler.event.getPriorityLowLimit());
- scheduler.scheduleTask(event);
+ scheduler.scheduleTaskEx(event);
Assert.assertEquals(4, mockEventHandler.event.getPriorityHighLimit());
Assert.assertEquals(6, mockEventHandler.event.getPriorityLowLimit());
- scheduler.scheduleTask(event);
+ scheduler.scheduleTaskEx(event);
Assert.assertEquals(7, mockEventHandler.event.getPriorityHighLimit());
Assert.assertEquals(9, mockEventHandler.event.getPriorityLowLimit());
}
+ @Test(timeout=5000)
+ public void testConcurrencyLimit() {
+ MockEventHandler mockEventHandler = new MockEventHandler();
+ DAG mockDag = mock(DAG.class);
+ TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+ TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01");
+ TezTaskID tId0 = TezTaskID.getInstance(vId0, 0);
+ TezTaskID tId1 = TezTaskID.getInstance(vId1, 0);
+
+ TaskAttempt mockAttempt;
+
+ Vertex mockVertex = mock(Vertex.class);
+ when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
+ when(mockVertex.getDistanceFromRoot()).thenReturn(0);
+
+ DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
+ mockEventHandler);
+ scheduler.addVertexConcurrencyLimit(vId0, 0); // not effective
+
+ // schedule beyond limit and it gets scheduled
+ mockAttempt = mock(TaskAttempt.class);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 0));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(1, mockEventHandler.events.size());
+ mockAttempt = mock(TaskAttempt.class);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 1));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(2, mockEventHandler.events.size());
+ mockAttempt = mock(TaskAttempt.class);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 2));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(3, mockEventHandler.events.size());
+
+ mockEventHandler.events.clear();
+ List<TaskAttempt> mockAttempts = Lists.newArrayList();
+ int completed = 0;
+ int requested = 0;
+ int scheduled = 0;
+ scheduler.addVertexConcurrencyLimit(vId1, 2); // effective
+
+ // schedule beyond limit and it gets buffered
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered
+
+ scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // no extra scheduling
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
index 88a91b6..3fbee80 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
@@ -62,35 +62,35 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
// Schedule 3 tasks belonging to v2
for (int i = 0; i < 3; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
verify(eventHandler, times(3)).handle(any(Event.class));
reset(eventHandler);
// Schedule 3 tasks belonging to v3
for (int i = 0; i < 3; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
}
verify(eventHandler, times(3)).handle(any(Event.class));
reset(eventHandler);
// Schedule remaining tasks belonging to v2
for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[2].getTotalTasks() - 3)).handle(any(Event.class));
reset(eventHandler);
// Schedule remaining tasks belonging to v3
for (int i = 3; i < vertices[3].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[3].getTotalTasks() - 3)).handle(any(Event.class));
reset(eventHandler);
@@ -98,7 +98,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v4
for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[4].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
@@ -120,7 +120,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
@@ -128,14 +128,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
// v2 behaving as if configured with slow-start.
// Schedule all tasks belonging to v3.
for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
// Scheduling all tasks belonging to v4. None should get scheduled.
for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
}
verify(eventHandler, never()).handle(any(Event.class));
reset(eventHandler);
@@ -143,14 +143,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
// v2 now starts scheduling ...
// Schedule 3 tasks for v2 initially.
for (int i = 0; i < 3; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
verify(eventHandler, times(3)).handle(any(Event.class));
reset(eventHandler);
// Schedule remaining tasks belonging to v2
for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class);
// All of v2 and v3 should be sent out.
@@ -187,7 +187,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
@@ -197,14 +197,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
// v2 will change parallelism
// Schedule all tasks belonging to v3
for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
// Schedule all tasks belonging to v4
for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
}
verify(eventHandler, never()).handle(any(Event.class));
reset(eventHandler);
@@ -215,7 +215,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v2
for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[2].getTotalTasks() + vertices[4].getTotalTasks()))
.handle(any(Event.class));
@@ -237,7 +237,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all but 1 task belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
reset(eventHandler);
@@ -245,7 +245,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v2
for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
// Nothing should be scheduled
verify(eventHandler, never()).handle(any(Event.class));
@@ -253,14 +253,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule an extra attempt for all but 1 task belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 1));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 1));
}
// Only v0 requests should have gone out
verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
reset(eventHandler);
// Schedule last task of v0, with attempt 1
- dagScheduler.scheduleTask(
+ dagScheduler.scheduleTaskEx(
createScheduleRequest(vertices[0].getVertexId(), vertices[0].getTotalTasks() - 1, 1));
// One v0 request and all of v2 should have gone out
verify(eventHandler, times(1 + vertices[2].getTotalTasks())).handle(any(Event.class));
http://git-wip-us.apache.org/repos/asf/tez/blob/73d5ce6e/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index fc7aa50..d274f8c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -164,10 +164,10 @@ public class TestTaskSchedulerEventHandler {
new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
priority, containerContext);
schedulerHandler.taskAllocated(mockTaskAttempt, lr, container);
- assertEquals(2, mockEventHandler.events.size());
- assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
+ assertEquals(1, mockEventHandler.events.size());
+ assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA);
AMContainerEventAssignTA assignEvent =
- (AMContainerEventAssignTA) mockEventHandler.events.get(1);
+ (AMContainerEventAssignTA) mockEventHandler.events.get(0);
assertEquals(priority, assignEvent.getPriority());
assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
}