You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/11/08 23:02:51 UTC
tez git commit: TEZ-1750. Add a DAGScheduler which schedules tasks
only when sources have been scheduled. (sseth) (cherry picked from commit
67944a1dff87faf3b8bf7ecf3afb1bcda6c43dda)
Repository: tez
Updated Branches:
refs/heads/branch-0.5 b99d9afac -> be83286d6
TEZ-1750. Add a DAGScheduler which schedules tasks only when sources
have been scheduled. (sseth)
(cherry picked from commit 67944a1dff87faf3b8bf7ecf3afb1bcda6c43dda)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/be83286d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/be83286d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/be83286d
Branch: refs/heads/branch-0.5
Commit: be83286d657732329f109d1c1d7c9881cfe7ac0c
Parents: b99d9af
Author: Siddharth Seth <ss...@apache.org>
Authored: Sat Nov 8 14:02:04 2014 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Sat Nov 8 14:02:41 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 8 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 9 +-
.../DAGSchedulerNaturalOrderControlled.java | 256 +++++++++++++
.../TestDAGSchedulerNaturalOrderControlled.java | 374 +++++++++++++++++++
5 files changed, 645 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 55c195e..348541d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -10,6 +10,7 @@ ALL CHANGES:
TEZ-1747. Increase test timeout for TestSecureShuffle.
TEZ-1746. Flaky test in TestVertexImpl and TestExceptionPropagation.
TEZ-1749. Increase test timeout for TestLocalMode.testMultipleClientsWithSession
+ TEZ-1750. Add a DAGScheduler which schedules tasks only when sources have been scheduled.
Release 0.5.2: 2014-11-07
http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index d9003b3..6873863 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -339,6 +339,12 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_AM_CLIENT_AM_PORT_RANGE =
TEZ_AM_PREFIX + "client.am.port-range";
+ /**
+ * String value. The class to be used for DAG Scheduling. Expert level setting.
+ */
+ public static final String TEZ_AM_DAG_SCHEDULER_CLASS = TEZ_AM_PREFIX + "dag.scheduler.class";
+ public static final String TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT =
+ "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrder";
/** Int value. The amount of memory in MB to be used by the AppMaster */
public static final String TEZ_AM_RESOURCE_MEMORY_MB = TEZ_AM_PREFIX
@@ -367,7 +373,7 @@ public class TezConfiguration extends Configuration {
/**
* Int value. The maximum heartbeat interval between the AM and RM in milliseconds
* Increasing this reduces the communication between the AM and the RM and can
- * help in scaling up. Expert level setting. Expert level setting.
+ * help in scaling up. Expert level setting.
*/
public static final String TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX = TEZ_AM_PREFIX
+ "am-rm.heartbeat.interval-ms.max";
http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index cddcbd5..f877eb4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tez.common.ATSConstants;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeProperty;
@@ -75,6 +76,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGReport;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
@@ -1333,8 +1335,11 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
}
private static void assignDAGScheduler(DAGImpl dag) {
- LOG.info("Using Natural order dag scheduler");
- dag.dagScheduler = new DAGSchedulerNaturalOrder(dag, dag.eventHandler);
+ String dagSchedulerClassName = dag.conf.get(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
+ TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS_DEFAULT);
+ LOG.info("Using DAG Scheduler: " + dagSchedulerClassName);
+ dag.dagScheduler = ReflectionUtils.createClazzInstance(dagSchedulerClassName, new Class<?>[] {
+ DAG.class, EventHandler.class}, new Object[] {dag, dag.eventHandler});
}
private static VertexImpl createVertex(DAGImpl dag, String vertexName, int vId) {
http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
new file mode 100644
index 0000000..7cfbf5a
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGScheduler;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Schedules task attempts belonging to downstream vertices only after all attempts belonging to
+ * upstream vertices have been scheduled. If a particular vertex has a slow start or delayed
+ * start, this ensures that its downstream tasks are not started before it.</p>
+ * Some future enhancements:
+ * - consider cluster capacity - and be more aggressive about scheduling downstream tasks before
+ * upstream tasks have completed. </p>
+ * - generic slow start mechanism across all vertices - independent of the type of edges.
+ */
+@SuppressWarnings("rawtypes")
+public class DAGSchedulerNaturalOrderControlled implements DAGScheduler {
+
+ private static final Log LOG =
+ LogFactory.getLog(DAGSchedulerNaturalOrderControlled.class);
+
+ private final DAG dag;
+ private final EventHandler handler;
+
+ // Tracks pending events, in case they're not sent immediately.
+ private final ListMultimap<String, TaskAttemptEventSchedule> pendingEvents =
+ LinkedListMultimap.create();
+ // Tracks vertices for which no additional scheduling checks are required. Once in this set, the
+ // vertex is considered to be fully scheduled.
+ private final Set<String> scheduledVertices = new HashSet<String>();
+ // Tracks tasks scheduled for a vertex.
+ private final Map<String, BitSet> vertexScheduledTasks = new HashMap<String, BitSet>();
+
+ public DAGSchedulerNaturalOrderControlled(DAG dag, EventHandler dispatcher) {
+ this.dag = dag; // used by scheduleTask() to look up the Vertex of an incoming attempt
+ this.handler = dispatcher; // receives the TaskAttemptEventSchedule events sent via sendEvent()
+ }
+
+ @Override
+ public void vertexCompleted(Vertex vertex) { // No-op: vertex completion is ignored here.
+ }
+
+ // TODO Does ordering matter - it currently depends on the order returned by vertex.getOutput*
+ @Override
+ public void scheduleTask(DAGEventSchedulerUpdate event) {
+ TaskAttempt attempt = event.getAttempt();
+ Vertex vertex = dag.getVertex(attempt.getVertexID());
+ int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
+
+ // Natural priority: low = (distanceFromRoot + 1) * 3, high = low - 2. Handles failures and retries.
+ int priorityLowLimit = (vertexDistanceFromRoot + 1) * 3;
+ int priorityHighLimit = priorityLowLimit - 2;
+
+ TaskAttemptEventSchedule attemptEvent = new TaskAttemptEventSchedule(
+ attempt.getID(), priorityLowLimit, priorityHighLimit);
+ // Record the request before deciding anything; scheduledTasksForwarded() reads this state.
+ taskAttemptSeen(vertex.getName(), attempt.getID());
+
+ if (vertexAlreadyScheduled(vertex)) {
+ // Vertex previously marked ready for scheduling: forward the request immediately.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Scheduling " + attempt.getID() + " between priorityLow: " + priorityLowLimit
+ + " and priorityHigh: " + priorityHighLimit);
+ }
+ sendEvent(attemptEvent);
+ // A new task arriving here may complete this vertex's set of forwarded tasks, so re-check downstream vertices.
+ processDownstreamVertices(vertex);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Attempting to schedule vertex: " + vertex.getLogIdentifier() +
+ " due to schedule event");
+ }
+ boolean scheduled = trySchedulingVertex(vertex);
+ if (scheduled) {
+ LOG.info("Scheduled vertex: " + vertex.getLogIdentifier());
+ // If ready to be scheduled, send out pending events and the current event.
+ // Send events out for this vertex first. Then try scheduling downstream vertices.
+ sendEventsForVertex(vertex.getName());
+ sendEvent(attemptEvent);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing downstream vertices for vertex: " + vertex.getLogIdentifier());
+ }
+ processDownstreamVertices(vertex);
+ } else {
+ pendingEvents.put(vertex.getName(), attemptEvent); // held until the vertex can be scheduled
+ }
+ }
+ }
+
+ private void taskAttemptSeen(String vertexName, TezTaskAttemptID taskAttemptID) {
+ BitSet scheduledTasks = vertexScheduledTasks.get(vertexName);
+ if (scheduledTasks == null) { // first request recorded for this vertex
+ scheduledTasks = new BitSet();
+ vertexScheduledTasks.put(vertexName, scheduledTasks);
+ }
+ if (taskAttemptID != null) { // null for 0 task vertices; only the empty BitSet is registered
+ scheduledTasks.set(taskAttemptID.getTaskID().getId()); // keyed by task id, so extra attempts of a task count once
+ }
+ }
+
+ private void sendEventsForVertex(String vertexName) {
+ for (TaskAttemptEventSchedule event : pendingEvents.removeAll(vertexName)) {
+ sendEvent(event); // removeAll() above has already dropped it from pendingEvents
+ }
+ }
+
+ /* Checks whether this vertex was marked as ready to go in the past, i.e. its name is in scheduledVertices. */
+ private boolean vertexAlreadyScheduled(Vertex vertex) {
+ return scheduledVertices.contains(vertex.getName());
+ }
+
+ private boolean scheduledTasksForwarded(Vertex vertex) {
+ boolean canSchedule = false;
+ BitSet scheduledTasks = vertexScheduledTasks.get(vertex.getName());
+ if (scheduledTasks != null) { // null means nothing has been recorded for this vertex yet
+ if (scheduledTasks.cardinality() >= vertex.getTotalTasks()) {
+ canSchedule = true; // distinct task ids seen >= the vertex's current total task count
+ }
+ }
+ return canSchedule;
+ }
+
+ private void processDownstreamVertices(Vertex vertex) {
+ List<Vertex> newlyScheduledVertices = Lists.newLinkedList();
+ Map<Vertex, Edge> outputVertexEdgeMap = vertex.getOutputVertices();
+ for (Vertex destVertex : outputVertexEdgeMap.keySet()) {
+ if (vertexAlreadyScheduled(destVertex)) { // Nothing to do if already scheduled.
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Attempting to schedule vertex: " + destVertex.getLogIdentifier() +
+ " due to upstream event from " + vertex.getLogIdentifier());
+ }
+ boolean scheduled = trySchedulingVertex(destVertex);
+ if (scheduled) {
+ LOG.info("Scheduled vertex: " + destVertex.getLogIdentifier() +
+ " due to upstream event from " + vertex.getLogIdentifier());
+ sendEventsForVertex(destVertex.getName()); // flush requests queued while the vertex was waiting
+ newlyScheduledVertices.add(destVertex);
+ }
+ }
+ }
+
+ // Recurse into the vertices newly scheduled in this run so their own downstream vertices are tried too.
+ for (Vertex downStreamVertex : newlyScheduledVertices) {
+ processDownstreamVertices(downStreamVertex);
+ }
+ }
+
+ /* Adds the vertex to scheduledVertices and returns true if it has seen a schedule request and every source is fully forwarded or has 0 tasks; otherwise returns false. */
+ private boolean trySchedulingVertex(Vertex vertex) {
+ boolean canSchedule = true;
+ if (vertexScheduledTasks.get(vertex.getName()) == null) {
+ // No scheduled requests seen yet. Do not mark this as ready.
+ // 0 task vertices handled elsewhere.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "No schedule requests for vertex: " + vertex.getLogIdentifier() + ", Not scheduling");
+ }
+ canSchedule = false;
+ } else {
+ Map<Vertex, Edge> inputVertexEdgeMap = vertex.getInputVertices();
+ if (inputVertexEdgeMap == null || inputVertexEdgeMap.isEmpty()) {
+ // Nothing to wait for. Go ahead and schedule.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("No sources for vertex: " + vertex.getLogIdentifier() + ", Scheduling now");
+ }
+ } else {
+ // Check if all sources are scheduled.
+ for (Vertex srcVertex : inputVertexEdgeMap.keySet()) {
+ if (scheduledTasksForwarded(srcVertex)) {
+ // Nothing to wait for. Go ahead and check the next source.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to schedule: " + vertex.getLogIdentifier() +
+ ", All tasks forwarded for srcVertex: " + srcVertex.getLogIdentifier() +
+ ", count: " + srcVertex.getTotalTasks());
+ }
+ } else {
+ // Special case for vertices with 0 tasks. 0 check is sufficient since parallelism cannot increase.
+ if (srcVertex.getTotalTasks() == 0) {
+ LOG.info(
+ "Vertex: " + srcVertex.getLogIdentifier() + " has 0 tasks. Marking as scheduled");
+ scheduledVertices.add(srcVertex.getName());
+ taskAttemptSeen(srcVertex.getName(), null); // registers an empty BitSet for the source
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Not all sources schedule requests complete while trying to schedule: " +
+ vertex.getLogIdentifier() + ", For source vertex: " +
+ srcVertex.getLogIdentifier() + ", Forwarded requests: " +
+ (vertexScheduledTasks.get(srcVertex.getName()) == null ? "null(0)" :
+ vertexScheduledTasks.get(srcVertex.getName()).cardinality()) +
+ " out of " + srcVertex.getTotalTasks());
+ }
+ canSchedule = false;
+ break; // one incomplete source is enough to hold this vertex back
+ }
+ }
+ }
+ }
+ }
+ if (canSchedule) {
+ scheduledVertices.add(vertex.getName());
+ }
+ return canSchedule;
+ }
+
+ @Override
+ public void taskScheduled(DAGEventSchedulerUpdateTAAssigned event) { // No-op: event is ignored here.
+ }
+
+ @Override
+ public void taskSucceeded(DAGEventSchedulerUpdate event) { // No-op: event is ignored here.
+ }
+
+ @SuppressWarnings("unchecked")
+ private void sendEvent(TaskAttemptEventSchedule event) {
+ handler.handle(event); // handler is a raw EventHandler, hence the unchecked suppression
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/be83286d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
new file mode 100644
index 0000000..88a91b6
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGSchedulerNaturalOrderControlled.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestDAGSchedulerNaturalOrderControlled {
+
+ @Test(timeout = 5000)
+ public void testSimpleFlow() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ DAG dag = createMockDag();
+ DAGSchedulerNaturalOrderControlled dagScheduler =
+ new DAGSchedulerNaturalOrderControlled(dag, eventHandler);
+
+ int numVertices = 5;
+ Vertex[] vertices = new Vertex[numVertices];
+ for (int i = 0; i < numVertices; i++) {
+ vertices[i] = dag.getVertex("vertex" + i);
+ }
+
+ // Schedule all tasks belonging to v0
+ for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Schedule 3 tasks belonging to v2
+ for (int i = 0; i < 3; i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(3)).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Schedule 3 tasks belonging to v3
+ for (int i = 0; i < 3; i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(3)).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Schedule remaining tasks belonging to v2
+ for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[2].getTotalTasks() - 3)).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Schedule remaining tasks belonging to v3
+ for (int i = 3; i < vertices[3].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[3].getTotalTasks() - 3)).handle(any(Event.class));
+ reset(eventHandler);
+
+
+ // Schedule all tasks belonging to v4
+ for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[4].getTotalTasks())).handle(any(Event.class));
+ reset(eventHandler);
+ }
+
+ @Test(timeout = 5000)
+ public void testSourceRequestDelayed() {
+ // ShuffleVertexHandler - slowstart simulation
+ EventHandler eventHandler = mock(EventHandler.class);
+ DAG dag = createMockDag();
+ DAGSchedulerNaturalOrderControlled dagScheduler =
+ new DAGSchedulerNaturalOrderControlled(dag, eventHandler);
+
+ int numVertices = 5;
+ Vertex[] vertices = new Vertex[numVertices];
+ for (int i = 0; i < numVertices; i++) {
+ vertices[i] = dag.getVertex("vertex" + i);
+ }
+
+ // Schedule all tasks belonging to v0
+ for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
+ reset(eventHandler);
+
+ // v2 behaving as if configured with slow-start.
+ // Schedule all tasks belonging to v3.
+ for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Scheduling all tasks belonging to v4. None should get scheduled.
+ for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ }
+ verify(eventHandler, never()).handle(any(Event.class));
+ reset(eventHandler);
+
+ // v2 now starts scheduling ...
+ // Schedule 3 tasks for v2 initially.
+ for (int i = 0; i < 3; i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(3)).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Schedule remaining tasks belonging to v2
+ for (int i = 3; i < vertices[2].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ }
+ ArgumentCaptor<Event> args = ArgumentCaptor.forClass(Event.class);
+ // The remaining v2 requests and all pending v4 requests should be sent out.
+ verify(eventHandler, times(vertices[2].getTotalTasks() - 3 + vertices[4].getTotalTasks()))
+ .handle(
+ args.capture());
+ int count = 0;
+ // Verify the order in which the events were sent out.
+ for (Event raw : args.getAllValues()) {
+ TaskAttemptEventSchedule event = (TaskAttemptEventSchedule) raw;
+ if (count < vertices[2].getTotalTasks() - 3) {
+ assertEquals(2, event.getTaskAttemptID().getTaskID().getVertexID().getId());
+ } else {
+ assertEquals(4, event.getTaskAttemptID().getTaskID().getVertexID().getId());
+ }
+ count++;
+ }
+ reset(eventHandler);
+ }
+
+
+ @Test(timeout = 5000)
+ public void testParallelismUpdated() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ DAG dag = createMockDag();
+ DAGSchedulerNaturalOrderControlled dagScheduler =
+ new DAGSchedulerNaturalOrderControlled(dag, eventHandler);
+
+ int numVertices = 5;
+ Vertex[] vertices = new Vertex[numVertices];
+ for (int i = 0; i < numVertices; i++) {
+ vertices[i] = dag.getVertex("vertex" + i);
+ }
+
+ // Schedule all tasks belonging to v0
+ for (int i = 0; i < vertices[0].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[0].getTotalTasks())).handle(any(Event.class));
+ reset(eventHandler);
+
+ assertEquals(10, vertices[2].getTotalTasks());
+
+ // v2 will change parallelism
+ // Schedule all tasks belonging to v3
+ for (int i = 0; i < vertices[3].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[3].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[3].getTotalTasks())).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Schedule all tasks belonging to v4
+ for (int i = 0; i < vertices[4].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[4].getVertexId(), i, 0));
+ }
+ verify(eventHandler, never()).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Reset the parallelism for v2.
+ updateParallelismOnMockVertex(vertices[2], 3);
+ assertEquals(3, vertices[2].getTotalTasks());
+
+ // Schedule all tasks belonging to v2
+ for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[2].getTotalTasks() + vertices[4].getTotalTasks()))
+ .handle(any(Event.class));
+ reset(eventHandler);
+ }
+
+ @Test(timeout = 5000)
+ public void testMultipleRequestsForSameTask() {
+ EventHandler eventHandler = mock(EventHandler.class);
+ DAG dag = createMockDag();
+ DAGSchedulerNaturalOrderControlled dagScheduler =
+ new DAGSchedulerNaturalOrderControlled(dag, eventHandler);
+
+ int numVertices = 5;
+ Vertex[] vertices = new Vertex[numVertices];
+ for (int i = 0; i < numVertices; i++) {
+ vertices[i] = dag.getVertex("vertex" + i);
+ }
+
+ // Schedule all but 1 task belonging to v0
+ for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 0));
+ }
+ verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
+ reset(eventHandler);
+
+
+ // Schedule all tasks belonging to v2
+ for (int i = 0; i < vertices[2].getTotalTasks(); i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[2].getVertexId(), i, 0));
+ }
+ // Nothing should be scheduled
+ verify(eventHandler, never()).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Schedule an extra attempt for all but 1 task belonging to v0
+ for (int i = 0; i < vertices[0].getTotalTasks() - 1; i++) {
+ dagScheduler.scheduleTask(createScheduleRequest(vertices[0].getVertexId(), i, 1));
+ }
+ // Only v0 requests should have gone out
+ verify(eventHandler, times(vertices[0].getTotalTasks() - 1)).handle(any(Event.class));
+ reset(eventHandler);
+
+ // Schedule last task of v0, with attempt 1
+ dagScheduler.scheduleTask(
+ createScheduleRequest(vertices[0].getVertexId(), vertices[0].getTotalTasks() - 1, 1));
+ // One v0 request and all of v2 should have gone out
+ verify(eventHandler, times(1 + vertices[2].getTotalTasks())).handle(any(Event.class));
+ }
+
+
+ // Test parallelism updated from -1
+ // Reduce parallelism
+ // Different attempts scheduled for a single task.
+
+ private DAG createMockDag() {
+ DAG dag = mock(DAG.class);
+ /*
+ v0 v1
+ \ /
+ \ /
+ v2 v3
+ \ /
+ \ /
+ \ /
+ v4
+ v0 - Root
+ v1 - Root with 0 tasks.
+ v2 - can simulate AutoReduce. Parallelism goes down. Slow schedule.
+ v3 - can simulate ImmediateStart
+ v4 - Simulate one shuffle input, one broadcast input.
+ */
+
+ int numVertices = 5;
+ Vertex[] vertices = new Vertex[numVertices];
+
+ vertices[0] = createMockVertex("vertex0", 0, 10, 0);
+ vertices[1] = createMockVertex("vertex1", 1, 0, 0);
+ vertices[2] = createMockVertex("vertex2", 2, 10, 1);
+ vertices[3] = createMockVertex("vertex3", 3, 10, 1);
+ vertices[4] = createMockVertex("vertex4", 4, 10, 2);
+
+ for (int i = 0; i < numVertices; i++) {
+ String name = vertices[i].getName();
+ TezVertexID vertexId = vertices[i].getVertexId();
+ doReturn(vertices[i]).when(dag).getVertex(name);
+ doReturn(vertices[i]).when(dag).getVertex(vertexId);
+ }
+
+
+ updateMockVertexWithConnections(vertices[0], createConnectionMap(null),
+ createConnectionMap(vertices[2]));
+ updateMockVertexWithConnections(vertices[1], createConnectionMap(null),
+ createConnectionMap(vertices[3]));
+ updateMockVertexWithConnections(vertices[2], createConnectionMap(vertices[0]),
+ createConnectionMap(vertices[4]));
+ updateMockVertexWithConnections(vertices[3], createConnectionMap(vertices[1]),
+ createConnectionMap(vertices[4]));
+ updateMockVertexWithConnections(vertices[4], createConnectionMap(vertices[2], vertices[3]),
+ createConnectionMap(null));
+
+ return dag;
+ }
+
+
+ private void updateParallelismOnMockVertex(Vertex vertex, int newParallelism) {
+ doReturn(newParallelism).when(vertex).getTotalTasks();
+ }
+
+ private Vertex createMockVertex(String name, int vertexIdInt, int totalTasks,
+ int distanceFromRoot) {
+ ApplicationId appId = ApplicationId.newInstance(1000, 1);
+ TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexId = TezVertexID.getInstance(dagId, vertexIdInt);
+
+ Vertex vertex = mock(Vertex.class);
+ doReturn(name).when(vertex).getName();
+ doReturn(totalTasks).when(vertex).getTotalTasks();
+ doReturn(vertexId).when(vertex).getVertexId();
+ doReturn(distanceFromRoot).when(vertex).getDistanceFromRoot();
+ doReturn(vertexId + " [" + name + "]").when(vertex).getLogIdentifier();
+ return vertex;
+ }
+
+ private Map<Vertex, Edge> createConnectionMap(Vertex... vertices) {
+ Map<Vertex, Edge> map = new HashMap<Vertex, Edge>();
+ if (vertices != null) {
+ for (Vertex vertex : vertices) {
+ map.put(vertex, mock(Edge.class));
+ }
+ }
+ return map;
+ }
+
+ private void updateMockVertexWithConnections(Vertex mockVertex, Map<Vertex, Edge> sources,
+ Map<Vertex, Edge> destinations) {
+ doReturn(sources).when(mockVertex).getInputVertices();
+ doReturn(destinations).when(mockVertex).getOutputVertices();
+ }
+
+ private TaskAttempt createTaskAttempt(TezVertexID vertexId, int taskIdInt, int attemptIdInt) {
+ TaskAttempt taskAttempt = mock(TaskAttempt.class);
+ TezTaskID taskId = TezTaskID.getInstance(vertexId, taskIdInt);
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, attemptIdInt);
+ doReturn(taskAttemptId).when(taskAttempt).getID();
+ doReturn(vertexId).when(taskAttempt).getVertexID();
+ return taskAttempt;
+ }
+
+ private DAGEventSchedulerUpdate createScheduleRequest(TezVertexID vertexId, int taskIdInt,
+ int attemptIdInt) {
+ TaskAttempt mockAttempt = createTaskAttempt(vertexId, taskIdInt, attemptIdInt);
+ return new DAGEventSchedulerUpdate(DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt);
+ }
+
+}