You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/10/17 15:45:22 UTC
git commit: CRUNCH-283: Additional diagnostics for the planner dotfile.
Updated Branches:
refs/heads/master 9c42bab14 -> ff56d0539
CRUNCH-283: Additional diagnostics for the planner dotfile.
1. Add the target dependencies that are implied by ParallelDoOptions
to the directed graphs (target -> PCollectionImpl) as dashed lines
2. Add a label to each of the clustered subgraphs that includes the
Crunch JobID, to make it easier to map from running jobs to the
dotfile for diagnosis.
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ff56d053
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ff56d053
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ff56d053
Branch: refs/heads/master
Commit: ff56d0539f50b402cc23fe34059e1e44309b5677
Parents: 9c42bab
Author: Josh Wills <jw...@apache.org>
Authored: Wed Oct 16 18:57:34 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Oct 17 06:43:14 2013 -0700
----------------------------------------------------------------------
.../crunch/impl/mr/collect/PCollectionImpl.java | 6 +-
.../crunch/impl/mr/plan/DotfileWriter.java | 71 +++++++++++++++-----
.../crunch/impl/mr/plan/DotfileWriterTest.java | 41 +++++++++++
3 files changed, 100 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 43711fc..958d7f6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -232,7 +232,11 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
}
return pipeline;
}
-
+
+ public ParallelDoOptions getParallelDoOptions() {
+ return doOptions;
+ }
+
public Set<SourceTarget<?>> getTargetDependencies() {
Set<SourceTarget<?>> targetDeps = doOptions.getSourceTargets();
for (PCollectionImpl<?> parent : getParents()) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
index 2834fb9..dc4a9c2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
@@ -18,9 +18,11 @@
package org.apache.crunch.impl.mr.plan;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.collect.InputCollection;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
@@ -28,6 +30,8 @@ import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -36,7 +40,7 @@ import com.google.common.collect.Sets;
* the topology of Crunch pipelines.
*/
public class DotfileWriter {
-
+
/** The types of tasks within a MapReduce job. */
enum MRTaskType { MAP, REDUCE };
@@ -47,7 +51,7 @@ public class DotfileWriter {
/**
* Format the declaration of a node based on a PCollection.
- *
+ *
* @param pcollectionImpl PCollection for which a node will be declared
* @param jobPrototype The job containing the PCollection
* @return The node declaration
@@ -63,7 +67,7 @@ public class DotfileWriter {
/**
* Format a Target as a node declaration.
- *
+ *
* @param target A Target used within a MapReduce pipeline
* @return The global node declaration for the Target
*/
@@ -73,7 +77,7 @@ public class DotfileWriter {
/**
* Format a PCollectionImpl into a format to be used for dot files.
- *
+ *
* @param pcollectionImpl The PCollectionImpl to be formatted
* @param jobPrototype The job containing the PCollection
* @return The dot file formatted representation of the PCollectionImpl
@@ -88,29 +92,61 @@ public class DotfileWriter {
/**
* Format a collection of node strings into dot file syntax.
- *
+ *
* @param nodeCollection Collection of chained node strings
* @return The dot-formatted chain of nodes
*/
String formatNodeCollection(List<String> nodeCollection) {
- return String.format("%s;", Joiner.on(" -> ").join(nodeCollection));
+ return formatNodeCollection(nodeCollection, ImmutableMap.<String,String>of());
+ }
+
+ /**
+ * Format a collection of node strings into dot file syntax.
+ *
+ * @param nodeCollection Collection of chained node strings
+ * @param edgeAttributes map of attribute names and values to be applied to the edge
+ * @return The dot-formatted chain of nodes
+ */
+ String formatNodeCollection(List<String> nodeCollection, Map<String,String> edgeAttributes) {
+ String edgeAttributeString = "";
+ if (!edgeAttributes.isEmpty()) {
+ edgeAttributeString = String.format(" [%s]",
+ Joiner.on(' ').withKeyValueSeparator("=").join(edgeAttributes));
+ }
+ return String.format("%s%s;", Joiner.on(" -> ").join(nodeCollection), edgeAttributeString);
}
/**
* Format a NodePath in dot file syntax.
- *
+ *
* @param nodePath The node path to be formatted
* @param jobPrototype The job containing the NodePath
* @return The dot file representation of the node path
*/
List<String> formatNodePath(NodePath nodePath, JobPrototype jobPrototype) {
List<String> formattedNodePaths = Lists.newArrayList();
-
- List<PCollectionImpl<?>> pcollections = Lists.newArrayList(nodePath);
+
+ List<PCollectionImpl<?>> pcollections = ImmutableList.copyOf(nodePath);
for (int collectionIndex = 1; collectionIndex < pcollections.size(); collectionIndex++){
String fromNode = formatPCollection(pcollections.get(collectionIndex - 1), jobPrototype);
String toNode = formatPCollection(pcollections.get(collectionIndex), jobPrototype);
- formattedNodePaths.add(formatNodeCollection(Lists.newArrayList(fromNode, toNode)));
+ formattedNodePaths.add(formatNodeCollection(ImmutableList.of(fromNode, toNode)));
+ }
+
+ // Add SourceTarget dependencies, if any
+ for (PCollectionImpl<?> pcollection : pcollections) {
+ Set<SourceTarget<?>> targetDeps = pcollection.getParallelDoOptions().getSourceTargets();
+ if (!targetDeps.isEmpty()) {
+ String toNode = formatPCollection(pcollection, jobPrototype);
+ for(Target target : targetDeps) {
+ globalNodeDeclarations.add(formatTargetNodeDeclaration(target));
+ String fromNode = String.format("\"%s\"", target.toString());
+ formattedNodePaths.add(
+ formatNodeCollection(
+ ImmutableList.of(fromNode, toNode),
+ ImmutableMap.of("style", "dashed")));
+ }
+ }
}
return formattedNodePaths;
}
@@ -118,7 +154,7 @@ public class DotfileWriter {
/**
* Add a NodePath to be formatted as a list of node declarations within a
* single job.
- *
+ *
* @param jobPrototype The job containing the node path
* @param nodePath The node path to be formatted
*/
@@ -128,7 +164,7 @@ public class DotfileWriter {
if (pcollectionImpl instanceof InputCollection) {
globalNodeDeclarations.add(formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype));
} else {
- if (!groupingEncountered){
+ if (!groupingEncountered) {
groupingEncountered = (pcollectionImpl instanceof PGroupedTableImpl);
}
@@ -141,7 +177,7 @@ public class DotfileWriter {
/**
* Add the chaining of a NodePath to the graph.
- *
+ *
* @param nodePath The path to be formatted as a node chain in the dot file
* @param jobPrototype The job containing the NodePath
*/
@@ -153,7 +189,7 @@ public class DotfileWriter {
/**
* Get the graph attributes for a task-specific subgraph.
- *
+ *
* @param taskType The type of task in the subgraph
* @return Graph attributes
*/
@@ -183,7 +219,7 @@ public class DotfileWriter {
/**
* Add the contents of a {@link JobPrototype} to the graph describing a
* pipeline.
- *
+ *
* @param jobPrototype A JobPrototype representing a portion of a MapReduce
* pipeline
*/
@@ -202,7 +238,7 @@ public class DotfileWriter {
/**
* Build up the full dot file containing the description of a MapReduce
* pipeline.
- *
+ *
* @return Graphviz dot file contents
*/
public String buildDotfile() {
@@ -217,6 +253,7 @@ public class DotfileWriter {
// Must prefix subgraph name with "cluster", otherwise its border won't render. I don't know why.
StringBuilder jobProtoStringBuilder = new StringBuilder();
jobProtoStringBuilder.append(String.format(" subgraph \"cluster-job%d\" {\n", jobPrototype.getJobID()));
+ jobProtoStringBuilder.append(String.format(" label=\"Crunch Job %d\";\n", jobPrototype.getJobID()));
for (MRTaskType taskType : MRTaskType.values()){
Pair<JobPrototype,MRTaskType> jobTaskKey = Pair.of(jobPrototype, taskType);
if (jobNodeDeclarations.containsKey(jobTaskKey)){
@@ -232,7 +269,7 @@ public class DotfileWriter {
jobProtoStringBuilder.append(" }\n");
stringBuilder.append(jobProtoStringBuilder.toString());
}
-
+
for (String nodePathChain : nodePathChains) {
stringBuilder.append(String.format(" %s\n", nodePathChain));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
index 562238d..e85419c 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
@@ -23,7 +23,9 @@ import static org.mockito.Mockito.when;
import java.util.List;
+import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.collect.InputCollection;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
@@ -32,6 +34,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
public class DotfileWriterTest {
@@ -102,13 +106,24 @@ public class DotfileWriterTest {
}
@Test
+ public void testFormatNodeCollection_WithStyles() {
+ List<String> nodeCollection = Lists.newArrayList("one", "two");
+ assertEquals(
+ "one -> two [style=dotted];",
+ dotfileWriter.formatNodeCollection(nodeCollection, ImmutableMap.of("style", "dotted")));
+ }
+
+ @Test
public void testFormatNodePath() {
PCollectionImpl<?> tail = mock(PCollectionImpl.class);
PCollectionImpl<?> head = mock(PCollectionImpl.class);
JobPrototype jobPrototype = mock(JobPrototype.class);
+ ParallelDoOptions doOptions = ParallelDoOptions.builder().build();
when(tail.getName()).thenReturn("tail");
when(head.getName()).thenReturn("head");
+ when(tail.getParallelDoOptions()).thenReturn(doOptions);
+ when(head.getParallelDoOptions()).thenReturn(doOptions);
NodePath nodePath = new NodePath(tail);
nodePath.close(head);
@@ -120,6 +135,32 @@ public class DotfileWriterTest {
}
@Test
+ public void testFormatNodePathWithTargetDependencies() {
+ PCollectionImpl<?> tail = mock(PCollectionImpl.class);
+ PCollectionImpl<?> head = mock(PCollectionImpl.class);
+ SourceTarget<?> srcTarget = mock(SourceTarget.class);
+ JobPrototype jobPrototype = mock(JobPrototype.class);
+
+ ParallelDoOptions tailOptions = ParallelDoOptions.builder().sourceTargets(srcTarget).build();
+ ParallelDoOptions headOptions = ParallelDoOptions.builder().build();
+ when(srcTarget.toString()).thenReturn("target");
+ when(tail.getName()).thenReturn("tail");
+ when(head.getName()).thenReturn("head");
+ when(tail.getParallelDoOptions()).thenReturn(tailOptions);
+ when(head.getParallelDoOptions()).thenReturn(headOptions);
+
+ NodePath nodePath = new NodePath(tail);
+ nodePath.close(head);
+
+ assertEquals(
+ ImmutableList.of("\"head@" + head.hashCode() + "@" + jobPrototype.hashCode() + "\" -> \"tail@"
+ + tail.hashCode() + "@" + jobPrototype.hashCode() + "\";",
+ "\"target\" -> \"tail@" + tail.hashCode() + "@" + jobPrototype.hashCode() + "\" [style=dashed];"),
+ dotfileWriter.formatNodePath(nodePath, jobPrototype));
+ }
+
+
+ @Test
public void testGetTaskGraphAttributes_Map() {
assertEquals("label = Map; color = blue;", dotfileWriter.getTaskGraphAttributes(MRTaskType.MAP));
}