You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/04/10 20:32:47 UTC

git commit: TEZ-480. Create InputReady VertexManager (bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 2e91bdeb0 -> 5f58ecad2


TEZ-480. Create InputReady VertexManager (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/5f58ecad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/5f58ecad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/5f58ecad

Branch: refs/heads/master
Commit: 5f58ecad2eea60e21ded81d586d60d7fe678588b
Parents: 2e91bde
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Apr 10 11:32:28 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Apr 10 11:32:28 2014 -0700

----------------------------------------------------------------------
 .../vertexmanager/InputReadyVertexManager.java  | 190 ++++++++++++++++
 .../TestInputReadyVertexManager.java            | 226 +++++++++++++++++++
 .../vertexmanager/TestShuffleVertexManager.java |   2 +-
 3 files changed, 417 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f58ecad/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
new file mode 100644
index 0000000..95e34d7
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class InputReadyVertexManager implements VertexManagerPlugin {
+  private static final Log LOG = 
+      LogFactory.getLog(InputReadyVertexManager.class);
+
+  VertexManagerPluginContext context;
+  Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
+  boolean taskIsStarted[];
+  int templateOneToOne[];
+  int numOneToOneEdges;
+  
+  class SourceVertexInfo {
+    EdgeProperty edgeProperty;
+    int numTasks;
+    int numFinishedTasks;
+    Boolean taskIsFinished[];
+    
+    SourceVertexInfo(int numTasks, EdgeProperty edgeProperty) {
+      this.numTasks = numTasks;
+      this.numFinishedTasks = 0;
+      this.edgeProperty = edgeProperty;
+      this.taskIsFinished = new Boolean[numTasks];
+    }
+  }
+  
+  @Override
+  public void initialize(VertexManagerPluginContext context) {
+    this.context = context;
+  }
+
+  @Override
+  public void onVertexStarted(Map<String, List<Integer>> completions) {
+    int numManagedTasks = context.getVertexNumTasks(context.getVertexName());
+    LOG.info("Managing " + numManagedTasks + " for vertex: " + context.getVertexName());
+    taskIsStarted = new boolean[numManagedTasks];
+
+    // find out about all input edge types. If there is a custom edge then 
+    // TODO Until TEZ-1013 we cannot handle custom input formats
+    Map<String, EdgeProperty> edges = context.getInputVertexEdgeProperties();
+    int oneToOneSrcTaskCount = 0;
+    numOneToOneEdges = 0;
+    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
+      EdgeProperty edgeProp = entry.getValue();
+      String srcVertex = entry.getKey();
+      int numSrcTasks = context.getVertexNumTasks(srcVertex);
+      switch (edgeProp.getDataMovementType()) {
+      case CUSTOM:
+        throw new TezUncheckedException("Cannot handle custom edge");
+      case ONE_TO_ONE:
+        numOneToOneEdges++;
+        if (oneToOneSrcTaskCount == 0) {
+          oneToOneSrcTaskCount = numSrcTasks;
+        } else if (oneToOneSrcTaskCount != numSrcTasks) {
+          throw new TezUncheckedException(
+              "All 1-1 source vertices must have identical concurrency");
+        }
+        break;
+      case SCATTER_GATHER:
+      case BROADCAST:
+        break;
+      default:
+        throw new TezUncheckedException(
+            "Unknown edge type: " + edgeProp.getDataMovementType());
+      }
+      srcVertexInfo.put(srcVertex, new SourceVertexInfo(numSrcTasks, edgeProp));
+    }
+    
+    if (numOneToOneEdges > 0) {
+      if (oneToOneSrcTaskCount != numManagedTasks) {
+        throw new TezUncheckedException(
+            "Managed task number must equal 1-1 source task number");
+      }
+      templateOneToOne = new int[oneToOneSrcTaskCount];
+    }
+
+    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+      for (Integer task : entry.getValue()) {
+        handleSouceTaskFinished(entry.getKey(), task);
+      }
+    }
+  }
+
+  @Override
+  public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    handleSouceTaskFinished(srcVertexName, taskId);
+  }
+
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+
+  @Override
+  public void onRootVertexInitialized(String inputName,
+      InputDescriptor inputDescriptor, List<Event> events) {
+  }
+  
+  void handleSouceTaskFinished(String vertex, Integer taskId) {
+    SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
+    if (srcInfo.taskIsFinished[taskId.intValue()] == null) {
+      // not a duplicate completion
+      srcInfo.taskIsFinished[taskId.intValue()] = new Boolean(true);
+      srcInfo.numFinishedTasks++;
+      if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.ONE_TO_ONE) {
+        templateOneToOne[taskId.intValue()]++;
+      }
+    }
+    
+    // custom edge needs to tell us which of our tasks its connected to
+    // for now only-built in edges supported
+    if (srcInfo.edgeProperty.getDataMovementType() != DataMovementType.ONE_TO_ONE
+        && srcInfo.numTasks != srcInfo.numFinishedTasks) {
+      // we depend on all tasks to finish. So nothing to do now.
+      return;
+    }
+    
+    // currently finished vertex task may trigger us to schedule
+    for (SourceVertexInfo vInfo : srcVertexInfo.values()) {
+      if (vInfo.edgeProperty.getDataMovementType() != DataMovementType.ONE_TO_ONE) {
+        // we depend on all tasks to finish.
+        if (vInfo.numTasks != vInfo.numFinishedTasks) {
+          // we depend on all tasks to finish. So nothing to do now.
+          return;
+        }
+      }
+    }
+    
+    // all source vertices will full dependencies are done
+    List<Integer> tasksToStart = null;
+    if (numOneToOneEdges == 0) {
+      // no 1-1 dependency. Start all tasks
+      tasksToStart = Lists.newArrayListWithCapacity(taskIsStarted.length);
+      for (int i=0; i<taskIsStarted.length; ++i) {
+        taskIsStarted[i] = true;
+        tasksToStart.add(new Integer(i));
+      }
+    } else {
+      // start only the ready 1-1 tasks
+      tasksToStart = Lists.newLinkedList();
+      for (int i=0; i<taskIsStarted.length; ++i) {
+        if (!taskIsStarted[i] && templateOneToOne[i] == numOneToOneEdges) {
+          taskIsStarted[i] = true;
+          tasksToStart.add(new Integer(i));
+        }
+      }
+    }
+    
+    if (tasksToStart != null && !tasksToStart.isEmpty()) {
+      // TODO determine placement after TEZ-1018
+      context.scheduleVertexTasks(tasksToStart);
+    }
+    
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f58ecad/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
new file mode 100644
index 0000000..289f004
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.library.vertexmanager;
+
+import static org.mockito.Mockito.*;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.MockitoAnnotations;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Tests for {@link InputReadyVertexManager}: scheduling behind full
+ * (scatter-gather) dependencies, 1-1 dependencies, and a mix of both,
+ * including rejection of mismatched 1-1 task counts.
+ */
+@SuppressWarnings("unchecked")
+public class TestInputReadyVertexManager {
+
+  @Captor
+  ArgumentCaptor<List<Integer>> requestCaptor;
+
+  @Before
+  public void init() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /** Builds a persisted, sequentially scheduled edge of the given type. */
+  private static EdgeProperty createEdgeProperty(
+      EdgeProperty.DataMovementType dataMovementType) {
+    return new EdgeProperty(
+        dataMovementType,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        new OutputDescriptor("out"),
+        new InputDescriptor("in"));
+  }
+
+  /** All managed tasks start only after every scatter-gather source task finishes. */
+  @Test (timeout=5000)
+  public void testBasicScatterGather() {
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    EdgeProperty eProp1 =
+        createEdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER);
+
+    String mockManagedVertexId = "Vertex";
+
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(2);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+
+    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+
+    InputReadyVertexManager manager = new InputReadyVertexManager();
+    manager.initialize(mockContext);
+    manager.onVertexStarted(initialCompletions);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
+    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(2, requestCaptor.getValue().size());
+  }
+
+  /** With a single 1-1 source, managed task i starts as soon as source task i finishes. */
+  @Test (timeout=5000)
+  public void testBasicOneToOne() {
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    EdgeProperty eProp1 =
+        createEdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE);
+
+    String mockManagedVertexId = "Vertex";
+
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+
+    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+
+    InputReadyVertexManager manager = new InputReadyVertexManager();
+    manager.initialize(mockContext);
+    manager.onVertexStarted(initialCompletions);
+    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(0, requestCaptor.getValue().get(0).intValue());
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0).intValue());
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
+    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).intValue());
+  }
+
+  /**
+   * Mixes one scatter-gather and two 1-1 sources. Checks task-count
+   * validation, duplicate completions, and per-task 1-1 scheduling once the
+   * full dependency is done.
+   */
+  @Test (timeout=5000)
+  public void testComplex() {
+    HashMap<String, EdgeProperty> mockInputVertices =
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    EdgeProperty eProp1 =
+        createEdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER);
+    String mockSrcVertexId2 = "Vertex2";
+    EdgeProperty eProp2 =
+        createEdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE);
+    String mockSrcVertexId3 = "Vertex3";
+    EdgeProperty eProp3 =
+        createEdgeProperty(EdgeProperty.DataMovementType.ONE_TO_ONE);
+
+    String mockManagedVertexId = "Vertex";
+
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+    mockInputVertices.put(mockSrcVertexId2, eProp2);
+    mockInputVertices.put(mockSrcVertexId3, eProp3);
+
+    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+
+    // 1-1 sources do not match managed tasks
+    InputReadyVertexManager manager = new InputReadyVertexManager();
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+    manager.initialize(mockContext);
+    try {
+      manager.onVertexStarted(initialCompletions);
+      Assert.fail("Should have exception");
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Managed task number must equal 1-1 source"));
+    }
+
+    // 1-1 sources do not match
+    manager = new InputReadyVertexManager();
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
+    manager.initialize(mockContext);
+    try {
+      manager.onVertexStarted(initialCompletions);
+      Assert.fail("Should have exception");
+    } catch (TezUncheckedException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "1-1 source vertices must have identical concurrency"));
+    }
+
+    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+    initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0));
+    when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
+    manager = new InputReadyVertexManager();
+    manager.initialize(mockContext);
+    manager.onVertexStarted(initialCompletions);
+    // all 1-1 0's done but not scheduled because v1 is not done
+    manager.onSourceTaskCompleted(mockSrcVertexId3, 0);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1); // duplicate
+    manager.onSourceTaskCompleted(mockSrcVertexId2, 1);
+    verify(mockContext, times(0)).scheduleVertexTasks(anyList());
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 2); // v1 done
+    verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(0, requestCaptor.getValue().get(0).intValue());
+    // 1-1 completion triggers since other 1-1 is done
+    manager.onSourceTaskCompleted(mockSrcVertexId3, 1);
+    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0).intValue());
+    // 1-1 completion does not trigger since other 1-1 is not done
+    manager.onSourceTaskCompleted(mockSrcVertexId2, 2);
+    verify(mockContext, times(2)).scheduleVertexTasks(anyList());
+    // 1-1 completion trigger start
+    manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
+    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).intValue());
+
+    // no more starts
+    manager.onSourceTaskCompleted(mockSrcVertexId3, 2);
+    verify(mockContext, times(3)).scheduleVertexTasks(anyList());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5f58ecad/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
index 2db972f..844cb1a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java
@@ -253,7 +253,7 @@ public class TestShuffleVertexManager {
   }
   
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  @Test//(timeout = 5000)
+  @Test(timeout = 5000)
   public void testShuffleVertexManagerSlowStart() {
     Configuration conf = new Configuration();
     ShuffleVertexManager manager = null;