You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/04/10 20:32:47 UTC
git commit: TEZ-480. Create InputReady VertexManager (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 2e91bdeb0 -> 5f58ecad2
TEZ-480. Create InputReady VertexManager (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5f58ecad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5f58ecad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5f58ecad
Branch: refs/heads/master
Commit: 5f58ecad2eea60e21ded81d586d60d7fe678588b
Parents: 2e91bde
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Apr 10 11:32:28 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Apr 10 11:32:28 2014 -0700
----------------------------------------------------------------------
.../vertexmanager/InputReadyVertexManager.java | 190 ++++++++++++++++
.../TestInputReadyVertexManager.java | 226 +++++++++++++++++++
.../vertexmanager/TestShuffleVertexManager.java | 2 +-
3 files changed, 417 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f58ecad/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
new file mode 100644
index 0000000..95e34d7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Vertex manager that schedules the tasks of the managed vertex once the
+ * source tasks they depend on have finished.
+ *
+ * <ul>
+ * <li>For SCATTER_GATHER and BROADCAST input edges every task of the source
+ * vertex must finish before any managed task is scheduled.</li>
+ * <li>For ONE_TO_ONE input edges managed task <code>i</code> additionally
+ * waits for task <code>i</code> of every 1-1 source vertex.</li>
+ * <li>CUSTOM input edges are rejected in {@link #onVertexStarted(Map)}.</li>
+ * </ul>
+ */
+public class InputReadyVertexManager implements VertexManagerPlugin {
+  private static final Log LOG =
+      LogFactory.getLog(InputReadyVertexManager.class);
+
+  VertexManagerPluginContext context;
+  // completion bookkeeping per source vertex, keyed by source vertex name
+  Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
+  // taskIsStarted[i] is set once managed task i has been passed to the context
+  boolean[] taskIsStarted;
+  // templateOneToOne[i] counts the 1-1 source vertices whose task i has
+  // finished; only allocated when there is at least one 1-1 edge
+  int[] templateOneToOne;
+  int numOneToOneEdges;
+
+  /** Completion state tracked for a single source vertex. */
+  class SourceVertexInfo {
+    EdgeProperty edgeProperty;
+    int numTasks;
+    int numFinishedTasks;
+    // an entry stays null until the first completion of that task is seen
+    Boolean[] taskIsFinished;
+
+    SourceVertexInfo(int numTasks, EdgeProperty edgeProperty) {
+      this.numTasks = numTasks;
+      this.numFinishedTasks = 0;
+      this.edgeProperty = edgeProperty;
+      this.taskIsFinished = new Boolean[numTasks];
+    }
+  }
+
+  /** Stores the plugin context used for all later queries and scheduling. */
+  @Override
+  public void initialize(VertexManagerPluginContext context) {
+    this.context = context;
+  }
+
+  /**
+   * Inspects all input edges of the managed vertex, validates the 1-1
+   * constraints and then replays the completions that happened before the
+   * vertex started.
+   *
+   * @param completions source vertex name to the ids of its tasks that have
+   *          already completed
+   * @throws TezUncheckedException if an input edge is CUSTOM or of an unknown
+   *           type, if the 1-1 source vertices do not all have the same
+   *           number of tasks, or if that number differs from the number of
+   *           managed tasks
+   */
+  @Override
+  public void onVertexStarted(Map<String, List<Integer>> completions) {
+    int numManagedTasks = context.getVertexNumTasks(context.getVertexName());
+    LOG.info("Managing " + numManagedTasks + " tasks for vertex: "
+        + context.getVertexName());
+    taskIsStarted = new boolean[numManagedTasks];
+
+    // Find out about all input edge types. Custom edges are rejected.
+    // TODO Until TEZ-1013 we cannot handle custom input formats
+    Map<String, EdgeProperty> edges = context.getInputVertexEdgeProperties();
+    int oneToOneSrcTaskCount = 0;
+    numOneToOneEdges = 0;
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      EdgeProperty edgeProp = entry.getValue();
+      String srcVertex = entry.getKey();
+      int numSrcTasks = context.getVertexNumTasks(srcVertex);
+      switch (edgeProp.getDataMovementType()) {
+      case CUSTOM:
+        throw new TezUncheckedException("Cannot handle custom edge");
+      case ONE_TO_ONE:
+        numOneToOneEdges++;
+        // Key off the edge count rather than a zero task count so that a
+        // first 1-1 source with 0 tasks is still compared against the rest.
+        if (numOneToOneEdges == 1) {
+          oneToOneSrcTaskCount = numSrcTasks;
+        } else if (oneToOneSrcTaskCount != numSrcTasks) {
+          throw new TezUncheckedException(
+              "All 1-1 source vertices must have identical concurrency");
+        }
+        break;
+      case SCATTER_GATHER:
+      case BROADCAST:
+        break;
+      default:
+        throw new TezUncheckedException(
+            "Unknown edge type: " + edgeProp.getDataMovementType());
+      }
+      srcVertexInfo.put(srcVertex, new SourceVertexInfo(numSrcTasks, edgeProp));
+    }
+
+    if (numOneToOneEdges > 0) {
+      if (oneToOneSrcTaskCount != numManagedTasks) {
+        throw new TezUncheckedException(
+            "Managed task number must equal 1-1 source task number");
+      }
+      templateOneToOne = new int[oneToOneSrcTaskCount];
+    }
+
+    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+      for (Integer task : entry.getValue()) {
+        handleSouceTaskFinished(entry.getKey(), task);
+      }
+    }
+  }
+
+  /** Records the completion and schedules any managed task that became ready. */
+  @Override
+  public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    handleSouceTaskFinished(srcVertexName, taskId);
+  }
+
+  /** Vertex manager events are ignored by this manager. */
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+
+  /** Root input initialization is ignored by this manager. */
+  @Override
+  public void onRootVertexInitialized(String inputName,
+      InputDescriptor inputDescriptor, List<Event> events) {
+  }
+
+  /**
+   * Records that a source task finished and schedules every managed task
+   * that has not been started yet and whose dependencies are now satisfied.
+   * Duplicate completions of the same source task are counted only once and
+   * never cause a managed task to be scheduled a second time.
+   *
+   * @param vertex name of the source vertex the finished task belongs to
+   * @param taskId index of the finished task within the source vertex
+   * @throws TezUncheckedException if the vertex is not a known source vertex
+   */
+  void handleSouceTaskFinished(String vertex, Integer taskId) {
+    SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
+    if (srcInfo == null) {
+      throw new TezUncheckedException(
+          "Received task completion for unknown source vertex: " + vertex);
+    }
+
+    int taskIndex = taskId.intValue();
+    boolean isOneToOne =
+        srcInfo.edgeProperty.getDataMovementType() == DataMovementType.ONE_TO_ONE;
+    if (srcInfo.taskIsFinished[taskIndex] == null) {
+      // not a duplicate completion
+      srcInfo.taskIsFinished[taskIndex] = Boolean.TRUE;
+      srcInfo.numFinishedTasks++;
+      if (isOneToOne) {
+        templateOneToOne[taskIndex]++;
+      }
+    }
+
+    // A custom edge would need to tell us which of our tasks it is connected
+    // to. For now only built-in edges are supported.
+    if (!isOneToOne && srcInfo.numTasks != srcInfo.numFinishedTasks) {
+      // we depend on all tasks of this vertex to finish. Nothing to do now.
+      return;
+    }
+
+    // the task that just finished may allow us to schedule, provided every
+    // source vertex that we fully depend on is done
+    for (SourceVertexInfo vInfo : srcVertexInfo.values()) {
+      boolean needsAllTasks =
+          vInfo.edgeProperty.getDataMovementType() != DataMovementType.ONE_TO_ONE;
+      if (needsAllTasks && vInfo.numTasks != vInfo.numFinishedTasks) {
+        return;
+      }
+    }
+
+    // All source vertices with full dependencies are done. Without 1-1 edges
+    // every task is ready; otherwise only the tasks whose 1-1 inputs have all
+    // finished. Tasks that were already started are always skipped so that a
+    // late duplicate completion cannot schedule them again.
+    List<Integer> tasksToStart = Lists.newArrayList();
+    for (int i = 0; i < taskIsStarted.length; ++i) {
+      if (taskIsStarted[i]) {
+        continue;
+      }
+      if (numOneToOneEdges == 0 || templateOneToOne[i] == numOneToOneEdges) {
+        taskIsStarted[i] = true;
+        tasksToStart.add(Integer.valueOf(i));
+      }
+    }
+
+    if (!tasksToStart.isEmpty()) {
+      // TODO determine placement after TEZ-1018
+      context.scheduleVertexTasks(tasksToStart);
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f58ecad/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
new file mode 100644
index 0000000..289f004
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import static org.mockito.Mockito.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for {@link InputReadyVertexManager} against a mocked
+ * {@link VertexManagerPluginContext}.
+ */
+@SuppressWarnings("unchecked")
+public class TestInputReadyVertexManager {
+
+  private static final String MANAGED_VERTEX = "Vertex";
+
+  @Captor
+  ArgumentCaptor<List<Integer>> requestCaptor;
+
+  @Before
+  public void init() {
+    // populates the @Captor field above
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /** Builds an edge property that differs only in its data movement type. */
+  private static EdgeProperty createEdge(EdgeProperty.DataMovementType movementType) {
+    return new EdgeProperty(
+        movementType,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        new OutputDescriptor("out"),
+        new InputDescriptor("in"));
+  }
+
+  /** Creates a mock context for the managed vertex with the given input edges. */
+  private static VertexManagerPluginContext createContext(
+      HashMap<String, EdgeProperty> inputEdges) {
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(inputEdges);
+    when(mockContext.getVertexName()).thenReturn(MANAGED_VERTEX);
+    return mockContext;
+  }
+
+  /**
+   * Verifies that scheduling was requested expectedCalls times in total and
+   * that the most recent request contained exactly the expected task.
+   */
+  private void assertSingleTaskScheduled(VertexManagerPluginContext mockContext,
+      int expectedCalls, int expectedTask) {
+    verify(mockContext, times(expectedCalls)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(expectedTask, requestCaptor.getValue().get(0).intValue());
+  }
+
+  /**
+   * Asserts that starting the vertex fails with a TezUncheckedException whose
+   * message contains the expected text.
+   */
+  private static void assertStartFails(InputReadyVertexManager manager,
+      Map<String, List<Integer>> completions, String expectedMessage) {
+    try {
+      manager.onVertexStarted(completions);
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue("Unexpected exception message: " + e.getMessage(),
+          e.getMessage().contains(expectedMessage));
+      return;
+    }
+    Assert.fail("Should have exception");
+  }
+
+  @Test (timeout=5000)
+  public void testBasicScatterGather() {
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    mockInputVertices.put(mockSrcVertexId1,
+        createEdge(EdgeProperty.DataMovementType.SCATTER_GATHER));
+
+    VertexManagerPluginContext mockContext = createContext(mockInputVertices);
+    when(mockContext.getVertexNumTasks(MANAGED_VERTEX)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+
+    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+
+    InputReadyVertexManager manager = new InputReadyVertexManager();
+    manager.initialize(mockContext);
+    manager.onVertexStarted(initialCompletions);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    // nothing is scheduled until every source task has finished
+    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
+    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(2, requestCaptor.getValue().size());
+  }
+
+  @Test (timeout=5000)
+  public void testBasicOneToOne() {
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    mockInputVertices.put(mockSrcVertexId1,
+        createEdge(EdgeProperty.DataMovementType.ONE_TO_ONE));
+
+    VertexManagerPluginContext mockContext = createContext(mockInputVertices);
+    when(mockContext.getVertexNumTasks(MANAGED_VERTEX)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+
+    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+
+    InputReadyVertexManager manager = new InputReadyVertexManager();
+    manager.initialize(mockContext);
+    manager.onVertexStarted(initialCompletions);
+    // each source task completion releases the matching managed task
+    assertSingleTaskScheduled(mockContext, 1, 0);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    assertSingleTaskScheduled(mockContext, 2, 1);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
+    assertSingleTaskScheduled(mockContext, 3, 2);
+  }
+
+  @Test (timeout=5000)
+  public void testComplex() {
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    String mockSrcVertexId2 = "Vertex2";
+    String mockSrcVertexId3 = "Vertex3";
+    mockInputVertices.put(mockSrcVertexId1,
+        createEdge(EdgeProperty.DataMovementType.SCATTER_GATHER));
+    mockInputVertices.put(mockSrcVertexId2,
+        createEdge(EdgeProperty.DataMovementType.ONE_TO_ONE));
+    mockInputVertices.put(mockSrcVertexId3,
+        createEdge(EdgeProperty.DataMovementType.ONE_TO_ONE));
+
+    VertexManagerPluginContext mockContext = createContext(mockInputVertices);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
+
+    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+
+    // 1-1 sources do not match managed tasks
+    InputReadyVertexManager manager = new InputReadyVertexManager();
+    when(mockContext.getVertexNumTasks(MANAGED_VERTEX)).thenReturn(4);
+    manager.initialize(mockContext);
+    assertStartFails(manager, initialCompletions,
+        "Managed task number must equal 1-1 source");
+
+    // 1-1 sources do not match
+    manager = new InputReadyVertexManager();
+    when(mockContext.getVertexNumTasks(MANAGED_VERTEX)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
+    manager.initialize(mockContext);
+    assertStartFails(manager, initialCompletions,
+        "1-1 source vertices must have identical concurrency");
+
+    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+    initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
+    manager = new InputReadyVertexManager();
+    manager.initialize(mockContext);
+    manager.onVertexStarted(initialCompletions);
+    // all 1-1 0's done but not scheduled because v1 is not done
+    manager.onSourceTaskCompleted(mockSrcVertexId3, 0);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1); // duplicate
+    manager.onSourceTaskCompleted(mockSrcVertexId2, 1);
+    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 2); // v1 done
+    assertSingleTaskScheduled(mockContext, 1, 0);
+    // 1-1 completion triggers since other 1-1 is done
+    manager.onSourceTaskCompleted(mockSrcVertexId3, 1);
+    assertSingleTaskScheduled(mockContext, 2, 1);
+    // 1-1 completion does not trigger since other 1-1 is not done
+    manager.onSourceTaskCompleted(mockSrcVertexId2, 2);
+    verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+    // 1-1 completion triggers start
+    manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
+    assertSingleTaskScheduled(mockContext, 3, 2);
+
+    // no more starts
+    manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
+    verify(mockContext, times(3)).scheduleVertexTasks(anyList());
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f58ecad/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 2db972f..844cb1a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -253,7 +253,7 @@ public class TestShuffleVertexManager {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
- @Test//(timeout = 5000)
+ @Test(timeout = 5000)
public void testShuffleVertexManagerSlowStart() {
Configuration conf = new Configuration();
ShuffleVertexManager manager = null;