You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/12/12 02:49:59 UTC
[2/5] tez git commit: TEZ-2633. Allow VertexManagerPlugins to receive
and report based on attempts instead of tasks (bikas) (cherry picked from
commit 7b45e9a142830e7dd8b0263d50dbaaef5fb0da76)
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index b164a6d..d59439e 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -23,7 +23,6 @@ import static org.mockito.Mockito.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -34,9 +33,10 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -44,13 +44,13 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
-import com.google.common.collect.Maps;
+import com.google.common.collect.Lists;
@SuppressWarnings("unchecked")
public class TestInputReadyVertexManager {
@Captor
- ArgumentCaptor<List<TaskWithLocationHint>> requestCaptor;
+ ArgumentCaptor<List<ScheduleTaskRequest>> requestCaptor;
@Before
public void init() {
@@ -77,23 +77,23 @@ public class TestInputReadyVertexManager {
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
mockInputVertices.put(mockSrcVertexId1, eProp1);
-
- Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
- initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-
+
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
verify(mockContext, times(1)).vertexReconfigurationPlanned();
// source vertex configured
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
verify(mockContext, times(1)).doneReconfiguringVertex();
- verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
// then own vertex started
- manager.onVertexStarted(initialCompletions);
- manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
- verify(mockContext, times(0)).scheduleVertexTasks(anyList());
- manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
- verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onVertexStarted(Collections.singletonList(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ verify(mockContext, times(0)).scheduleTasks(anyList());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+ verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
Assert.assertEquals(2, requestCaptor.getValue().size());
}
@@ -118,36 +118,36 @@ public class TestInputReadyVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
mockInputVertices.put(mockSrcVertexId1, eProp1);
- Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
- initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
verify(mockContext, times(1)).vertexReconfigurationPlanned();
// source vertex configured
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
verify(mockContext, times(1)).doneReconfiguringVertex();
- verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
- manager.onVertexStarted(initialCompletions);
- verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+ verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
+ manager.onVertexStarted(Collections.singletonList(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
+ verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
- Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex());
Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getVertexName());
Assert.assertEquals(0, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getTaskIndex());
- manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
- verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
- Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex());
Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getVertexName());
Assert.assertEquals(1, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getTaskIndex());
- manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
- verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+ verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
- Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getVertexName());
Assert.assertEquals(2, requestCaptor.getValue().get(0)
@@ -175,28 +175,28 @@ public class TestInputReadyVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
mockInputVertices.put(mockSrcVertexId1, eProp1);
- Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
- initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
-
InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
manager.initialize();
verify(mockContext, times(1)).vertexReconfigurationPlanned();
- verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
// ok to have source task complete before anything else
- manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
// first own vertex started
- manager.onVertexStarted(initialCompletions);
+ manager.onVertexStarted(Collections.singletonList(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
// no scheduling as we are not configured yet
- verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+ verify(mockContext, times(0)).scheduleTasks(requestCaptor.capture());
// then source vertex configured. now we start
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
verify(mockContext, times(1)).doneReconfiguringVertex();
- verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
- manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
- verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+ verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
+ verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
- Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getVertexName());
Assert.assertEquals(2, requestCaptor.getValue().get(0)
@@ -247,7 +247,7 @@ public class TestInputReadyVertexManager {
mockInputVertices.put(mockSrcVertexId2, eProp2);
mockInputVertices.put(mockSrcVertexId3, eProp3);
- Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+ List<TaskAttemptIdentifier> initialCompletions = Lists.newArrayList();
// 1-1 sources do not match managed tasks. setParallelism called to make them match
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
@@ -280,8 +280,8 @@ public class TestInputReadyVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
- initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
- initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
+ initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ initialCompletions.add(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
manager = new InputReadyVertexManager(mockContext);
manager.initialize();
verify(mockContext, times(3)).vertexReconfigurationPlanned();
@@ -293,44 +293,53 @@ public class TestInputReadyVertexManager {
verify(mockContext, times(2)).doneReconfiguringVertex();
manager.onVertexStarted(initialCompletions);
// all 1-1 0's done but not scheduled because v1 is not done
- manager.onSourceTaskCompleted(mockSrcVertexId3, 0);
- manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
- manager.onSourceTaskCompleted(mockSrcVertexId1, 1); // duplicate
- manager.onSourceTaskCompleted(mockSrcVertexId2, 1);
- verify(mockContext, times(0)).scheduleVertexTasks(anyList());
- manager.onSourceTaskCompleted(mockSrcVertexId1, 2); // v1 done
- verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // duplicate
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
+ verify(mockContext, times(0)).scheduleTasks(anyList());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2)); // v1 done
+ verify(mockContext, times(1)).scheduleTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
- Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex());
Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getVertexName());
Assert.assertEquals(0, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
// 1-1 completion triggers since other 1-1 is done
- manager.onSourceTaskCompleted(mockSrcVertexId3, 1);
- verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 1));
+ verify(mockContext, times(2)).scheduleTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
- Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex());
Assert.assertEquals(mockSrcVertexId3, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getVertexName());
Assert.assertEquals(1, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
// 1-1 completion does not trigger since other 1-1 is not done
- manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
- verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2));
+ verify(mockContext, times(2)).scheduleTasks(anyList());
// 1-1 completion trigger start
- manager.onSourceTaskCompleted(mockSrcVertexId2, 2);
- verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
+ verify(mockContext, times(3)).scheduleTasks(requestCaptor.capture());
Assert.assertEquals(1, requestCaptor.getValue().size());
- Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+ Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex());
Assert.assertEquals(mockSrcVertexId2, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getVertexName());
Assert.assertEquals(2, requestCaptor.getValue().get(0)
.getTaskLocationHint().getAffinitizedTask().getTaskIndex()); // affinity to last completion
// no more starts
- manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
- verify(mockContext, times(3)).scheduleVertexTasks(anyList());
+ manager.onSourceTaskCompleted(
+ TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 2));
+ verify(mockContext, times(3)).scheduleTasks(anyList());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index d71eba2..df08060 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -34,10 +34,18 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
+import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
+import org.apache.tez.runtime.api.TaskIdentifier;
+import org.apache.tez.runtime.api.VertexIdentifier;
+import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.VertexManagerEventPayloadProto;
@@ -68,6 +76,10 @@ import static org.mockito.Mockito.when;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestShuffleVertexManager {
+ TezVertexID vertexId = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
+ int taskId = 0;
+ List<TaskAttemptIdentifier> emptyCompletions = null;
+
@Test(timeout = 5000)
public void testShuffleVertexManagerAutoParallelism() throws Exception {
Configuration conf = new Configuration();
@@ -156,12 +168,12 @@ public class TestShuffleVertexManager {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
scheduledTasks.clear();
- List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
- for (TaskWithLocationHint task : tasks) {
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
scheduledTasks.add(task.getTaskIndex());
}
return null;
- }}).when(mockContext).scheduleVertexTasks(anyList());
+ }}).when(mockContext).scheduleTasks(anyList());
final Map<String, EdgeManagerPlugin> newEdgeManagers =
new HashMap<String, EdgeManagerPlugin>();
@@ -217,7 +229,7 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(1);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
verify(mockContext, times(2)).vertexReconfigurationPlanned();
Assert.assertTrue(manager.bipartiteSources == 2);
@@ -240,7 +252,7 @@ public class TestShuffleVertexManager {
verify(mockContext, times(1)).doneReconfiguringVertex(); // no change. will trigger after start
Assert.assertTrue(scheduledTasks.size() == 0); // no tasks scheduled
// trigger start and processing of pending notification events
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 2);
verify(mockContext, times(2)).doneReconfiguringVertex(); // reconfig done
Assert.assertTrue(manager.pendingTasks.isEmpty());
@@ -250,20 +262,18 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
- ByteBuffer payload =
- VertexManagerEventPayloadProto.newBuilder().setOutputSize(5000L).build().toByteString().asReadOnlyByteBuffer();
- VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 5000L, "Vertex");
// parallelism not change due to large data size
manager = createManager(conf, mockContext, 0.1f, 0.1f);
verify(mockContext, times(4)).vertexReconfigurationPlanned(); // Tez notified of reconfig
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.size() == 4); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
manager.onVertexManagerEventReceived(vmEvent);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
verify(mockContext, times(0)).reconfigureVertex(anyInt(), any(VertexLocationHint.class), anyMap());
verify(mockContext, times(2)).doneReconfiguringVertex();
// trigger scheduling
@@ -281,20 +291,18 @@ public class TestShuffleVertexManager {
* Delay determining parallelism until enough data has been received.
*/
scheduledTasks.clear();
- payload =
- VertexManagerEventPayloadProto.newBuilder().setOutputSize(1L).build().toByteString().asReadOnlyByteBuffer();
- vmEvent = VertexManagerEvent.create("Vertex", payload);
//min/max fraction of 0.01/0.75 would ensure that we hit determineParallelism code path on receiving first event itself.
manager = createManager(conf, mockContext, 0.01f, 0.75f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//First task in src1 completed with small payload
+ vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
manager.onVertexManagerEventReceived(vmEvent); //small payload
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertTrue(manager.determineParallelismAndApply() == false);
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
@@ -303,8 +311,9 @@ public class TestShuffleVertexManager {
Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);
//Second task in src1 completed with small payload
+ vmEvent = getVertexManagerEvent(null, 1L, "Vertex");
manager.onVertexManagerEventReceived(vmEvent); //small payload
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
//Still overall data gathered has not reached threshold; So, ensure parallelism can be determined later
Assert.assertTrue(manager.determineParallelismAndApply() == false);
Assert.assertEquals(4, manager.pendingTasks.size());
@@ -314,17 +323,14 @@ public class TestShuffleVertexManager {
Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);
//First task in src2 completed (with larger payload) to trigger determining parallelism
- payload =
- VertexManagerEventPayloadProto.newBuilder().setOutputSize(1200L).build().toByteString()
- .asReadOnlyByteBuffer();
- vmEvent = VertexManagerEvent.create("Vertex", payload);
+ vmEvent = getVertexManagerEvent(null, 1200L, "Vertex");
manager.onVertexManagerEventReceived(vmEvent);
Assert.assertTrue(manager.determineParallelismAndApply()); //ensure parallelism is determined
verify(mockContext, times(1)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertEquals(1, manager.pendingTasks.size());
Assert.assertEquals(1, scheduledTasks.size());
Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
@@ -336,14 +342,11 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
scheduledTasks.clear();
- payload =
- VertexManagerEventPayloadProto.newBuilder().setOutputSize(100L).build().toByteString()
- .asReadOnlyByteBuffer();
- vmEvent = VertexManagerEvent.create("Vertex", payload);
+ vmEvent = getVertexManagerEvent(null, 100L, "Vertex");
//min/max fraction of 0.0/0.2
manager = createManager(conf, mockContext, 0.0f, 0.2f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
@@ -353,13 +356,17 @@ public class TestShuffleVertexManager {
//send 7 events with payload size as 100
for(int i=0;i<7;i++) {
manager.onVertexManagerEventReceived(vmEvent); //small payload
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(i));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
//should not change parallelism
verify(mockContext, times(0)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
}
//send 8th event with payload size as 100
manager.onVertexManagerEventReceived(vmEvent);
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(8));
+
+ //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
//Since max threshold (40 * 0.2 = 8) is met, vertex manager should determine parallelism
verify(mockContext, times(1)).reconfigureVertex(eq(4), any(VertexLocationHint.class), anyMap());
@@ -370,38 +377,39 @@ public class TestShuffleVertexManager {
// parallelism changed due to small data size
scheduledTasks.clear();
- payload =
- VertexManagerEventPayloadProto.newBuilder().setOutputSize(500L).build().toByteString().asReadOnlyByteBuffer();
- vmEvent = VertexManagerEvent.create("Vertex", payload);
manager = createManager(conf, mockContext, 0.5f, 0.5f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
// task completion from non-bipartite stage does nothing
- manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
Assert.assertEquals(4, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
+ vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
manager.onVertexManagerEventReceived(vmEvent);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
// ignore duplicate completion
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertEquals(4, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);
-
+ vmEvent = getVertexManagerEvent(null, 500L, "Vertex");
manager.onVertexManagerEventReceived(vmEvent);
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+ //ShuffleVertexManager's updatePendingTasks relies on getVertexNumTasks. Setting this for test
+ when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
// managedVertex tasks reduced
verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
@@ -415,7 +423,7 @@ public class TestShuffleVertexManager {
Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize);
// more completions dont cause recalculation of parallelism
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
verify(mockContext, times(2)).reconfigureVertex(eq(2), any(VertexLocationHint.class), anyMap());
Assert.assertEquals(2, newEdgeManagers.size());
@@ -490,7 +498,7 @@ public class TestShuffleVertexManager {
mockInputVertices.put(mockSrcVertexId3, eProp3);
try {
manager = createManager(conf, mockContext, 0.1f, 0.1f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertFalse(true);
} catch (TezUncheckedException e) {
Assert.assertTrue(e.getMessage().contains(
@@ -502,7 +510,7 @@ public class TestShuffleVertexManager {
// check initialization
manager = createManager(conf, mockContext, 0.1f, 0.1f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 2);
final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
@@ -510,17 +518,17 @@ public class TestShuffleVertexManager {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
scheduledTasks.clear();
- List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
- for (TaskWithLocationHint task : tasks) {
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
scheduledTasks.add(task.getTaskIndex());
}
return null;
- }}).when(mockContext).scheduleVertexTasks(anyList());
+ }}).when(mockContext).scheduleTasks(anyList());
// source vertices have 0 tasks. immediate start of all managed tasks
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
@@ -560,7 +568,7 @@ public class TestShuffleVertexManager {
scheduledTasks.clear();
manager = createManager(conf, mockContext, 0.8f, null);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
@@ -573,7 +581,7 @@ public class TestShuffleVertexManager {
// Finish all tasks before exceeding the threshold
for (String mockSrcVertex : new String[] { mockSrcVertexId1, mockSrcVertexId2 }) {
for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
- manager.onSourceTaskCompleted(mockSrcVertex, i);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertex, i));
++completedTasks;
if ((completedTasks + 1) >= completedTasksThreshold) {
// stop before completing more than min/max source tasks
@@ -586,7 +594,7 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
// Cross the threshold min/max threshold to schedule all tasks
- manager.onSourceTaskCompleted(mockSrcVertexId2, completedTasks);
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, completedTasks));
Assert.assertEquals(0, manager.pendingTasks.size());
Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size()); // all tasks scheduled
@@ -595,8 +603,8 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
// source vertex have some tasks. min, max == 0
- manager = createManager(conf, mockContext, 0.0f, 0.0f);
- manager.onVertexStarted(null);
+ manager = createManager(conf, mockContext, 0.f, 0.f);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.totalTasksToSchedule == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
@@ -609,122 +617,122 @@ public class TestShuffleVertexManager {
// min, max > 0 and min == max
manager = createManager(conf, mockContext, 0.25f, 0.25f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
// task completion from non-bipartite stage does nothing
- manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
// min, max > 0 and min == max == absolute max 1.0
manager = createManager(conf, mockContext, 1.0f, 1.0f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
// task completion from non-bipartite stage does nothing
- manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
// min, max > 0 and min == max
manager = createManager(conf, mockContext, 1.0f, 1.0f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
// task completion from non-bipartite stage does nothing
- manager.onSourceTaskCompleted(mockSrcVertexId3, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 0);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 1);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3);
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
Assert.assertTrue(manager.pendingTasks.isEmpty());
Assert.assertTrue(scheduledTasks.size() == 3); // all tasks scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
// min, max > and min < max
manager = createManager(conf, mockContext, 0.25f, 0.75f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
Assert.assertTrue(manager.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
// completion of same task again should not get counted
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
Assert.assertTrue(manager.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 2); // 2 tasks scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
scheduledTasks.clear();
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1)); // we are done. no action
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1)); // we are done. no action
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 0); // no task scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
// min, max > and min < max
manager = createManager(conf, mockContext, 0.25f, 1.0f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 4);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(0));
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
Assert.assertTrue(manager.pendingTasks.size() == 2);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 2);
- manager.onSourceTaskCompleted(mockSrcVertexId2, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 1);
Assert.assertTrue(scheduledTasks.size() == 1); // 1 task scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 3);
- manager.onSourceTaskCompleted(mockSrcVertexId1, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 1));
Assert.assertTrue(manager.pendingTasks.size() == 0);
Assert.assertTrue(scheduledTasks.size() == 1); // no task scheduled
Assert.assertTrue(manager.numBipartiteSourceTasksCompleted == 4);
@@ -734,9 +742,10 @@ public class TestShuffleVertexManager {
/**
* Tasks should be scheduled only when all source vertices are configured completely
+ * @throws IOException
*/
@Test(timeout = 5000)
- public void test_Tez1649_with_scatter_gather_edges() {
+ public void test_Tez1649_with_scatter_gather_edges() throws IOException {
Configuration conf = new Configuration();
conf.setBoolean(
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
@@ -780,10 +789,7 @@ public class TestShuffleVertexManager {
when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
- ByteBuffer payload =
- VertexManagerEventPayloadProto.newBuilder().setOutputSize(50L).build().toByteString().asReadOnlyByteBuffer();
- VertexManagerEvent vmEvent = VertexManagerEvent.create("Vertex", payload);
-
+ VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L, "Vertex");
// check initialization
manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
@@ -792,14 +798,14 @@ public class TestShuffleVertexManager {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
scheduledTasks.clear();
- List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
- for (TaskWithLocationHint task : tasks) {
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
scheduledTasks.add(task.getTaskIndex());
}
return null;
- }}).when(mockContext_R2).scheduleVertexTasks(anyList());
+ }}).when(mockContext_R2).scheduleTasks(anyList());
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 3);
manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
@@ -813,15 +819,15 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
//Send events for all tasks of m3.
- manager.onSourceTaskCompleted(m3, new Integer(0));
- manager.onSourceTaskCompleted(m3, new Integer(1));
- manager.onSourceTaskCompleted(m3, new Integer(2));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 2));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
//Send an event for m2. But still we need to wait for at least 1 event from r1.
- manager.onSourceTaskCompleted(m2, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 9);
@@ -846,7 +852,7 @@ public class TestShuffleVertexManager {
when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);
manager = createManager(conf, mockContext_R2, 0.001f, 0.001f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
@@ -856,11 +862,42 @@ public class TestShuffleVertexManager {
// Only need completed configuration notification from m3
manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
- manager.onSourceTaskCompleted(m3, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m3, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
}
+ VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName)
+ throws IOException {
+ ByteBuffer payload = null;
+ if (sizes != null) {
+ /*
+ RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput(sizes);
+ DataOutputBuffer dout = new DataOutputBuffer();
+ partitionStats.serialize(dout);
+ ByteString
+ partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString(dout.getData());
+ payload =
+ VertexManagerEventPayloadProto.newBuilder()
+ .setOutputSize(totalSize)
+ .setPartitionStats(partitionStatsBytes)
+ .build().toByteString()
+ .asReadOnlyByteBuffer();
+ */
+ } else {
+ payload =
+ VertexManagerEventPayloadProto.newBuilder()
+ .setOutputSize(totalSize)
+ .build().toByteString()
+ .asReadOnlyByteBuffer();
+ }
+ TaskAttemptIdentifierImpl taId = new TaskAttemptIdentifierImpl("dag", vertexName,
+ TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, taskId++), 0));
+ VertexManagerEvent vmEvent = VertexManagerEvent.create(vertexName, payload);
+ vmEvent.setProducerAttemptIdentifier(taId);
+ return vmEvent;
+ }
+
@Test(timeout = 5000)
public void test_Tez1649_with_mixed_edges() {
Configuration conf = new Configuration();
@@ -913,16 +950,16 @@ public class TestShuffleVertexManager {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
scheduledTasks.clear();
- List<TaskWithLocationHint> tasks = (List<TaskWithLocationHint>)args[0];
- for (TaskWithLocationHint task : tasks) {
+ List<ScheduleTaskRequest> tasks = (List<ScheduleTaskRequest>)args[0];
+ for (ScheduleTaskRequest task : tasks) {
scheduledTasks.add(task.getTaskIndex());
}
return null;
- }}).when(mockContext).scheduleVertexTasks(anyList());
+ }}).when(mockContext).scheduleTasks(anyList());
// check initialization
manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
Assert.assertTrue(manager.bipartiteSources == 1);
manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
@@ -933,13 +970,13 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//Send events for 2 tasks of r1.
- manager.onSourceTaskCompleted(r1, new Integer(0));
- manager.onSourceTaskCompleted(r1, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
//Send an event for m2.
- manager.onSourceTaskCompleted(m2, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(m2, 0));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
@@ -952,7 +989,7 @@ public class TestShuffleVertexManager {
//Still, wait for a configuration to be completed from other edges
scheduledTasks.clear();
manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
@@ -964,9 +1001,9 @@ public class TestShuffleVertexManager {
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(manager.totalNumBipartiteSourceTasks == 3);
- manager.onSourceTaskCompleted(r1, new Integer(0));
- manager.onSourceTaskCompleted(r1, new Integer(1));
- manager.onSourceTaskCompleted(r1, new Integer(2));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 2));
//Tasks from non-scatter edges of m2 and m3 are not complete.
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
@@ -979,7 +1016,7 @@ public class TestShuffleVertexManager {
//try with a zero task vertex (with non-scatter-gather edges)
scheduledTasks.clear();
manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
@@ -988,7 +1025,7 @@ public class TestShuffleVertexManager {
when(mockContext.getVertexNumTasks(m3)).thenReturn(3); //broadcast
manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
Assert.assertEquals(3, manager.pendingTasks.size()); // no tasks scheduled
@@ -996,8 +1033,8 @@ public class TestShuffleVertexManager {
Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
//Send 2 events for tasks of r1.
- manager.onSourceTaskCompleted(r1, new Integer(0));
- manager.onSourceTaskCompleted(r1, new Integer(1));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 1));
Assert.assertTrue(manager.pendingTasks.size() == 3); // no tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 0);
@@ -1009,7 +1046,7 @@ public class TestShuffleVertexManager {
//try with all zero task vertices in non-SG edges
scheduledTasks.clear();
manager = createManager(conf, mockContext, 0.001f, 0.001f);
- manager.onVertexStarted(null);
+ manager.onVertexStarted(emptyCompletions);
when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
@@ -1019,10 +1056,22 @@ public class TestShuffleVertexManager {
//Send 1 events for tasks of r1.
manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
- manager.onSourceTaskCompleted(r1, new Integer(0));
+ manager.onSourceTaskCompleted(createTaskAttemptIdentifier(r1, 0));
Assert.assertTrue(manager.pendingTasks.size() == 0); // all tasks scheduled
Assert.assertTrue(scheduledTasks.size() == 3);
}
+
+ public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
+ VertexIdentifier mockVertex = mock(VertexIdentifier.class);
+ when(mockVertex.getName()).thenReturn(vName);
+ TaskIdentifier mockTask = mock(TaskIdentifier.class);
+ when(mockTask.getIdentifier()).thenReturn(tId);
+ when(mockTask.getVertexIdentifier()).thenReturn(mockVertex);
+ TaskAttemptIdentifier mockAttempt = mock(TaskAttemptIdentifier.class);
+ when(mockAttempt.getIdentifier()).thenReturn(0);
+ when(mockAttempt.getTaskIdentifier()).thenReturn(mockTask);
+ return mockAttempt;
+ }
private ShuffleVertexManager createManager(Configuration conf,
VertexManagerPluginContext context, Float min, Float max) {
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
index 7d7069e..04b0a03 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestAMRecovery.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
@@ -56,7 +55,7 @@ import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
@@ -71,6 +70,7 @@ import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.junit.After;
@@ -523,8 +523,8 @@ public class TestAMRecovery {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
- super.onSourceTaskCompleted(srcVertexName, taskId);
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+ super.onSourceTaskCompleted(attempt);
completedTaskNum ++;
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -532,7 +532,8 @@ public class TestAMRecovery {
System.exit(-1);
}
} else {
- if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+ if (completedTaskNum == getContext().
+ getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
System.exit(-1);
}
}
@@ -562,8 +563,8 @@ public class TestAMRecovery {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
- super.onSourceTaskCompleted(srcVertexName, taskId);
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+ super.onSourceTaskCompleted(attempt);
completedTaskNum ++;
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -571,7 +572,8 @@ public class TestAMRecovery {
System.exit(-1);
}
} else {
- if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+ if (completedTaskNum == getContext().
+ getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
System.exit(-1);
}
}
@@ -602,8 +604,8 @@ public class TestAMRecovery {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
- super.onSourceTaskCompleted(srcVertexName, taskId);
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
+ super.onSourceTaskCompleted(attempt);
completedTaskNum ++;
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
@@ -611,7 +613,8 @@ public class TestAMRecovery {
System.exit(-1);
}
} else {
- if (completedTaskNum == getContext().getVertexNumTasks(srcVertexName)) {
+ if (completedTaskNum == getContext().
+ getVertexNumTasks(attempt.getTaskIdentifier().getVertexIdentifier().getName())) {
System.exit(-1);
}
}
@@ -643,26 +646,26 @@ public class TestAMRecovery {
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions)
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions)
throws Exception {
if (getContext().getDAGAttemptNumber() == 1) {
// only schedule one task if it is partiallyFinished case
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
- getContext().scheduleVertexTasks(Lists.newArrayList(new TaskWithLocationHint(0, null)));
+ getContext().scheduleTasks(Lists.newArrayList(ScheduleTaskRequest.create(0, null)));
return ;
}
}
// schedule all tasks when it is not partiallyFinished
int taskNum = getContext().getVertexNumTasks(getContext().getVertexName());
- List<TaskWithLocationHint> taskWithLocationHints = new ArrayList<TaskWithLocationHint>();
+ List<ScheduleTaskRequest> taskWithLocationHints = new ArrayList<ScheduleTaskRequest>();
for (int i=0;i<taskNum;++i) {
- taskWithLocationHints.add(new TaskWithLocationHint(i, null));
+ taskWithLocationHints.add(ScheduleTaskRequest.create(i, null));
}
- getContext().scheduleVertexTasks(taskWithLocationHints);
+ getContext().scheduleTasks(taskWithLocationHints);
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId)
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt)
throws Exception {
}
@@ -704,9 +707,9 @@ public class TestAMRecovery {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
int curAttempt = getContext().getDAGAttemptNumber();
- super.onSourceTaskCompleted(srcVertexName, taskId);
+ super.onSourceTaskCompleted(attempt);
int failOnAttempt = conf.getInt(FAIL_ON_ATTEMPT, 1);
LOG.info("failOnAttempt:" + failOnAttempt);
LOG.info("curAttempt:" + curAttempt);
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
index caf0822..7d88fdf 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestExceptionPropagation.java
@@ -79,6 +79,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
@@ -696,7 +697,7 @@ public class TestExceptionPropagation {
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
if (this.exLocation == ExceptionLocation.VM_ON_VERTEX_STARTED) {
throw new RuntimeException(this.exLocation.name());
}
@@ -739,11 +740,11 @@ public class TestExceptionPropagation {
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) {
throw new RuntimeException(this.exLocation.name());
}
- super.onSourceTaskCompleted(srcVertexName, attemptId);
+ super.onSourceTaskCompleted(attempt);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/17d5c18f/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index 7244d8d..5c6f855 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -40,7 +40,7 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginContext.ScheduleTaskRequest;
import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
@@ -49,6 +49,7 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.Reader;
+import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.InputInitializer;
@@ -57,15 +58,12 @@ import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
-import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.test.TestInput;
import org.apache.tez.test.TestOutput;
import org.apache.tez.test.TestProcessor;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiAttemptDAG {
@@ -102,14 +100,11 @@ public class MultiAttemptDAG {
}
@Override
- public void onVertexStarted(Map<String, List<Integer>> completions) {
+ public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
if (completions != null) {
- for (Entry<String, List<Integer>> entry : completions.entrySet()) {
- LOG.info("Received completion events on vertexStarted"
- + ", vertex=" + entry.getKey()
- + ", completions=" + entry.getValue().size());
- numCompletions.addAndGet(entry.getValue().size());
- }
+ LOG.info("Received completion events on vertexStarted"
+ + ", completions=" + completions.size());
+ numCompletions.addAndGet(completions.size());
}
maybeScheduleTasks();
}
@@ -129,20 +124,20 @@ public class MultiAttemptDAG {
} else if (successAttemptId == getContext().getDAGAttemptNumber()) {
LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName());
int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
- List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
+ List<ScheduleTaskRequest> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
for (int i=0; i<numTasks; ++i) {
- scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
+ scheduledTasks.add(ScheduleTaskRequest.create(i, null));
}
- getContext().scheduleVertexTasks(scheduledTasks);
+ getContext().scheduleTasks(scheduledTasks);
}
}
}
@Override
- public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+ public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
LOG.info("Received completion events for source task"
- + ", vertex=" + srcVertexName
- + ", taskIdx=" + taskId);
+ + ", vertex=" + attempt.getTaskIdentifier().getVertexIdentifier().getName()
+ + ", taskIdx=" + attempt.getTaskIdentifier().getIdentifier());
numCompletions.incrementAndGet();
maybeScheduleTasks();
}