You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/05/28 23:37:19 UTC
git commit: TEZ-800. One-one edge with parallelism -1 fails if source
vertex parallelism is not -1 as well (bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 0ccc8e0c7 -> ae267de54
TEZ-800. One-one edge with parallelism -1 fails if source vertex parallelism is not -1 as well (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ae267de5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ae267de5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ae267de5
Branch: refs/heads/master
Commit: ae267de541cc2014c770f34cf0e117c7e003b28c
Parents: 0ccc8e0
Author: Bikas Saha <bi...@apache.org>
Authored: Wed May 28 14:37:11 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Wed May 28 14:37:11 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/tez/dag/api/DAG.java | 58 ++++-
.../java/org/apache/tez/dag/api/Vertex.java | 6 +-
.../org/apache/tez/dag/api/TestDAGVerify.java | 224 +++++++++++++++++++
.../examples/BroadcastAndOneToOneExample.java | 2 +-
4 files changed, 286 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae267de5/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 4483e20..c49ac79 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -236,7 +236,59 @@ public class DAG { // FIXME rename to Topology
}
}
}
-
+
+ void checkAndInferOneToOneParallelism() {
+ // infer all 1-1 via dependencies
+ // collect all vertices whose parallelism is already set (> -1); they seed the inference
+ Set<Vertex> newKnownTasksVertices = Sets.newHashSet();
+ for (Vertex vertex : vertices.values()) {
+ if (vertex.getParallelism() > -1) {
+ newKnownTasksVertices.add(vertex);
+ }
+ }
+
+ // walk through all known source 1-1 edges and infer parallelism
+ // add newly inferred vertices for consideration as known sources
+ // the outer loop will run for every new level of inferring the parallelism
+ // however, the entire logic will process each vertex only once
+ while(!newKnownTasksVertices.isEmpty()) {
+ Set<Vertex> knownTasksVertices = Sets.newHashSet(newKnownTasksVertices);
+ newKnownTasksVertices.clear();
+ for (Vertex v : knownTasksVertices) {
+ for (Edge e : v.getOutputEdges()) {
+ if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) {
+ Vertex outVertex = e.getOutputVertex();
+ if (outVertex.getParallelism() == -1) {
+ LOG.info("Inferring parallelism for vertex: "
+ + outVertex.getVertexName() + " to be " + v.getParallelism()
+ + " from 1-1 connection with vertex " + v.getVertexName());
+ outVertex.setParallelism(v.getParallelism());
+ newKnownTasksVertices.add(outVertex);
+ }
+ }
+ }
+ }
+ }
+
+ // check for inconsistency and errors
+ for (Edge e : edges) {
+ Vertex inputVertex = e.getInputVertex();
+ Vertex outputVertex = e.getOutputVertex();
+
+ if (e.getEdgeProperty().getDataMovementType() == DataMovementType.ONE_TO_ONE) {
+ if (inputVertex.getParallelism() != outputVertex.getParallelism()) {
+ // parallelism must match on both ends, unless the destination is still unset (-1).
+ if (outputVertex.getParallelism() != -1) {
+ throw new TezUncheckedException(
+ "1-1 Edge. Destination vertex parallelism must match source vertex. "
+ + "Vertex: " + inputVertex.getVertexName() + " does not match vertex: "
+ + outputVertex.getVertexName());
+ }
+ }
+ }
+ }
+ }
+
// AnnotatedVertex is used by verify()
private static class AnnotatedVertex {
Vertex v;
@@ -300,7 +352,7 @@ public class DAG { // FIXME rename to Topology
for (Edge e : edges) {
// Construct structure for cycle detection
Vertex inputVertex = e.getInputVertex();
- Vertex outputVertex = e.getOutputVertex();
+ Vertex outputVertex = e.getOutputVertex();
List<Edge> edgeList = edgeMap.get(inputVertex);
if (edgeList == null) {
edgeList = new ArrayList<Edge>();
@@ -378,6 +430,8 @@ public class DAG { // FIXME rename to Topology
// within the addInput / addOutput call itself.
detectCycles(edgeMap, vertexMap);
+
+ checkAndInferOneToOneParallelism();
if (restricted) {
for (Edge e : edges) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae267de5/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index b05a63c..1be75a2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -41,7 +41,7 @@ public class Vertex {
private final String vertexName;
private final ProcessorDescriptor processorDescriptor;
- private final int parallelism;
+ private int parallelism;
private VertexLocationHint taskLocationsHint;
private final Resource taskResource;
private Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>();
@@ -89,6 +89,10 @@ public class Vertex {
public int getParallelism() {
return parallelism;
}
+
+ void setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ }
public Resource getTaskResource() {
return taskResource;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae267de5/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 7d55c39..4b6dfc6 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -97,6 +97,179 @@ public class TestDAGVerify {
dag.verify();
}
+ @Test
+ // v1 (known) -> v2 (-1) -> v3 (-1)
+ public void testVerifyOneToOneInferParallelism() {
+ Vertex v1 = new Vertex("v1",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = new Vertex("v2",
+ new ProcessorDescriptor("MapProcessor"),
+ -1, dummyTaskResource);
+ Vertex v3 = new Vertex("v3",
+ new ProcessorDescriptor("MapProcessor"),
+ -1, dummyTaskResource);
+ Edge e1 = new Edge(v1, v2,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ Edge e2 = new Edge(v2, v3,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ DAG dag = new DAG("testDag");
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addEdge(e1);
+ dag.addEdge(e2);
+ dag.verify();
+ Assert.assertEquals(dummyTaskCount, v2.getParallelism());
+ Assert.assertEquals(dummyTaskCount, v3.getParallelism());
+ }
+
+ @Test
+ // v1 (known) -> v2 (-1) -> v3 (-1)
+ // The test checks resiliency to ordering of the vertices/edges
+ public void testVerifyOneToOneInferParallelismReverseOrder() {
+ Vertex v1 = new Vertex("v1",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = new Vertex("v2",
+ new ProcessorDescriptor("MapProcessor"),
+ -1, dummyTaskResource);
+ Vertex v3 = new Vertex("v3",
+ new ProcessorDescriptor("MapProcessor"),
+ -1, dummyTaskResource);
+ Edge e1 = new Edge(v1, v2,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ Edge e2 = new Edge(v2, v3,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ DAG dag = new DAG("testDag");
+ dag.addVertex(v3);
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addEdge(e2);
+ dag.addEdge(e1);
+ dag.verify();
+ Assert.assertEquals(dummyTaskCount, v2.getParallelism());
+ Assert.assertEquals(dummyTaskCount, v3.getParallelism());
+ }
+
+ @Test
+ public void testVerifyOneToOneNoInferParallelism() {
+ Vertex v1 = new Vertex("v1",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ -1, dummyTaskResource);
+ Vertex v2 = new Vertex("v2",
+ new ProcessorDescriptor("MapProcessor"),
+ -1, dummyTaskResource);
+ Edge e1 = new Edge(v1, v2,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ DAG dag = new DAG("testDag");
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addEdge(e1);
+ dag.verify();
+ Assert.assertEquals(-1, v2.getParallelism());
+ }
+
+ @Test
+ // v1 (-1) -> v3 (-1), v2 (known) -> v3 (-1)
+ public void testVerifyOneToOneIncorrectParallelism1() {
+ Vertex v1 = new Vertex("v1",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ -1, dummyTaskResource);
+ Vertex v2 = new Vertex("v2",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v3 = new Vertex("v3",
+ new ProcessorDescriptor("MapProcessor"),
+ -1, dummyTaskResource);
+ Edge e1 = new Edge(v1, v3,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ Edge e2 = new Edge(v2, v3,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ DAG dag = new DAG("testDag");
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addEdge(e1);
+ dag.addEdge(e2);
+ try {
+ dag.verify();
+ Assert.assertTrue(false);
+ } catch (TezUncheckedException e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "1-1 Edge. Destination vertex parallelism must match source vertex"));
+ }
+ }
+
+ @Test
+ // v1 (-1) -> v4 (-1), v2 (known) -> v4 (-1), v3 (-1) -> v4 (-1)
+ // order of edges should not matter
+ public void testVerifyOneToOneIncorrectParallelism2() {
+ Vertex v1 = new Vertex("v1",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ -1, dummyTaskResource);
+ Vertex v2 = new Vertex("v2",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v3 = new Vertex("v3",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ -1, dummyTaskResource);
+ Vertex v4 = new Vertex("v4",
+ new ProcessorDescriptor(dummyProcessorClassName),
+ -1, dummyTaskResource);
+ Edge e1 = new Edge(v1, v4,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ Edge e2 = new Edge(v2, v4,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ Edge e3 = new Edge(v3, v4,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor(dummyOutputClassName),
+ new InputDescriptor(dummyInputClassName)));
+ DAG dag = new DAG("testDag");
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addVertex(v4);
+ dag.addEdge(e1);
+ dag.addEdge(e2);
+ dag.addEdge(e3);
+ try {
+ dag.verify();
+ Assert.assertTrue(false);
+ } catch (TezUncheckedException e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "1-1 Edge. Destination vertex parallelism must match source vertex"));
+ }
+ }
+
@Test
public void testVerifyBroadcast() {
Vertex v1 = new Vertex("v1",
@@ -544,6 +717,57 @@ public class TestDAGVerify {
Assert.assertTrue(v5.getGroupInputs().containsKey(groupName2));
Assert.assertEquals(2, dag.vertexGroups.size());
}
+
+ @Test
+ public void testVertexGroupOneToOne() {
+ Vertex v1 = new Vertex("v1",
+ new ProcessorDescriptor("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = new Vertex("v2",
+ new ProcessorDescriptor("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v3 = new Vertex("v3",
+ new ProcessorDescriptor("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v4 = new Vertex("v4",
+ new ProcessorDescriptor("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v5 = new Vertex("v5",
+ new ProcessorDescriptor("Processor"),
+ -1, dummyTaskResource);
+
+ DAG dag = new DAG("testDag");
+ String groupName1 = "uv12";
+ VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+ OutputDescriptor outDesc = new OutputDescriptor();
+ uv12.addOutput("uvOut", outDesc, null);
+
+ String groupName2 = "uv23";
+ VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3);
+
+ GroupInputEdge e1 = new GroupInputEdge(uv12, v4,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor("dummy output class"),
+ new InputDescriptor("dummy input class")),
+ new InputDescriptor("dummy input class"));
+ GroupInputEdge e2 = new GroupInputEdge(uv23, v5,
+ new EdgeProperty(DataMovementType.ONE_TO_ONE,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ new OutputDescriptor("dummy output class"),
+ new InputDescriptor("dummy input class")),
+ new InputDescriptor("dummy input class"));
+
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ dag.addVertex(v4);
+ dag.addVertex(v5);
+ dag.addEdge(e1);
+ dag.addEdge(e2);
+ dag.verify();
+ Assert.assertEquals(dummyTaskCount, v5.getParallelism());
+ }
// v1
// | |
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ae267de5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index 0cd6aaf..3bb4223 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -183,7 +183,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
Vertex oneToOneVertex = new Vertex("OneToOne",
new ProcessorDescriptor(
OneToOneProcessor.class.getName()).setUserPayload(procPayload),
- numOneToOneTasks, MRHelpers.getReduceResource(kvOneToOneConf));
+ -1, MRHelpers.getReduceResource(kvOneToOneConf));
oneToOneVertex.setJavaOpts(
MRHelpers.getReduceJavaOpts(kvOneToOneConf)).setVertexManagerPlugin(
new VertexManagerPluginDescriptor(InputReadyVertexManager.class.getName()));