You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/26 04:39:56 UTC
tez git commit: TEZ-2914. Ability to limit vertex concurrency (bikas)
Repository: tez
Updated Branches:
refs/heads/master 12bd908f4 -> 34eb75d70
TEZ-2914. Ability to limit vertex concurrency (bikas)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/34eb75d7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/34eb75d7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/34eb75d7
Branch: refs/heads/master
Commit: 34eb75d709bbe6e0417f9b4023f4fa1cec81bd8b
Parents: 12bd908
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Dec 25 19:39:04 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Dec 25 19:39:04 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 12 +-
.../apache/tez/dag/app/dag/DAGScheduler.java | 68 +++++++++-
.../java/org/apache/tez/dag/app/dag/Vertex.java | 2 +-
.../app/dag/event/DAGEventSchedulerUpdate.java | 3 +-
.../DAGEventSchedulerUpdateTAAssigned.java | 36 ------
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 17 +--
.../app/dag/impl/DAGSchedulerNaturalOrder.java | 15 +--
.../DAGSchedulerNaturalOrderControlled.java | 15 +--
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 43 ++-----
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 6 +
.../tez/dag/app/rm/TaskSchedulerManager.java | 2 -
.../apache/tez/dag/app/MockDAGAppMaster.java | 13 +-
.../tez/dag/app/TestMockDAGAppMaster.java | 50 ++++++++
.../tez/dag/app/dag/impl/TestDAGImpl.java | 4 -
.../tez/dag/app/dag/impl/TestDAGScheduler.java | 127 ++++++++++++++++++-
.../TestDAGSchedulerNaturalOrderControlled.java | 38 +++---
.../dag/app/rm/TestTaskSchedulerManager.java | 6 +-
18 files changed, 306 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a3b0fa6..25cfd86 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES:
+ TEZ-2914. Ability to limit vertex concurrency
TEZ-3011. Link Vertex Name in Dag Tasks/Task Attempts to Vertex
TEZ-3006. Remove unused import in TestHistoryParser.
TEZ-2910. Set caller context for tracing ( integrate with HDFS-9184 ).
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index b707857..9f7777f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -537,7 +537,17 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_MAX_APP_ATTEMPTS = TEZ_AM_PREFIX +
"max.app.attempts";
public static final int TEZ_AM_MAX_APP_ATTEMPTS_DEFAULT = 2;
-
+
+ /**
+ * Int value. The maximum number of attempts that can run concurrently for a given vertex.
+ * Setting <=0 implies no limit
+ */
+ @ConfigurationScope(Scope.VERTEX)
+ @ConfigurationProperty(type="integer")
+ public static final String TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY =
+ TEZ_AM_PREFIX + "vertex.max-task-concurrency";
+ public static final int TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT = -1;
+
/**
* Int value. The maximum number of attempts that can fail for a particular task before the task is failed.
* This does not count killed attempts. Task failure results in DAG failure.
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
index 2d3b006..87a6261 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
@@ -18,16 +18,70 @@
package org.apache.tez.dag.app.dag;
+import java.util.Map;
+import java.util.Queue;
+
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.records.TezVertexID;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
-public interface DAGScheduler {
+public abstract class DAGScheduler {
+ private static class VertexInfo {
+ int concurrencyLimit;
+ int concurrency;
+ Queue<DAGEventSchedulerUpdate> pendingAttempts = Lists.newLinkedList();
+
+ VertexInfo(int limit) {
+ this.concurrencyLimit = limit;
+ }
+ }
- public void vertexCompleted(Vertex vertex);
+ Map<TezVertexID, VertexInfo> vertexInfo = null;
- public void scheduleTask(DAGEventSchedulerUpdate event);
+ public void addVertexConcurrencyLimit(TezVertexID vId, int concurrency) {
+ if (vertexInfo == null) {
+ vertexInfo = Maps.newHashMap();
+ }
+ if (concurrency > 0) {
+ vertexInfo.put(vId, new VertexInfo(concurrency));
+ }
+ }
- public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event);
-
- public void taskSucceeded(DAGEventSchedulerUpdate event);
+ public void scheduleTask(DAGEventSchedulerUpdate event) {
+ VertexInfo vInfo = null;
+ if (vertexInfo != null) {
+ vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
+ }
+ scheduleTaskWithLimit(event, vInfo);
+ }
+
+ private void scheduleTaskWithLimit(DAGEventSchedulerUpdate event, VertexInfo vInfo) {
+ if (vInfo != null) {
+ if (vInfo.concurrency >= vInfo.concurrencyLimit) {
+ vInfo.pendingAttempts.add(event);
+ return; // already at max concurrency
+ }
+ vInfo.concurrency++;
+ }
+ scheduleTaskEx(event);
+ }
+
+ public void taskCompleted(DAGEventSchedulerUpdate event) {
+ taskCompletedEx(event);
+ if (vertexInfo != null) {
+ VertexInfo vInfo = vertexInfo.get(event.getAttempt().getID().getTaskID().getVertexID());
+ if (vInfo != null) {
+ vInfo.concurrency--;
+ if (!vInfo.pendingAttempts.isEmpty()) {
+ scheduleTaskWithLimit(vInfo.pendingAttempts.poll(), vInfo);
+ }
+ }
+ }
+ }
+
+ public abstract void scheduleTaskEx(DAGEventSchedulerUpdate event);
+
+ public abstract void taskCompletedEx(DAGEventSchedulerUpdate event);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 9fc73a2..54f2ffa 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -85,7 +85,7 @@ public interface Vertex extends Comparable<Vertex> {
*/
TezCounters getCachedCounters();
-
+ int getMaxTaskConcurrency();
Map<TezTaskID, Task> getTasks();
Task getTask(TezTaskID taskID);
Task getTask(int taskIndex);
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
index a436a8c..eda02b5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdate.java
@@ -24,8 +24,7 @@ public class DAGEventSchedulerUpdate extends DAGEvent {
public enum UpdateType {
TA_SCHEDULE,
- TA_SCHEDULED,
- TA_SUCCEEDED
+ TA_COMPLETED
}
private final TaskAttempt attempt;
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
deleted file mode 100644
index 8e27843..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/DAGEventSchedulerUpdateTAAssigned.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-
-public class DAGEventSchedulerUpdateTAAssigned extends DAGEventSchedulerUpdate {
-
- final Container container;
-
- public DAGEventSchedulerUpdateTAAssigned(TaskAttempt attempt, Container container) {
- super(DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULED, attempt);
- this.container = container;
- }
-
- public Container getContainer() {
- return container;
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 3d47450..60f933f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -104,7 +104,6 @@ import org.apache.tez.dag.app.dag.event.DAGEventCounterUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
@@ -1592,6 +1591,9 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] {
DAG.class, EventHandler.class}, new Object[] {dag, dag.eventHandler});
+ for (Vertex v : dag.vertices.values()) {
+ dag.dagScheduler.addVertexConcurrencyLimit(v.getVertexId(), v.getMaxTaskConcurrency());
+ }
}
private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
@@ -1903,10 +1905,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
Vertex vertex = job.vertices.get(vertexEvent.getVertexId());
job.numCompletedVertices++;
if (vertexEvent.getVertexState() == VertexState.SUCCEEDED) {
- if (!job.reRunningVertices.contains(vertex.getVertexId())) {
- // vertex succeeded for the first time
- job.dagScheduler.vertexCompleted(vertex);
- }
forceTransitionToKillWait = !(job.vertexSucceeded(vertex));
}
else if (vertexEvent.getVertexState() == VertexState.FAILED) {
@@ -2146,13 +2144,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
case TA_SCHEDULE:
dag.dagScheduler.scheduleTask(sEvent);
break;
- case TA_SCHEDULED:
- DAGEventSchedulerUpdateTAAssigned taEvent =
- (DAGEventSchedulerUpdateTAAssigned) sEvent;
- dag.dagScheduler.taskScheduled(taEvent);
- break;
- case TA_SUCCEEDED:
- dag.dagScheduler.taskSucceeded(sEvent);
+ case TA_COMPLETED:
+ dag.dagScheduler.taskCompleted(sEvent);
break;
default:
throw new TezUncheckedException("Unknown DAGEventSchedulerUpdate:"
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
index 8d42227..4246ad0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
@@ -26,11 +26,10 @@ import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
@SuppressWarnings("rawtypes")
-public class DAGSchedulerNaturalOrder implements DAGScheduler {
+public class DAGSchedulerNaturalOrder extends DAGScheduler {
private static final Logger LOG =
LoggerFactory.getLogger(DAGSchedulerNaturalOrder.class);
@@ -44,11 +43,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler {
}
@Override
- public void vertexCompleted(Vertex vertex) {
- }
-
- @Override
- public void scheduleTask(DAGEventSchedulerUpdate event) {
+ public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
TaskAttempt attempt = event.getAttempt();
Vertex vertex = dag.getVertex(attempt.getVertexID());
int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
@@ -69,11 +64,7 @@ public class DAGSchedulerNaturalOrder implements DAGScheduler {
}
@Override
- public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
- }
-
- @Override
- public void taskSucceeded(DAGEventSchedulerUpdate event) {
+ public void taskCompletedEx(DAGEventSchedulerUpdate event) {
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
index 2469a2f..0802dce 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
@@ -36,7 +36,6 @@ import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -50,7 +49,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
* - generic slow start mechanism across all vertices - independent of the type of edges.
*/
@SuppressWarnings("rawtypes")
-public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
+public class DAGSchedulerNaturalOrderControlled extends DAGScheduler {
private static final Logger LOG =
LoggerFactory.getLogger(DAGSchedulerNaturalOrderControlled.class);
@@ -72,13 +71,9 @@ public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
this.handler = dispatcher;
}
- @Override
- public void vertexCompleted(Vertex vertex) {
- }
-
// TODO Does ordering matter - it currently depends on the order returned by vertex.getOutput*
@Override
- public void scheduleTask(DAGEventSchedulerUpdate event) {
+ public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
TaskAttempt attempt = event.getAttempt();
Vertex vertex = dag.getVertex(attempt.getVertexID());
int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
@@ -241,11 +236,7 @@ public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
}
@Override
- public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) {
- }
-
- @Override
- public void taskSucceeded(DAGEventSchedulerUpdate event) {
+ public void taskCompletedEx(DAGEventSchedulerUpdate event) {
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 0f76a63..c00d674 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -825,40 +825,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private void handleTaskAttemptCompletion(TezTaskAttemptID attemptId,
TaskAttemptStateInternal attemptState) {
this.sendTaskAttemptCompletionEvent(attemptId, attemptState);
- }
-
- // TODO: Recovery
- /*
- private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
- TaskFinishedEvent tfe =
- new TaskFinishedEvent(task.taskId,
- task.successfulAttempt,
- task.getFinishTime(task.successfulAttempt),
- task.taskId.getTaskType(),
- taskState.toString(),
- task.getCounters());
- return tfe;
- }
-
- private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TezTaskAttemptID taId) {
- StringBuilder errorSb = new StringBuilder();
- if (diag != null) {
- for (String d : diag) {
- errorSb.append(", ").append(d);
- }
+ if (getInternalState() != TaskStateInternal.SUCCEEDED) {
+ sendDAGSchedulerFinishedEvent(attemptId); // not a retro active action
}
- TaskFailedEvent taskFailedEvent = new TaskFailedEvent(
- TypeConverter.fromYarn(task.taskId),
- // Hack since getFinishTime needs isFinished to be true and that doesn't happen till after the transition.
- task.getFinishTime(taId),
- TypeConverter.fromYarn(task.getType()),
- errorSb.toString(),
- taskState.toString(),
- taId == null ? null : TypeConverter.fromYarn(taId));
- return taskFailedEvent;
}
- */
+ private void sendDAGSchedulerFinishedEvent(TezTaskAttemptID taId) {
+ // send notification to DAG scheduler
+ eventHandler.handle(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, attempts.get(taId)));
+ }
+
private static void unSucceed(TaskImpl task) {
task.commitAttempt = null;
task.successfulAttempt = null;
@@ -1105,10 +1082,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.getID(), diagnostics, errCause));
}
}
- // send notification to DAG scheduler
- task.eventHandler.handle(new DAGEventSchedulerUpdate(
- DAGEventSchedulerUpdate.UpdateType.TA_SUCCEEDED, task.attempts
- .get(task.successfulAttempt)));
return task.finished(TaskStateInternal.SUCCEEDED);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 93baa0a..065974e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1184,6 +1184,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
readLock.unlock();
}
}
+
+ @Override
+ public int getMaxTaskConcurrency() {
+ return vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY,
+ TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY_DEFAULT);
+ }
public VertexStats getVertexStats() {
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index dbf8e38..f688b57 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -69,7 +69,6 @@ import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
@@ -569,7 +568,6 @@ public class TaskSchedulerManager extends AbstractService implements
sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
}
- sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
.getContainerContext().getCredentials(), event.getPriority()));
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
index bc7fa98..b322e05 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/MockDAGAppMaster.java
@@ -83,7 +83,6 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-@SuppressWarnings("unchecked")
public class MockDAGAppMaster extends DAGAppMaster {
private static final Logger LOG = LoggerFactory.getLogger(MockDAGAppMaster.class);
@@ -95,6 +94,7 @@ public class MockDAGAppMaster extends DAGAppMaster {
EventsDelegate eventsDelegate;
CountersDelegate countersDelegate;
StatisticsDelegate statsDelegate;
+ ContainerDelegate containerDelegate;
long launcherSleepTime = 1;
boolean doSleep = true;
int handlerConcurrency = 1;
@@ -115,6 +115,11 @@ public class MockDAGAppMaster extends DAGAppMaster {
public static interface EventsDelegate {
public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time);
}
+
+ public static interface ContainerDelegate {
+ public void stop(ContainerStopRequest event);
+ public void launch(ContainerLaunchRequest event);
+ }
// mock container launcher does not launch real tasks.
// Upon, launch of a container is simulates the container asking for tasks
@@ -268,6 +273,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
void stop(ContainerStopRequest event) {
// remove from simulated container list
containers.remove(event.getContainerId());
+ if (containerDelegate != null) {
+ containerDelegate.stop(event);
+ }
getContext().containerStopRequested(event.getContainerId());
}
@@ -277,6 +285,9 @@ public class MockDAGAppMaster extends DAGAppMaster {
event.getContainerLaunchContext());
containers.put(event.getContainerId(), cData);
containersToProcess.add(cData);
+ if (containerDelegate != null) {
+ containerDelegate.launch(event);
+ }
getContext().containerLaunched(event.getContainerId());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index b0bc571..d5ee67d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
@@ -73,6 +74,7 @@ import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
+import org.apache.tez.dag.app.MockDAGAppMaster.ContainerDelegate;
import org.apache.tez.dag.app.MockDAGAppMaster.EventsDelegate;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
@@ -100,6 +102,8 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -406,6 +410,52 @@ public class TestMockDAGAppMaster {
tezClient.stop();
}
+
+ @Test (timeout = 100000)
+ public void testConcurrencyLimit() throws Exception {
+ // the test relies on local mode behavior of launching a new container per task.
+ // so task concurrency == container concurrency
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+ final int concurrencyLimit = 5;
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+ null, false, false, concurrencyLimit*4, 1000);
+
+ tezClient.start();
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false);
+
+ final AtomicInteger concurrency = new AtomicInteger(0);
+ final AtomicBoolean exceededConcurrency = new AtomicBoolean(false);
+ mockApp.containerDelegate = new ContainerDelegate() {
+ @Override
+ public void stop(ContainerStopRequest event) {
+ concurrency.decrementAndGet();
+ }
+ @Override
+ public void launch(ContainerLaunchRequest event) {
+ int maxConc = concurrency.incrementAndGet();
+ if (maxConc > concurrencyLimit) {
+ exceededConcurrency.set(true);
+ }
+ System.out.println("Launched: " + maxConc);
+ }
+ };
+ DAG dag = DAG.create("testConcurrencyLimit");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 20).setConf(
+ TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, String.valueOf(concurrencyLimit));
+ dag.addVertex(vA);
+
+ mockLauncher.startScheduling(true);
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ Assert.assertFalse(exceededConcurrency.get());
+ tezClient.stop();
+ }
+
@Test (timeout = 10000)
public void testBasicCounters() throws Exception {
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 1809230..2158368 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -1617,7 +1617,6 @@ public class TestDAGImpl {
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
Assert.assertEquals(1, dag.numCompletedVertices);
- verify(dag.dagScheduler, times(1)).vertexCompleted(v);
dispatcher.getEventHandler().handle(
new VertexEventTaskReschedule(TezTaskID.getInstance(vId, 0)));
@@ -1634,9 +1633,6 @@ public class TestDAGImpl {
Assert.assertEquals(1, dag.getSuccessfulVertices());
Assert.assertEquals(1, dag.numCompletedVertices);
- // re-completion is not notified again
- verify(dag.dagScheduler, times(1)).vertexCompleted(v);
-
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index 913f5fa..a28f367 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -25,27 +25,34 @@ import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
+import com.google.common.collect.Lists;
+
import static org.mockito.Mockito.*;
+import java.util.List;
+
public class TestDAGScheduler {
class MockEventHandler implements EventHandler<TaskAttemptEventSchedule> {
TaskAttemptEventSchedule event;
+ List<TaskAttemptEventSchedule> events = Lists.newLinkedList();
@Override
public void handle(TaskAttemptEventSchedule event) {
this.event = event;
+ this.events.add(event);
}
-
}
- MockEventHandler mockEventHandler = new MockEventHandler();
@Test(timeout=5000)
public void testDAGSchedulerNaturalOrder() {
+ MockEventHandler mockEventHandler = new MockEventHandler();
DAG mockDag = mock(DAG.class);
Vertex mockVertex = mock(Vertex.class);
TaskAttempt mockAttempt = mock(TaskAttempt.class);
@@ -58,15 +65,125 @@ public class TestDAGScheduler {
DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
mockEventHandler);
- scheduler.scheduleTask(event);
+ scheduler.scheduleTaskEx(event);
Assert.assertEquals(1, mockEventHandler.event.getPriorityHighLimit());
Assert.assertEquals(3, mockEventHandler.event.getPriorityLowLimit());
- scheduler.scheduleTask(event);
+ scheduler.scheduleTaskEx(event);
Assert.assertEquals(4, mockEventHandler.event.getPriorityHighLimit());
Assert.assertEquals(6, mockEventHandler.event.getPriorityLowLimit());
- scheduler.scheduleTask(event);
+ scheduler.scheduleTaskEx(event);
Assert.assertEquals(7, mockEventHandler.event.getPriorityHighLimit());
Assert.assertEquals(9, mockEventHandler.event.getPriorityLowLimit());
}
+ @Test(timeout=5000)
+ public void testConcurrencyLimit() {
+ MockEventHandler mockEventHandler = new MockEventHandler();
+ DAG mockDag = mock(DAG.class);
+ TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+ TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01");
+ TezTaskID tId0 = TezTaskID.getInstance(vId0, 0);
+ TezTaskID tId1 = TezTaskID.getInstance(vId1, 0);
+
+ TaskAttempt mockAttempt;
+
+ Vertex mockVertex = mock(Vertex.class);
+ when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
+ when(mockVertex.getDistanceFromRoot()).thenReturn(0);
+
+ DAGScheduler scheduler = new DAGSchedulerNaturalOrder(mockDag,
+ mockEventHandler);
+ scheduler.addVertexConcurrencyLimit(vId0, 0); // not effective
+
+ // schedule beyond limit and it gets scheduled
+ mockAttempt = mock(TaskAttempt.class);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 0));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(1, mockEventHandler.events.size());
+ mockAttempt = mock(TaskAttempt.class);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 1));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(2, mockEventHandler.events.size());
+ mockAttempt = mock(TaskAttempt.class);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId0, 2));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(3, mockEventHandler.events.size());
+
+ mockEventHandler.events.clear();
+ List<TaskAttempt> mockAttempts = Lists.newArrayList();
+ int completed = 0;
+ int requested = 0;
+ int scheduled = 0;
+ scheduler.addVertexConcurrencyLimit(vId1, 2); // effective
+
+ // schedule beyond limit and it gets buffered
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // buffered
+
+ scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ scheduler.taskCompleted(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_COMPLETED, mockAttempts.get(completed++)));
+ Assert.assertEquals(scheduled, mockEventHandler.events.size()); // no extra scheduling
+
+ mockAttempt = mock(TaskAttempt.class);
+ mockAttempts.add(mockAttempt);
+ when(mockAttempt.getID()).thenReturn(TezTaskAttemptID.getInstance(tId1, requested++));
+ scheduler.scheduleTask(new DAGEventSchedulerUpdate(
+ DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt));
+ Assert.assertEquals(scheduled+1, mockEventHandler.events.size()); // scheduled
+ Assert.assertEquals(mockAttempts.get(scheduled).getID(),
+ mockEventHandler.events.get(scheduled).getTaskAttemptID()); // matches order
+ scheduled++;
+
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
index bc86761..63137c7 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
@@ -63,35 +63,35 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
// Schedule 3 tasks belonging to v2
for (int i = 0; i < 3; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
verify(eventHandler, times(3)).handle(any(Event.class));
reset(eventHandler);
// Schedule 3 tasks belonging to v3
for (int i = 0; i < 3; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
}
verify(eventHandler, times(3)).handle(any(Event.class));
reset(eventHandler);
// Schedule remaining tasks belonging to v2
for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[2].getTotalTasks() - 3)).handle(any(Event.class));
reset(eventHandler);
// Schedule remaining tasks belonging to v3
for (int i = 3; i < vertices[3].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[3].getTotalTasks() - 3)).handle(any(Event.class));
reset(eventHandler);
@@ -99,7 +99,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v4
for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[4].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
@@ -122,7 +122,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
@@ -130,14 +130,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
// v2 behaving as if configured with slow-start.
// Schedule all tasks belonging to v3.
for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
// Scheduling all tasks belonging to v4. None should get scheduled.
for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
}
verify(eventHandler, never()).handle(any(Event.class));
reset(eventHandler);
@@ -145,14 +145,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
// v2 now starts scheduling ...
// Schedule 3 tasks for v2 initially.
for (int i = 0; i < 3; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
verify(eventHandler, times(3)).handle(any(Event.class));
reset(eventHandler);
// Schedule remaining tasks belonging to v2
for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class);
// All of v2 and v3 should be sent out.
@@ -190,7 +190,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
@@ -200,14 +200,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
// v2 will change parallelism
// Schedule all tasks belonging to v3
for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[3].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
reset(eventHandler);
// Schedule all tasks belonging to v4
for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[4].getVertexId(), i, 0));
}
verify(eventHandler, never()).handle(any(Event.class));
reset(eventHandler);
@@ -218,7 +218,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v2
for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[2].getTotalTasks() + vertices[4].getTotalTasks()))
.handle(any(Event.class));
@@ -241,7 +241,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all but 1 task belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 0));
}
verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
reset(eventHandler);
@@ -249,7 +249,7 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule all tasks belonging to v2
for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[2].getVertexId(), i, 0));
}
// Nothing should be scheduled
verify(eventHandler, never()).handle(any(Event.class));
@@ -257,14 +257,14 @@ public class TestDAGSchedulerNaturalOrderControlled {
// Schedule an extra attempt for all but 1 task belonging to v0
for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
- dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 1));
+ dagScheduler.scheduleTaskEx(createScheduleRequest(vertices[0].getVertexId(), i, 1));
}
// Only v0 requests should have gone out
verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
reset(eventHandler);
// Schedule last task of v0, with attempt 1
- dagScheduler.scheduleTask(
+ dagScheduler.scheduleTaskEx(
createScheduleRequest(vertices[0].getVertexId(), vertices[0].getTotalTasks() - 1, 1));
// One v0 request and all of v2 should have gone out
verify(eventHandler, times(1 + vertices[2].getTotalTasks())).handle(any(Event.class));
http://git-wip-us.apache.org/repos/asf/tez/blob/34eb75d7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
index 4db51b9..8e4e4f0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerManager.java
@@ -191,10 +191,10 @@ public class TestTaskSchedulerManager {
new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
priority, containerContext, 0, 0, 0);
schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container);
- assertEquals(2, mockEventHandler.events.size());
- assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
+ assertEquals(1, mockEventHandler.events.size());
+ assertTrue(mockEventHandler.events.get(0) instanceof AMContainerEventAssignTA);
AMContainerEventAssignTA assignEvent =
- (AMContainerEventAssignTA) mockEventHandler.events.get(1);
+ (AMContainerEventAssignTA) mockEventHandler.events.get(0);
assertEquals(priority, assignEvent.getPriority());
assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
}