You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/06/14 02:53:03 UTC
tez git commit: TEZ-3294. DAG.createDag() does not clear local state
on repeat calls. (Harish Jaiprakash via hitesh)
Repository: tez
Updated Branches:
refs/heads/branch-0.8 087bf0227 -> 9beba6c3b
TEZ-3294. DAG.createDag() does not clear local state on repeat calls. (Harish Jaiprakash via hitesh)
(cherry picked from commit 7f699f18ff94ca2b44a0eda650f31e1054c79ade)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9beba6c3
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9beba6c3
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9beba6c3
Branch: refs/heads/branch-0.8
Commit: 9beba6c3b2a4312bcd64faf37e333f65fd189082
Parents: 087bf02
Author: Hitesh Shah <hi...@apache.org>
Authored: Mon Jun 13 19:27:49 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Mon Jun 13 19:29:00 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../main/java/org/apache/tez/dag/api/DAG.java | 33 ++++++++++++--------
.../org/apache/tez/client/TestTezClient.java | 28 +++++++++++++++++
.../java/org/apache/tez/dag/api/TestDAG.java | 28 +++++++++++++++++
4 files changed, 77 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9beba6c3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index effe0e0..210bf6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3294. DAG.createDag() does not clear local state on repeat calls.
TEZ-3297. Deadlock scenario in AM during ShuffleVertexManager auto reduce.
TEZ-3296. Tez fails to compile against hadoop 2.8 after MAPREDUCE-5870
TEZ-3290. Set full task attempt id string in MRInput configuration object.
http://git-wip-us.apache.org/repos/asf/tez/blob/9beba6c3/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 902ff21..0eb51e1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -21,9 +21,11 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -99,8 +101,6 @@ public class DAG {
private Map<String,String> dagConf = new HashMap<String, String>();
private VertexExecutionContext defaultExecutionContext;
- private Stack<String> topologicalVertexStack = new Stack<String>();
-
private DAG(String name) {
this.name = name;
}
@@ -552,7 +552,7 @@ public class DAG {
}
@VisibleForTesting
- void verify(boolean restricted) throws IllegalStateException {
+ Deque<String> verify(boolean restricted) throws IllegalStateException {
if (vertices.isEmpty()) {
throw new IllegalStateException("Invalid dag containing 0 vertices");
}
@@ -655,8 +655,8 @@ public class DAG {
// When additional inputs are supported, this can be checked easily (and early)
// within the addInput / addOutput call itself.
- detectCycles(edgeMap, vertexMap);
-
+ Deque<String> topologicalVertexStack = detectCycles(edgeMap, vertexMap);
+
checkAndInferOneToOneParallelism();
if (restricted) {
@@ -673,29 +673,36 @@ public class DAG {
}
}
}
+
+ return topologicalVertexStack;
}
// Adaptation of Tarjan's algorithm for connected components.
// http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
- private void detectCycles(Map<Vertex, List<Edge>> edgeMap, Map<String, AnnotatedVertex> vertexMap)
+ private Deque<String> detectCycles(Map<Vertex, List<Edge>> edgeMap,
+ Map<String, AnnotatedVertex> vertexMap)
throws IllegalStateException {
+ Deque<String> topologicalVertexStack = new LinkedList<String>();
Integer nextIndex = 0; // boxed integer so it is passed by reference.
Stack<AnnotatedVertex> stack = new Stack<DAG.AnnotatedVertex>();
for (AnnotatedVertex av : vertexMap.values()) {
if (av.index == -1) {
assert stack.empty();
- strongConnect(av, vertexMap, edgeMap, stack, nextIndex);
+ strongConnect(av, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack);
}
}
+ return topologicalVertexStack;
}
// part of Tarjan's algorithm for connected components.
// http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
private void strongConnect(
- AnnotatedVertex av,
- Map<String, AnnotatedVertex> vertexMap,
- Map<Vertex, List<Edge>> edgeMap,
- Stack<AnnotatedVertex> stack, Integer nextIndex) throws IllegalStateException {
+ AnnotatedVertex av,
+ Map<String, AnnotatedVertex> vertexMap,
+ Map<Vertex, List<Edge>> edgeMap,
+ Stack<AnnotatedVertex> stack,
+ Integer nextIndex,
+ Deque<String> topologicalVertexStack) throws IllegalStateException {
av.index = nextIndex;
av.lowlink = nextIndex;
nextIndex++;
@@ -707,7 +714,7 @@ public class DAG {
for (Edge e : edgeMap.get(av.v)) {
AnnotatedVertex outVertex = vertexMap.get(e.getOutputVertex().getName());
if (outVertex.index == -1) {
- strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex);
+ strongConnect(outVertex, vertexMap, edgeMap, stack, nextIndex, topologicalVertexStack);
av.lowlink = Math.min(av.lowlink, outVertex.lowlink);
} else if (outVertex.onstack) {
// strongly connected component detected, but we will wait till later so that the full cycle can be displayed.
@@ -760,7 +767,7 @@ public class DAG {
Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
boolean tezLrsAsArchive, Map<String, String> additionalConfigs,
ServicePluginsDescriptor servicePluginsDescriptor, JavaOptsChecker javaOptsChecker) {
- verify(true);
+ Deque<String> topologicalVertexStack = verify(true);
DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
dagBuilder.setName(this.name);
http://git-wip-us.apache.org/repos/asf/tez/blob/9beba6c3/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 42b762c..d49ba48 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -77,6 +77,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConfigurationConstants;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
@@ -745,4 +746,31 @@ public class TestTezClient {
Assert.fail("Failed to retrieve local host information");
}
}
+
+ @Test(timeout = 5000)
+ public void testClientResubmit() throws Exception {
+ TezClientForTest client = configureAndCreateTezClient(null, true, null);
+ client.start();
+ Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
+ LocalResource.newInstance(
+ URL.newInstance("file", "localhost", 0, "/test1"),
+ LocalResourceType.FILE,
+ LocalResourceVisibility.PUBLIC, 1, 1));
+ Vertex vertex1 = Vertex.create("Vertex1", ProcessorDescriptor.create("P1"), 1,
+ Resource.newInstance(1, 1));
+ vertex1.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
+ Vertex vertex2 = Vertex.create("Vertex2", ProcessorDescriptor.create("P2"), 1,
+ Resource.newInstance(1, 1));
+ vertex2.setTaskLaunchCmdOpts("-XX:+UseParallelGC -XX:+UseG1GC");
+ DAG dag = DAG.create("DAG").addVertex(vertex1).addVertex(vertex2).addTaskLocalFiles(lrDAG);
+ for (int i = 0; i < 3; ++i) {
+ try {
+ client.submitDAG(dag);
+ Assert.fail("Expected TezUncheckedException here.");
+ } catch(TezUncheckedException ex) {
+ Assert.assertTrue(ex.getMessage().contains("Invalid/conflicting GC options found"));
+ }
+ }
+ client.stop();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9beba6c3/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 839e780..ae5dfbb 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -18,11 +18,19 @@
package org.apache.tez.dag.api;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.junit.Assert;
import org.junit.Test;
@@ -333,4 +341,24 @@ public class TestDAG {
}
+ @Test
+ public void testRecreateDAG() {
+ Map<String, LocalResource> lrDAG = Collections.singletonMap("LR1",
+ LocalResource.newInstance(
+ URL.newInstance("file", "localhost", 0, "/test1"),
+ LocalResourceType.FILE,
+ LocalResourceVisibility.PUBLIC, 1, 1));
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor1"), 1,
+ Resource.newInstance(1, 1));
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("dummyProcessor2"), 1,
+ Resource.newInstance(1, 1));
+ DAG dag = DAG.create("dag1").addVertex(v1).addVertex(v2).addTaskLocalFiles(lrDAG);
+
+ TezConfiguration tezConf = new TezConfiguration();
+ DAGPlan firstPlan = dag.createDag(tezConf, null, null, null, false);
+ for (int i = 0; i < 3; ++i) {
+ DAGPlan dagPlan = dag.createDag(tezConf, null, null, null, false);
+ Assert.assertEquals(dagPlan, firstPlan);
+ }
+ }
}