You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2014/09/18 21:49:48 UTC

[03/25] git commit: Revert "TEZ-1494. DAG hangs waiting for ShuffleManager.getNextInput() (Rajesh Balamohan)"

Revert "TEZ-1494. DAG hangs waiting for ShuffleManager.getNextInput() (Rajesh Balamohan)"

This reverts commit de008b54a208881321d15addfe35b8094720be25.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/80b5795b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/80b5795b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/80b5795b

Branch: refs/heads/TEZ-8
Commit: 80b5795b13b7483d876b741245708a56794cd771
Parents: f65e65a
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Thu Sep 11 03:07:16 2014 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Fri Sep 12 03:54:46 2014 +0530

----------------------------------------------------------------------
 .../dag/impl/ImmediateStartVertexManager.java   |  92 +----
 .../app/dag/impl/RootInputVertexManager.java    |  27 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 364 ++-----------------
 3 files changed, 66 insertions(+), 417 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/80b5795b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index ac2b851..b202d70 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -18,11 +18,9 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeProperty;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -30,101 +28,29 @@ import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
-import java.util.List;
-import java.util.Map;
+import com.google.common.collect.Lists;
 
 /**
  * Starts all tasks immediately on vertex start
  */
 public class ImmediateStartVertexManager extends VertexManagerPlugin {
 
-  private static final Log LOG = LogFactory.getLog(ImmediateStartVertexManager.class);
-
-  private final Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap();
-  private int managedTasks;
-  private boolean tasksScheduled = false;
-
-  class SourceVertexInfo {
-    EdgeProperty edgeProperty;
-    int numFinishedTasks;
-
-    SourceVertexInfo(EdgeProperty edgeProperty) {
-      this.edgeProperty = edgeProperty;
-    }
-  }
-
   public ImmediateStartVertexManager(VertexManagerPluginContext context) {
     super(context);
   }
 
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
-    managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
-    Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
-    for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
-      String srcVertex = entry.getKey();
-      EdgeProperty edgeProp = entry.getValue();
-      srcVertexInfo.put(srcVertex, new SourceVertexInfo(edgeProp));
-    }
-
-    //handle completions
-    for (Map.Entry<String, List<Integer>> entry : completions.entrySet()) {
-      for (Integer task : entry.getValue()) {
-        handleSourceTaskFinished(entry.getKey(), task);
-      }
-    }
-    scheduleTasks();
-  }
-
-  private void handleSourceTaskFinished(String vertex, Integer taskId) {
-    SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
-    //Not mandatory to check for duplicate completions here
-    srcInfo.numFinishedTasks++;
-  }
-
-  private void scheduleTasks() {
-    if (!canScheduleTasks()) {
-      return;
-    }
-
-    List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
-    for (int i = 0; i < managedTasks; ++i) {
-      tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
-    }
-
-    if (!tasksToStart.isEmpty()) {
-      LOG.info("Starting " + tasksToStart.size() + " in " + getContext().getVertexName());
-      getContext().scheduleVertexTasks(tasksToStart);
+    int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
+    for (int i=0; i<numTasks; ++i) {
+      scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
     }
-    tasksScheduled = true;
-  }
-
-  private boolean canScheduleTasks() {
-    //Check if at least 1 task is finished from each source vertex (in case of broadcast &
-    // one-to-one or custom)
-    for (Map.Entry<String, SourceVertexInfo> entry : srcVertexInfo.entrySet()) {
-      SourceVertexInfo srcVertexInfo = entry.getValue();
-      switch(srcVertexInfo.edgeProperty.getDataMovementType()) {
-      case ONE_TO_ONE:
-      case BROADCAST:
-      case CUSTOM:
-        if (srcVertexInfo.numFinishedTasks == 0) {
-          //do not schedule tasks until a task from source task is complete
-          return false;
-        }
-      default:
-        break;
-      }
-    }
-    return true;
+    getContext().scheduleVertexTasks(scheduledTasks);
   }
 
   @Override
   public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
-    handleSourceTaskFinished(srcVertexName, attemptId);
-    if (!tasksScheduled) {
-      scheduleTasks();
-    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/80b5795b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
index e850286..e6ffdc5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/RootInputVertexManager.java
@@ -18,23 +18,27 @@
 
 package org.apache.tez.dag.app.dag.impl;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.InputSpecUpdate;
 import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-public class RootInputVertexManager extends ImmediateStartVertexManager {
+public class RootInputVertexManager extends VertexManagerPlugin {
 
   private String configuredInputName;
 
@@ -42,6 +46,27 @@ public class RootInputVertexManager extends ImmediateStartVertexManager {
     super(context);
   }
 
+  @Override
+  public void initialize() {
+  }
+
+  @Override
+  public void onVertexStarted(Map<String, List<Integer>> completions) {
+    int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
+    List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks);
+    for (int i=0; i<numTasks; ++i) {
+      scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null));
+    }
+    getContext().scheduleVertexTasks(scheduledTasks);
+  }
+
+  @Override
+  public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+  }
+
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
 
   @Override
   public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,

http://git-wip-us.apache.org/repos/asf/tez/blob/80b5795b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 04e2219..d894928 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -20,7 +20,6 @@ package org.apache.tez.dag.app.dag.impl;
 
 import java.nio.ByteBuffer;
 
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doReturn;
@@ -1880,39 +1879,39 @@ public class TestVertexImpl {
     Assert.assertEquals(2, v3.getOutputVerticesCount());
     Assert.assertEquals(2, v3.getOutputVerticesCount());
 
-    assertTrue("vertex1".equals(v3.getInputSpecList(0).get(0)
+    Assert.assertTrue("vertex1".equals(v3.getInputSpecList(0).get(0)
         .getSourceVertexName())
         || "vertex2".equals(v3.getInputSpecList(0).get(0)
-        .getSourceVertexName()));
-    assertTrue("vertex1".equals(v3.getInputSpecList(0).get(1)
+            .getSourceVertexName()));
+    Assert.assertTrue("vertex1".equals(v3.getInputSpecList(0).get(1)
         .getSourceVertexName())
         || "vertex2".equals(v3.getInputSpecList(0).get(1)
-        .getSourceVertexName()));
-    assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(0)
+            .getSourceVertexName()));
+    Assert.assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(0)
         .getInputDescriptor().getClassName())
         || "i3_v2".equals(v3.getInputSpecList(0).get(0)
-        .getInputDescriptor().getClassName()));
-    assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(1)
+            .getInputDescriptor().getClassName()));
+    Assert.assertTrue("i3_v1".equals(v3.getInputSpecList(0).get(1)
         .getInputDescriptor().getClassName())
         || "i3_v2".equals(v3.getInputSpecList(0).get(1)
-        .getInputDescriptor().getClassName()));
+            .getInputDescriptor().getClassName()));
 
-    assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(0)
+    Assert.assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(0)
         .getDestinationVertexName())
         || "vertex5".equals(v3.getOutputSpecList(0).get(0)
-        .getDestinationVertexName()));
-    assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(1)
+            .getDestinationVertexName()));
+    Assert.assertTrue("vertex4".equals(v3.getOutputSpecList(0).get(1)
         .getDestinationVertexName())
         || "vertex5".equals(v3.getOutputSpecList(0).get(1)
-        .getDestinationVertexName()));
-    assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(0)
+            .getDestinationVertexName()));
+    Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(0)
         .getOutputDescriptor().getClassName())
         || "o3_v5".equals(v3.getOutputSpecList(0).get(0)
-        .getOutputDescriptor().getClassName()));
-    assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(1)
+            .getOutputDescriptor().getClassName()));
+    Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList(0).get(1)
         .getOutputDescriptor().getClassName())
         || "o3_v5".equals(v3.getOutputSpecList(0).get(1)
-        .getOutputDescriptor().getClassName()));
+            .getOutputDescriptor().getClassName()));
   }
 
   @Test(timeout = 5000)
@@ -1941,13 +1940,13 @@ public class TestVertexImpl {
     Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
         Collections.singletonMap(
        v1.getName(), mockEdgeManagerDescriptor);
-    assertTrue(v3.setParallelism(1, null, edgeManagerDescriptors, null));
-    assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
+    Assert.assertTrue(v3.setParallelism(1, null, edgeManagerDescriptors, null));
+    Assert.assertTrue(v3.sourceVertices.get(v1).getEdgeManager() instanceof
         EdgeManagerForTest);
     Assert.assertEquals(1, v3.getTotalTasks());
     Assert.assertEquals(1, tasks.size());
     // the last one is removed
-    assertTrue(tasks.keySet().iterator().next().equals(firstTask));
+    Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
 
   }
   
@@ -1994,7 +1993,7 @@ public class TestVertexImpl {
     Edge edge = edges.get("e4");
     EdgeManagerPlugin em = edge.getEdgeManager();
     EdgeManagerForTest originalEm = (EdgeManagerForTest) em;
-    assertTrue(Arrays.equals(edgePayload, originalEm.getEdgeManagerContext()
+    Assert.assertTrue(Arrays.equals(edgePayload, originalEm.getEdgeManagerContext()
         .getUserPayload().deepCopyAsArray()));
 
     UserPayload userPayload = UserPayload.create(ByteBuffer.wrap(new String("foo").getBytes()));
@@ -2008,7 +2007,7 @@ public class TestVertexImpl {
 
     Map<String, EdgeManagerPluginDescriptor> edgeManagerDescriptors =
         Collections.singletonMap(v3.getName(), edgeManagerDescriptor);
-    assertTrue(v5.setParallelism(v5.getTotalTasks() - 1, null,
+    Assert.assertTrue(v5.setParallelism(v5.getTotalTasks() - 1, null,
         edgeManagerDescriptors, null)); // Must decrease.
 
     VertexImpl v5Impl = (VertexImpl) v5;
@@ -2016,10 +2015,10 @@ public class TestVertexImpl {
     EdgeManagerPlugin modifiedEdgeManager = v5Impl.sourceVertices.get(v3)
         .getEdgeManager();
     Assert.assertNotNull(modifiedEdgeManager);
-    assertTrue(modifiedEdgeManager instanceof EdgeManagerForTest);
+    Assert.assertTrue(modifiedEdgeManager instanceof EdgeManagerForTest);
 
     // Ensure initialize() is called with the correct payload
-    assertTrue(Arrays.equals(userPayload.deepCopyAsArray(),
+    Assert.assertTrue(Arrays.equals(userPayload.deepCopyAsArray(),
         ((EdgeManagerForTest) modifiedEdgeManager).getUserPayload().deepCopyAsArray()));
   }
 
@@ -2093,7 +2092,7 @@ public class TestVertexImpl {
     Assert.assertEquals(VertexTerminationCause.OWN_TASK_FAILURE, v.getTerminationCause());
     String diagnostics =
         StringUtils.join(v.getDiagnostics(), ",").toLowerCase();
-    assertTrue(diagnostics.contains("task failed"
+    Assert.assertTrue(diagnostics.contains("task failed"
         + ", taskid=" + t1.toString()));
   }
 
@@ -2105,7 +2104,7 @@ public class TestVertexImpl {
     String diagnostics =
         StringUtils.join(v2.getDiagnostics(), ",").toLowerCase();
     LOG.info("diagnostics v2: " + diagnostics);
-    assertTrue(diagnostics.contains(
+    Assert.assertTrue(diagnostics.contains(
         "vertex received kill in inited state"));
   }
 
@@ -2119,7 +2118,7 @@ public class TestVertexImpl {
     killVertex(v3);
     String diagnostics =
         StringUtils.join(v3.getDiagnostics(), ",").toLowerCase();
-    assertTrue(diagnostics.contains(
+    Assert.assertTrue(diagnostics.contains(
         "vertex received kill while in running state"));
   }
 
@@ -2201,7 +2200,7 @@ public class TestVertexImpl {
     Assert.assertNull(v2.getOutputCommitter("output"));
 
     VertexImpl v6 = vertices.get("vertex6");
-    assertTrue(v6.getOutputCommitter("outputx")
+    Assert.assertTrue(v6.getOutputCommitter("outputx")
         instanceof CountingOutputCommitter);
   }
 
@@ -2209,11 +2208,11 @@ public class TestVertexImpl {
   public void testVertexManagerInit() {
     initAllVertices(VertexState.INITED);
     VertexImpl v2 = vertices.get("vertex2");
-    assertTrue(v2.getVertexManager().getPlugin()
+    Assert.assertTrue(v2.getVertexManager().getPlugin()
         instanceof ImmediateStartVertexManager);
 
     VertexImpl v6 = vertices.get("vertex6");
-    assertTrue(v6.getVertexManager().getPlugin()
+    Assert.assertTrue(v6.getVertexManager().getPlugin()
         instanceof ShuffleVertexManager);
   }
 
@@ -3021,306 +3020,6 @@ public class TestVertexImpl {
     }
   }
 
-  @Test(timeout = 5000)
-  /**
-   * Ref: TEZ-1494
-   * If broadcast, one-to-one or custom edges are present in source, tasks should not start until
-   * 1 task from each source vertex is complete.
-   */
-  public void testTaskSchedulingWithCustomEdges() {
-    setupPreDagCreation();
-    dagPlan = createCustomDAGWithCustomEdges();
-    setupPostDagCreation();
-
-    /**
-     *
-     *   M2 --(SG)--> R3 --(B)--\
-     *                           \
-     *   M7 --(B)---------------->M5 ---(SG)--> R6
-     *                            /
-     *   M8---(C)--------------->/
-     */
-
-    //init M2
-    VertexImpl m2 = vertices.get("M2");
-    VertexImpl m7 = vertices.get("M7");
-    VertexImpl r3 = vertices.get("R3");
-    VertexImpl m5 = vertices.get("M5");
-    VertexImpl m8 = vertices.get("M8");
-
-    initVertex(m2);
-    initVertex(m7);
-    initVertex(m8);
-    assertTrue(m7.getState().equals(VertexState.INITED));
-    assertTrue(m5.getState().equals(VertexState.INITED));
-    assertTrue(m8.getState().equals(VertexState.INITED));
-    assertTrue(m7.getVertexManager().getPlugin() instanceof ImmediateStartVertexManager);
-
-    //Start M2; Let tasks complete in M2; Also let 1 task complete in R3
-    dispatcher.getEventHandler().handle(new VertexEvent(m2.getVertexId(), VertexEventType.V_START));
-    dispatcher.await();
-    VertexEventTaskAttemptCompleted taskAttemptCompleted = new VertexEventTaskAttemptCompleted
-        (TezTaskAttemptID.getInstance(TezTaskID.getInstance(m2.getVertexId(),0), 0), TaskAttemptStateInternal.SUCCEEDED);
-    VertexEventTaskCompleted taskCompleted = new VertexEventTaskCompleted(TezTaskID.getInstance(m2
-        .getVertexId(), 0), TaskState.SUCCEEDED);
-    dispatcher.getEventHandler().handle(taskAttemptCompleted);
-    dispatcher.getEventHandler().handle(taskCompleted);
-    dispatcher.await();
-    taskAttemptCompleted = new VertexEventTaskAttemptCompleted
-        (TezTaskAttemptID.getInstance(TezTaskID.getInstance(r3.getVertexId(),0), 0),
-            TaskAttemptStateInternal.SUCCEEDED);
-    taskCompleted = new VertexEventTaskCompleted(TezTaskID.getInstance(r3
-        .getVertexId(), 0), TaskState.SUCCEEDED);
-    dispatcher.getEventHandler().handle(taskAttemptCompleted);
-    dispatcher.getEventHandler().handle(taskCompleted);
-    dispatcher.await();
-    assertTrue(m2.getState().equals(VertexState.SUCCEEDED));
-    assertTrue(m5.numSuccessSourceAttemptCompletions == 1);
-    assertTrue(m5.getState().equals(VertexState.INITED));
-
-    //R3 should be in running state as it has one task completed, and rest are pending
-    assertTrue(r3.getState().equals(VertexState.RUNNING));
-
-    //Let us start M7; M5 should start not start as it is dependent on M8 as well
-    dispatcher.getEventHandler().handle(new VertexEvent(m7.getVertexId(),VertexEventType.V_START));
-    dispatcher.await();
-    //Let one of the tasks get over in M7 as well.
-    taskAttemptCompleted = new VertexEventTaskAttemptCompleted
-        (TezTaskAttemptID.getInstance(TezTaskID.getInstance(m7.getVertexId(),0), 0),
-            TaskAttemptStateInternal.SUCCEEDED);
-    taskCompleted = new VertexEventTaskCompleted(TezTaskID.getInstance(m7
-        .getVertexId(), 0), TaskState.SUCCEEDED);
-    dispatcher.getEventHandler().handle(taskAttemptCompleted);
-    dispatcher.getEventHandler().handle(taskCompleted);
-    dispatcher.await();
-    assertTrue(m5.numSuccessSourceAttemptCompletions == 2);
-
-    //M5 should be in INITED state, as it depends on M8
-    assertTrue(m5.getState().equals(VertexState.INITED));
-    for(Task task : m5.getTasks().values()) {
-      assertTrue(task.getState().equals(TaskState.NEW));
-    }
-
-    //Let us start M8; M5 should start now
-    dispatcher.getEventHandler().handle(new VertexEvent(m8.getVertexId(),VertexEventType.V_START));
-    dispatcher.await();
-
-    //M5 in running state. But tasks should not be scheduled until M8 finishes a task.
-    assertTrue(m5.getState().equals(VertexState.RUNNING));
-    for(Task task : m5.getTasks().values()) {
-      assertTrue(task.getState().equals(TaskState.NEW));
-    }
-
-    //Let one of the tasks get over in M8 as well. This should trigger tasks to be scheduled in M5
-    taskAttemptCompleted = new VertexEventTaskAttemptCompleted
-        (TezTaskAttemptID.getInstance(TezTaskID.getInstance(m8.getVertexId(),0), 0),
-            TaskAttemptStateInternal.SUCCEEDED);
-    taskCompleted = new VertexEventTaskCompleted(TezTaskID.getInstance(m8
-        .getVertexId(), 0), TaskState.SUCCEEDED);
-    dispatcher.getEventHandler().handle(taskAttemptCompleted);
-    dispatcher.getEventHandler().handle(taskCompleted);
-    dispatcher.await();
-
-    assertTrue(m5.numSuccessSourceAttemptCompletions == 3);
-    //Ensure all tasks in M5 are in scheduled state
-    for(Task task : m5.getTasks().values()) {
-      assertTrue(task.getState().equals(TaskState.SCHEDULED));
-    }
-  }
-
-  //For TEZ-1494
-  private DAGPlan createCustomDAGWithCustomEdges() {
-    /**
-     *
-     *   M2 --(SG)--> R3 --(B)--\
-     *                           \
-     *   M7 --(B)---------------->M5 ---(SG)--> R6
-     *                            /
-     *   M8---(C)--------------->/
-     */
-    DAGPlan dag = DAGPlan.newBuilder().setName("TestSamplerDAG")
-        .addVertex(VertexPlan.newBuilder()
-                .setName("M2")
-                .setProcessorDescriptor(
-                    TezEntityDescriptorProto.newBuilder().setClassName("M2.class"))
-                .setType(PlanVertexType.NORMAL)
-                .addTaskLocationHint(
-                    PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build())
-                .setTaskConfig(PlanTaskConfiguration.newBuilder()
-                        .setNumTasks(1)
-                        .setVirtualCores(4)
-                        .setMemoryMb(1024)
-                        .setJavaOpts("")
-                        .setTaskModule("M2.class")
-                        .build()
-                )
-                .addOutEdgeId("M2_R3")
-                .build()
-        )
-        .addVertex(VertexPlan.newBuilder()
-                .setName("M8")
-                .setProcessorDescriptor(
-                    TezEntityDescriptorProto.newBuilder().setClassName("M8.class"))
-                .setType(PlanVertexType.NORMAL)
-                .addTaskLocationHint(
-                    PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build())
-                .setTaskConfig(PlanTaskConfiguration.newBuilder()
-                        .setNumTasks(1)
-                        .setVirtualCores(4)
-                        .setMemoryMb(1024)
-                        .setJavaOpts("")
-                        .setTaskModule("M8.class")
-                        .build()
-                )
-                .addOutEdgeId("M8_M5")
-                .build()
-        )
-         .addVertex(VertexPlan.newBuilder()
-                 .setName("R3")
-                 .setProcessorDescriptor(
-                     TezEntityDescriptorProto.newBuilder().setClassName("M2.class"))
-                 .setType(PlanVertexType.NORMAL)
-                 .addTaskLocationHint(
-                     PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack1").build())
-                 .setTaskConfig(PlanTaskConfiguration.newBuilder()
-                         .setNumTasks(10)
-                         .setVirtualCores(4)
-                         .setMemoryMb(1024)
-                         .setJavaOpts("")
-                         .setTaskModule("R3.class")
-                         .build()
-                 )
-                 .addInEdgeId("M2_R3")
-                 .addOutEdgeId("R3_M5")
-                 .build()
-         )
-        .addVertex(VertexPlan.newBuilder()
-                .setName("M5")
-                .setProcessorDescriptor(
-                    TezEntityDescriptorProto.newBuilder().setClassName("M5.class"))
-                .setType(PlanVertexType.NORMAL)
-                .addTaskLocationHint(
-                    PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack1").build())
-                .setTaskConfig(PlanTaskConfiguration.newBuilder()
-                        .setNumTasks(10)
-                        .setVirtualCores(4)
-                        .setMemoryMb(1024)
-                        .setJavaOpts("")
-                        .setTaskModule("M5.class")
-                        .build()
-                )
-                .addInEdgeId("R3_M5")
-                .addInEdgeId("M7_M5")
-                .addInEdgeId("M8_M5")
-                .addOutEdgeId("M5_R6")
-                .build()
-        )
-        .addVertex(VertexPlan.newBuilder()
-                .setName("M7")
-                .setProcessorDescriptor(
-                    TezEntityDescriptorProto.newBuilder().setClassName("M7.class"))
-                .setType(PlanVertexType.NORMAL)
-                .addTaskLocationHint(
-                    PlanTaskLocationHint.newBuilder().addHost("host4").addRack("rack1").build())
-                .setTaskConfig(PlanTaskConfiguration.newBuilder()
-                        .setNumTasks(10)
-                        .setVirtualCores(4)
-                        .setMemoryMb(1024)
-                        .setJavaOpts("")
-                        .setTaskModule("M7.class")
-                        .build()
-                )
-                .addOutEdgeId("M7_M5")
-                .build()
-        )
-        .addVertex(VertexPlan.newBuilder()
-                .setName("R6")
-                .setProcessorDescriptor(
-                    TezEntityDescriptorProto.newBuilder().setClassName("R6.class"))
-                .setType(PlanVertexType.NORMAL)
-                .addTaskLocationHint(
-                    PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack1").build())
-                .setTaskConfig(PlanTaskConfiguration.newBuilder()
-                        .setNumTasks(1)
-                        .setVirtualCores(4)
-                        .setMemoryMb(1024)
-                        .setJavaOpts("")
-                        .setTaskModule("R6.class")
-                        .build()
-                )
-                .addInEdgeId("M5_R6")
-                .build()
-        )
-        .addEdge(
-            EdgePlan.newBuilder()
-                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M2_R3"))
-                .setInputVertexName("M2")
-                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M2_R3.class"))
-                .setOutputVertexName("R3")
-                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
-                .setId("M2_R3")
-                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                .build()
-        )
-        .addEdge(
-            EdgePlan.newBuilder()
-                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("R3_M5"))
-                .setInputVertexName("R3")
-                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("R3_M5.class"))
-                .setOutputVertexName("M5")
-                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
-                .setId("R3_M5")
-                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                .build()
-        )
-        .addEdge(
-            EdgePlan.newBuilder()
-                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M7_M5"))
-                .setInputVertexName("M7")
-                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M7_M5.class"))
-                .setOutputVertexName("M5")
-                .setDataMovementType(PlanEdgeDataMovementType.BROADCAST)
-                .setId("M7_M5")
-                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                .build()
-        )
-        .addEdge(
-            EdgePlan.newBuilder()
-                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M5_R6"))
-                .setInputVertexName("M5")
-                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M5_R6.class"))
-                .setOutputVertexName("R6")
-                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
-                .setId("M5_R6")
-                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                .build()
-        )
-        .addEdge(
-            EdgePlan.newBuilder()
-                .setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5"))
-                .setInputVertexName("M8")
-                .setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("M8_M5.class"))
-                .setEdgeManager(
-                    TezEntityDescriptorProto.newBuilder()
-                        .setClassName(EdgeManagerForTest.class.getName())
-                        .setUserPayload(ByteString.copyFrom(edgePayload))
-                        .build())
-                .setOutputVertexName("M5")
-                .setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
-                .setId("M8_M5")
-                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
-                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
-                .build()
-        )
-        .build();
-
-    return dag;
-  }
-
   @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testVertexWithInitializerSuccess() {
@@ -3516,7 +3215,6 @@ public class TestVertexImpl {
           dispatcher.getEventHandler(), taskAttemptListener,
           clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
           updateTracker);
-      v.setInputVertices(new HashMap());
       vertexIdMap.put(vId, v);
       vertices.put(v.getName(), v);
       v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
@@ -3756,8 +3454,8 @@ public class TestVertexImpl {
     List<GroupInputSpec> groupInSpec = vC.getGroupInputSpecList(0);
     Assert.assertEquals(1, groupInSpec.size());
     Assert.assertEquals("Group", groupInSpec.get(0).getGroupName());
-    assertTrue(groupInSpec.get(0).getGroupVertices().contains("A"));
-    assertTrue(groupInSpec.get(0).getGroupVertices().contains("B"));
+    Assert.assertTrue(groupInSpec.get(0).getGroupVertices().contains("A"));
+    Assert.assertTrue(groupInSpec.get(0).getGroupVertices().contains("B"));
     groupInSpec.get(0).getMergedInputDescriptor().getClassName().equals("Group.class");
   }