You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/06/14 02:53:03 UTC

tez git commit: TEZ-3294. DAG.createDag() does not clear local state on repeat calls. (Harish Jaiprakash via hitesh)

Repository: tez
Updated Branches:
  refs/heads/branch-0.8 087bf0227 -> 9beba6c3b


TEZ-3294. DAG.createDag() does not clear local state on repeat calls. (Harish Jaiprakash via hitesh)

(cherry picked from commit 7f699f18ff94ca2b44a0eda650f31e1054c79ade)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9beba6c3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9beba6c3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9beba6c3

Branch: refs/heads/branch-0.8
Commit: 9beba6c3b2a4312bcd64faf37e333f65fd189082
Parents: 087bf02
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Jun 13 19:27:49 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Jun 13 19:29:00 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../main/java/org/apache/tez/dag/api/DAG.java   | 33 ++++++++++++--------
 .../org/apache/tez/client/TestTezClient.java    | 28 +++++++++++++++++
 .../java/org/apache/tez/dag/api/TestDAG.java    | 28 +++++++++++++++++
 4 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9beba6c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index effe0e0..210bf6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3294. DAG.createDag() does not clear local state on repeat calls.
   TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
   TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
   TEZ-3290. Set full task attempt id string in MRInput configuration object.

http://git-wip-us.apache.org/repos/asf/tez/blob/9beba6c3/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 902ff21..0eb51e1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -21,9 +21,11 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -99,8 +101,6 @@ public class DAG {
   private Map<String,String> dagConf = new HashMap<String, String>();
   private VertexExecutionContext defaultExecutionContext;
 
-  private Stack<String> topologicalVertexStack = new Stack<String>();
-
   private DAG(String name) {
     this.name = name;
   }
@@ -552,7 +552,7 @@ public class DAG {
   }
 
   @VisibleForTesting
-  void verify(boolean restricted) throws IllegalStateException {
+  Deque<String> verify(boolean restricted) throws IllegalStateException {
     if (vertices.isEmpty()) {
       throw new IllegalStateException("Invalid dag containing 0 vertices");
     }
@@ -655,8 +655,8 @@ public class DAG {
     // When additional inputs are supported, this can be checked easily (and early)
     // within the addInput / addOutput call itself.
 
-    detectCycles(edgeMap, vertexMap);
-    
+    Deque<String> topologicalVertexStack = detectCycles(edgeMap, vertexMap);
+
     checkAndInferOneToOneParallelism();
 
     if (restricted) {
@@ -673,29 +673,36 @@ public class DAG {
         }
       }
     }
+
+    return topologicalVertexStack;
   }
 
   // Adaptation of Tarjan's algorithm for connected components.
   // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
-  private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
+  private Deque<String> detectCycles(Map<Vertex, List<Edge>> edgeMap,
+      Map<String, AnnotatedVertex> vertexMap)
     throws IllegalStateException {
+    Deque<String> topologicalVertexStack = new LinkedList<String>();
     Integer nextIndex = 0; // boxed integer so it is passed by reference.
     Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>();
     for (AnnotatedVertex av : vertexMap.values()) {
       if (av.index == -1) {
         assert stack.empty();
-        strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
+        strongConnect(av, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack);
       }
     }
+    return topologicalVertexStack;
   }
 
   // part of Tarjan's algorithm for connected components.
   // http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
   private void strongConnect(
-    AnnotatedVertex av,
-    Map<String, AnnotatedVertex> vertexMap,
-    Map<Vertex, List<Edge>> edgeMap,
-    Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException {
+      AnnotatedVertex av,
+      Map<String, AnnotatedVertex> vertexMap,
+      Map<Vertex, List<Edge>> edgeMap,
+      Stack<AnnotatedVertex> stack,
+      Integer nextIndex,
+      Deque<String> topologicalVertexStack) throws IllegalStateException {
     av.index = nextIndex;
     av.lowlink = nextIndex;
     nextIndex++;
@@ -707,7 +714,7 @@ public class DAG {
       for (Edge e : edgeMap.get(av.v)) {
         AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getName());
         if (outVertex.index == -1) {
-          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
+          strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack);
           av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
         } else if (outVertex.onstack) {
           // strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
@@ -760,7 +767,7 @@ public class DAG {
       Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
       boolean tezLrsAsArchive, Map<String, String> additionalConfigs,
       ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) {
-    verify(true);
+    Deque<String> topologicalVertexStack = verify(true);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
     dagBuilder.setName(this.name);

http://git-wip-us.apache.org/repos/asf/tez/blob/9beba6c3/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 42b762c..d49ba48 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -77,6 +77,7 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConfigurationConstants;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -745,4 +746,31 @@ public class TestTezClient {
       Assert.fail("Failed to retrieve local host information");
     }
   }
+
+  @Test(timeout = 5000)
+  public void testClientResubmit() throws Exception {
+    TezClientForTest client = configureAndCreateTezClient(null, true, null);
+    client.start();
+    Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
+        LocalResource.newInstance(
+            URL.newInstance("file", "localhost", 0, "/test1"),
+            LocalResourceType.FILE,
+            LocalResourceVisibility.PUBLIC, 1, 1));
+    Vertex vertex1 = Vertex.create("Vertex1", ProcessorDescriptor.create("P1"), 1,
+        Resource.newInstance(1, 1));
+    vertex1.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
+    Vertex vertex2 = Vertex.create("Vertex2", ProcessorDescriptor.create("P2"), 1,
+        Resource.newInstance(1, 1));
+    vertex2.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
+    DAG dag = DAG.create("DAG").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG);
+    for (int i = 0; i < 3; ++i) {
+      try {
+        client.submitDAG(dag);
+        Assert.fail("Expected TezUncheckedException here.");
+      } catch(TezUncheckedException ex) {
+        Assert.assertTrue(ex.getMessage().contains("Invalid/conflicting GC options found"));
+      }
+    }
+    client.stop();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/9beba6c3/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 839e780..ae5dfbb 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -18,11 +18,19 @@
 
 package org.apache.tez.dag.api;
 
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.tez.client.CallerContext;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -333,4 +341,24 @@ public class TestDAG {
 
   }
 
+  @Test
+  public void testRecreateDAG() {
+    Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
+        LocalResource.newInstance(
+            URL.newInstance("file", "localhost", 0, "/test1"),
+            LocalResourceType.FILE,
+            LocalResourceVisibility.PUBLIC, 1, 1));
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor1"), 1,
+        Resource.newInstance(1, 1));
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1,
+        Resource.newInstance(1, 1));
+    DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);
+
+    TezConfiguration tezConf = new TezConfiguration();
+    DAGPlan firstPlan = dag.createDag(tezConf, null, null, null, false);
+    for (int i = 0; i < 3; ++i) {
+        DAGPlan dagPlan = dag.createDag(tezConf, null, null, null, false);
+        Assert.assertEquals(dagPlan, firstPlan);
+    }
+  }
 }