You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/01/31 02:48:35 UTC

tez git commit: TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism (bikas) (cherry picked from commit b7268699e684fc4d1ebc00b47d8f1f7e4163bf97)

Repository: tez
Updated Branches:
  refs/heads/branch-0.6 8e5d18e63 -> 8cdd988b2


TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism (bikas)
(cherry picked from commit b7268699e684fc4d1ebc00b47d8f1f7e4163bf97)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8cdd988b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8cdd988b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8cdd988b

Branch: refs/heads/branch-0.6
Commit: 8cdd988b2549f0aec243a40d3bf7b78b2c9fb4e1
Parents: 8e5d18e
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 17:46:27 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Fri Jan 30 17:47:48 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  1 +
 .../vertexmanager/InputReadyVertexManager.java  | 66 ++++++++++++---
 .../TestInputReadyVertexManager.java            | 87 +++++++++++++++++++-
 4 files changed, 140 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8cdd988b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6ecc8e7..796ec9a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -125,6 +125,7 @@ TEZ-UI CHANGES (TEZ-8):
 Release 0.5.4: Unreleased
 
 ALL CHANGES:
+  TEZ-2011. InputReadyVertexManager not resilient to updates in parallelism 
   TEZ-1934. TestAMRecovery may fail due to the execution order is not determined.
   TEZ-1642. TestAMRecovery sometimes fail.
   TEZ-1931. Publish tez version info to Timeline.

http://git-wip-us.apache.org/repos/asf/tez/blob/8cdd988b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b1c93da..a467e4b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -1560,6 +1560,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       Preconditions.checkState(vertexToBeReconfiguredByManager, "doneReconfiguringVertex() can be "
           + "invoked only after vertexReconfigurationPlanned() is invoked");
       this.vertexToBeReconfiguredByManager = false;
+      // TEZ-2015: the VM may not have configured everything, e.g. input edges. See maybeSendConfiguredEvent().
       if (completelyConfiguredSent.compareAndSet(false, true)) {
         // vertex already started and at that time this event was not sent. Send now.
         stateChangeNotifier.stateChanged(vertexId, new VertexStateUpdate(vertexName,

http://git-wip-us.apache.org/repos/asf/tez/blob/8cdd988b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
index 8f30276..0e3a3ce 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.library.vertexmanager;
 
+import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 
@@ -32,11 +34,16 @@ import org.apache.tez.dag.api.TaskLocationHint;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.LinkedListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 @Private
 public class InputReadyVertexManager extends VertexManagerPlugin {
@@ -48,6 +55,8 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
   int oneToOneSrcTasksDoneCount[];
   TaskLocationHint oneToOneLocationHints[];
   int numOneToOneEdges;
+  int numSignalsToWaitFor;
+  Multimap<String, Integer> pendingCompletions = LinkedListMultimap.create();
 
   public InputReadyVertexManager(VertexManagerPluginContext context) {
     super(context);
@@ -67,12 +76,10 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
     }
   }
   
-  @Override
-  public void initialize() {
-  }
-
-  @Override
-  public void onVertexStarted(Map<String, List<Integer>> completions) {
+  void start() {
+    if (!ready()) {
+      return;
+    }
     int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
     LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName());
     taskIsStarted = new boolean[numManagedTasks];
@@ -117,24 +124,61 @@ public class InputReadyVertexManager extends VertexManagerPlugin {
       oneToOneLocationHints = new TaskLocationHint[oneToOneSrcTaskCount];
     }
 
-    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+    for (Map.Entry<String, Collection<Integer>> entry :  pendingCompletions.asMap().entrySet()) {
       for (Integer task : entry.getValue()) {
         handleSourceTaskFinished(entry.getKey(), task);
       }
     }
   }
+  
+  boolean ready() {
+    int target = getContext().getInputVertexEdgeProperties().size() + 1;
+    Preconditions.checkState(numSignalsToWaitFor <= target);
+    return (numSignalsToWaitFor == target);
+  }
+  
+  @Override
+  public void initialize() {
+    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+    // wait for every source vertex to be configured and for this vertex to start
+    numSignalsToWaitFor = 0;
+    for (String entry : edges.keySet()) {
+      getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
+    }
+  }
+  
+  @Override
+  public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) throws Exception {
+    numSignalsToWaitFor++;
+    LOG.info("Received configured signal from: " + stateUpdate.getVertexName() + 
+        " numConfiguredSources: " + numSignalsToWaitFor);
+    start();
+  }
 
   @Override
-  public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
-    handleSourceTaskFinished(srcVertexName, taskId);
+  public synchronized void onVertexStarted(Map<String, List<Integer>> completions) {
+    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
+      pendingCompletions.putAll(entry.getKey(), entry.getValue());
+    }
+    numSignalsToWaitFor++;
+    start();
+  }
+
+  @Override
+  public synchronized void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
+    if (ready()) {
+      handleSourceTaskFinished(srcVertexName, taskId);
+    } else {
+      pendingCompletions.put(srcVertexName, taskId);
+    }
   }
 
   @Override
-  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  public synchronized void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
   }
 
   @Override
-  public void onRootVertexInitialized(String inputName,
+  public synchronized void onRootVertexInitialized(String inputName,
       InputDescriptor inputDescriptor, List<Event> events) {
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/8cdd988b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
index c6981ed..9a83a51 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java
@@ -34,6 +34,8 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,7 +57,7 @@ public class TestInputReadyVertexManager {
   }
 
   @Test (timeout=5000)
-  public void testBasicScatterGather() {
+  public void testBasicScatterGather() throws Exception {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
@@ -80,6 +82,9 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    // first source vertex configured
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    // then own vertex started
     manager.onVertexStarted(initialCompletions);
     manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
     verify(mockContext, times(0)).scheduleVertexTasks(anyList());
@@ -89,7 +94,7 @@ public class TestInputReadyVertexManager {
   }
   
   @Test (timeout=5000)
-  public void testBasicOneToOne() {
+  public void testBasicOneToOne() throws Exception {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
@@ -114,7 +119,10 @@ public class TestInputReadyVertexManager {
     
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    // first own vertex started
     manager.onVertexStarted(initialCompletions);
+    // then source vertex configured
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
     verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture());
     Assert.assertEquals(1, requestCaptor.getValue().size());
     Assert.assertEquals(0, requestCaptor.getValue().get(0).getTaskIndex().intValue());
@@ -139,9 +147,56 @@ public class TestInputReadyVertexManager {
     Assert.assertEquals(2, requestCaptor.getValue().get(0)
         .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
   }
+  
+  @Test (timeout=5000)
+  public void testDelayedConfigureOneToOne() throws Exception {
+    HashMap<String, EdgeProperty> mockInputVertices = 
+        new HashMap<String, EdgeProperty>();
+    String mockSrcVertexId1 = "Vertex1";
+    EdgeProperty eProp1 = EdgeProperty.create(
+        EdgeProperty.DataMovementType.ONE_TO_ONE,
+        EdgeProperty.DataSourceType.PERSISTED,
+        SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("out"),
+        InputDescriptor.create("in"));
+    
+    String mockManagedVertexId = "Vertex";
+    
+    VertexManagerPluginContext mockContext = mock(VertexManagerPluginContext.class);
+    when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
+    when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+    when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
+    mockInputVertices.put(mockSrcVertexId1, eProp1);
+    
+    Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
+    initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0));
+    
+    InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
+    // first own vertex started
+    manager.onVertexStarted(initialCompletions);
+    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 1);
+    verify(mockContext, times(0)).scheduleVertexTasks(requestCaptor.capture());
+    // then source vertex configured. now we start
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    verify(mockContext, times(2)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(2, requestCaptor.getAllValues().size());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(1, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    manager.onSourceTaskCompleted(mockSrcVertexId1, 2);
+    verify(mockContext, times(3)).scheduleVertexTasks(requestCaptor.capture());
+    Assert.assertEquals(1, requestCaptor.getValue().size());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0).getTaskIndex().intValue());
+    Assert.assertEquals(mockSrcVertexId1, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getVertexName());
+    Assert.assertEquals(2, requestCaptor.getValue().get(0)
+        .getTaskLocationHint().getAffinitizedTask().getTaskIndex());
+  }
 
   @Test (timeout=5000)
-  public void testComplex() {
+  public void testComplex() throws Exception {
     HashMap<String, EdgeProperty> mockInputVertices = 
         new HashMap<String, EdgeProperty>();
     String mockSrcVertexId1 = "Vertex1";
@@ -192,22 +247,43 @@ public class TestInputReadyVertexManager {
     
     Map<String, List<Integer>> initialCompletions = Maps.newHashMap();
     
-    // 1-1 sources do not match managed tasks
+    // 1-1 sources do not match managed tasks before vertex started
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
     InputReadyVertexManager manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     try {
       manager.onVertexStarted(initialCompletions);
       Assert.assertTrue("Should have exception", false);
     } catch (TezUncheckedException e) {
       e.getMessage().contains("Managed task number must equal 1-1 source");
     }
+
+    // 1-1 sources do not match managed tasks after vertex started
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
+    manager = new InputReadyVertexManager(mockContext);
+    manager.initialize();
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStarted(initialCompletions);
+    when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4);
+    try {
+      manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
+      Assert.assertTrue("Should have exception", false);
+    } catch (TezUncheckedException e) {
+      e.getMessage().contains("Managed task number must equal 1-1 source");
+    }
     
     // 1-1 sources do not match
     when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4);
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     try {
       manager.onVertexStarted(initialCompletions);
       Assert.assertTrue("Should have exception", false);
@@ -220,6 +296,9 @@ public class TestInputReadyVertexManager {
     when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3);
     manager = new InputReadyVertexManager(mockContext);
     manager.initialize();
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
+    manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
     manager.onVertexStarted(initialCompletions);
     // all 1-1 0's done but not scheduled because v1 is not done
     manager.onSourceTaskCompleted(mockSrcVertexId3, 0);