You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/10/17 15:45:22 UTC

git commit: CRUNCH-283: Additional diagnostics for the planner dotfile.

Updated Branches:
  refs/heads/master 9c42bab14 -> ff56d0539


CRUNCH-283: Additional diagnostics for the planner dotfile.

1. Add the target dependencies that are implied by ParallelDoOptions
to the directed graphs (target -> PCollectionImpl) as dashed lines
2. Add a label to each of the clustered subgraphs that includes the
Crunch JobID, to make it easier to map from running jobs to the
dotfile for diagnosis.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ff56d053
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ff56d053
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ff56d053

Branch: refs/heads/master
Commit: ff56d0539f50b402cc23fe34059e1e44309b5677
Parents: 9c42bab
Author: Josh Wills <jw...@apache.org>
Authored: Wed Oct 16 18:57:34 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Oct 17 06:43:14 2013 -0700

----------------------------------------------------------------------
 .../crunch/impl/mr/collect/PCollectionImpl.java |  6 +-
 .../crunch/impl/mr/plan/DotfileWriter.java      | 71 +++++++++++++++-----
 .../crunch/impl/mr/plan/DotfileWriterTest.java  | 41 +++++++++++
 3 files changed, 100 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index 43711fc..958d7f6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -232,7 +232,11 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
     }
     return pipeline;
   }
-  
+
+  public ParallelDoOptions getParallelDoOptions() {
+    return doOptions;
+  }
+
   public Set<SourceTarget<?>> getTargetDependencies() {
     Set<SourceTarget<?>> targetDeps = doOptions.getSourceTargets();
     for (PCollectionImpl<?> parent : getParents()) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
index 2834fb9..dc4a9c2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java
@@ -18,9 +18,11 @@
 package org.apache.crunch.impl.mr.plan;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.crunch.Pair;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.collect.InputCollection;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
@@ -28,6 +30,8 @@ import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -36,7 +40,7 @@ import com.google.common.collect.Sets;
  * the topology of Crunch pipelines.
  */
 public class DotfileWriter {
-  
+
   /** The types of tasks within a MapReduce job. */
   enum MRTaskType { MAP, REDUCE };
 
@@ -47,7 +51,7 @@ public class DotfileWriter {
 
   /**
    * Format the declaration of a node based on a PCollection.
-   * 
+   *
    * @param pcollectionImpl PCollection for which a node will be declared
    * @param jobPrototype The job containing the PCollection
    * @return The node declaration
@@ -63,7 +67,7 @@ public class DotfileWriter {
 
   /**
    * Format a Target as a node declaration.
-   * 
+   *
    * @param target A Target used within a MapReduce pipeline
    * @return The global node declaration for the Target
    */
@@ -73,7 +77,7 @@ public class DotfileWriter {
 
   /**
    * Format a PCollectionImpl into a format to be used for dot files.
-   * 
+   *
    * @param pcollectionImpl The PCollectionImpl to be formatted
    * @param jobPrototype The job containing the PCollection
    * @return The dot file formatted representation of the PCollectionImpl
@@ -88,29 +92,61 @@ public class DotfileWriter {
 
   /**
    * Format a collection of node strings into dot file syntax.
-   * 
+   *
    * @param nodeCollection Collection of chained node strings
    * @return The dot-formatted chain of nodes
    */
   String formatNodeCollection(List<String> nodeCollection) {
-    return String.format("%s;", Joiner.on(" -> ").join(nodeCollection));
+    return formatNodeCollection(nodeCollection, ImmutableMap.<String,String>of());
+  }
+
+  /**
+   * Format a collection of node strings into dot file syntax.
+   *
+   * @param nodeCollection Collection of chained node strings
+   * @param edgeAttributes map of attribute names and values to be applied to the edge
+   * @return The dot-formatted chain of nodes
+   */
+  String formatNodeCollection(List<String> nodeCollection, Map<String,String> edgeAttributes) {
+    String edgeAttributeString = "";
+    if (!edgeAttributes.isEmpty()) {
+      edgeAttributeString = String.format(" [%s]",
+        Joiner.on(' ').withKeyValueSeparator("=").join(edgeAttributes));
+    }
+    return String.format("%s%s;", Joiner.on(" -> ").join(nodeCollection), edgeAttributeString);
   }
 
   /**
    * Format a NodePath in dot file syntax.
-   * 
+   *
    * @param nodePath The node path to be formatted
    * @param jobPrototype The job containing the NodePath
    * @return The dot file representation of the node path
    */
   List<String> formatNodePath(NodePath nodePath, JobPrototype jobPrototype) {
     List<String> formattedNodePaths = Lists.newArrayList();
-    
-    List<PCollectionImpl<?>> pcollections = Lists.newArrayList(nodePath);
+
+    List<PCollectionImpl<?>> pcollections = ImmutableList.copyOf(nodePath);
     for (int collectionIndex = 1; collectionIndex < pcollections.size(); collectionIndex++){
       String fromNode = formatPCollection(pcollections.get(collectionIndex - 1), jobPrototype);
       String toNode = formatPCollection(pcollections.get(collectionIndex), jobPrototype);
-      formattedNodePaths.add(formatNodeCollection(Lists.newArrayList(fromNode, toNode)));
+      formattedNodePaths.add(formatNodeCollection(ImmutableList.of(fromNode, toNode)));
+    }
+
+    // Add SourceTarget dependencies, if any
+    for (PCollectionImpl<?> pcollection : pcollections) {
+      Set<SourceTarget<?>> targetDeps = pcollection.getParallelDoOptions().getSourceTargets();
+      if (!targetDeps.isEmpty()) {
+        String toNode = formatPCollection(pcollection, jobPrototype);
+        for(Target target : targetDeps) {
+          globalNodeDeclarations.add(formatTargetNodeDeclaration(target));
+          String fromNode = String.format("\"%s\"", target.toString());
+          formattedNodePaths.add(
+            formatNodeCollection(
+              ImmutableList.of(fromNode, toNode),
+              ImmutableMap.of("style", "dashed")));
+        }
+      }
     }
     return formattedNodePaths;
   }
@@ -118,7 +154,7 @@ public class DotfileWriter {
   /**
    * Add a NodePath to be formatted as a list of node declarations within a
    * single job.
-   * 
+   *
    * @param jobPrototype The job containing the node path
    * @param nodePath The node path to be formatted
    */
@@ -128,7 +164,7 @@ public class DotfileWriter {
       if (pcollectionImpl instanceof InputCollection) {
         globalNodeDeclarations.add(formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype));
       } else {
-        if (!groupingEncountered){
+        if (!groupingEncountered) {
           groupingEncountered = (pcollectionImpl instanceof PGroupedTableImpl);
         }
 
@@ -141,7 +177,7 @@ public class DotfileWriter {
 
   /**
    * Add the chaining of a NodePath to the graph.
-   * 
+   *
    * @param nodePath The path to be formatted as a node chain in the dot file
    * @param jobPrototype The job containing the NodePath
    */
@@ -153,7 +189,7 @@ public class DotfileWriter {
 
   /**
    * Get the graph attributes for a task-specific subgraph.
-   * 
+   *
    * @param taskType The type of task in the subgraph
    * @return Graph attributes
    */
@@ -183,7 +219,7 @@ public class DotfileWriter {
   /**
    * Add the contents of a {@link JobPrototype} to the graph describing a
    * pipeline.
-   * 
+   *
    * @param jobPrototype A JobPrototype representing a portion of a MapReduce
    *          pipeline
    */
@@ -202,7 +238,7 @@ public class DotfileWriter {
   /**
    * Build up the full dot file containing the description of a MapReduce
    * pipeline.
-   * 
+   *
    * @return Graphviz dot file contents
    */
   public String buildDotfile() {
@@ -217,6 +253,7 @@ public class DotfileWriter {
       // Must prefix subgraph name with "cluster", otherwise its border won't render. I don't know why.
       StringBuilder jobProtoStringBuilder = new StringBuilder();
       jobProtoStringBuilder.append(String.format("  subgraph \"cluster-job%d\" {\n", jobPrototype.getJobID()));
+      jobProtoStringBuilder.append(String.format("    label=\"Crunch Job %d\";\n", jobPrototype.getJobID()));
       for (MRTaskType taskType : MRTaskType.values()){
         Pair<JobPrototype,MRTaskType> jobTaskKey = Pair.of(jobPrototype, taskType);
         if (jobNodeDeclarations.containsKey(jobTaskKey)){
@@ -232,7 +269,7 @@ public class DotfileWriter {
       jobProtoStringBuilder.append("  }\n");
       stringBuilder.append(jobProtoStringBuilder.toString());
     }
-    
+
     for (String nodePathChain : nodePathChains) {
       stringBuilder.append(String.format("  %s\n", nodePathChain));
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ff56d053/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
index 562238d..e85419c 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java
@@ -23,7 +23,9 @@ import static org.mockito.Mockito.when;
 
 import java.util.List;
 
+import org.apache.crunch.ParallelDoOptions;
 import org.apache.crunch.Source;
+import org.apache.crunch.SourceTarget;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.collect.InputCollection;
 import org.apache.crunch.impl.mr.collect.PCollectionImpl;
@@ -32,6 +34,8 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
 public class DotfileWriterTest {
@@ -102,13 +106,24 @@ public class DotfileWriterTest {
   }
 
   @Test
+  public void testFormatNodeCollection_WithStyles() {
+    List<String> nodeCollection = Lists.newArrayList("one", "two");
+    assertEquals(
+      "one -> two [style=dotted];",
+      dotfileWriter.formatNodeCollection(nodeCollection, ImmutableMap.of("style", "dotted")));
+  }
+
+  @Test
   public void testFormatNodePath() {
     PCollectionImpl<?> tail = mock(PCollectionImpl.class);
     PCollectionImpl<?> head = mock(PCollectionImpl.class);
     JobPrototype jobPrototype = mock(JobPrototype.class);
+    ParallelDoOptions doOptions = ParallelDoOptions.builder().build();
 
     when(tail.getName()).thenReturn("tail");
     when(head.getName()).thenReturn("head");
+    when(tail.getParallelDoOptions()).thenReturn(doOptions);
+    when(head.getParallelDoOptions()).thenReturn(doOptions);
 
     NodePath nodePath = new NodePath(tail);
     nodePath.close(head);
@@ -120,6 +135,32 @@ public class DotfileWriterTest {
   }
 
   @Test
+  public void testFormatNodePathWithTargetDependencies() {
+    PCollectionImpl<?> tail = mock(PCollectionImpl.class);
+    PCollectionImpl<?> head = mock(PCollectionImpl.class);
+    SourceTarget<?> srcTarget = mock(SourceTarget.class);
+    JobPrototype jobPrototype = mock(JobPrototype.class);
+
+    ParallelDoOptions tailOptions = ParallelDoOptions.builder().sourceTargets(srcTarget).build();
+    ParallelDoOptions headOptions = ParallelDoOptions.builder().build();
+    when(srcTarget.toString()).thenReturn("target");
+    when(tail.getName()).thenReturn("tail");
+    when(head.getName()).thenReturn("head");
+    when(tail.getParallelDoOptions()).thenReturn(tailOptions);
+    when(head.getParallelDoOptions()).thenReturn(headOptions);
+
+    NodePath nodePath = new NodePath(tail);
+    nodePath.close(head);
+
+    assertEquals(
+        ImmutableList.of("\"head@" + head.hashCode() + "@" + jobPrototype.hashCode() + "\" -> \"tail@"
+            + tail.hashCode() + "@" + jobPrototype.hashCode() + "\";",
+            "\"target\" -> \"tail@" + tail.hashCode() + "@" + jobPrototype.hashCode() + "\" [style=dashed];"),
+        dotfileWriter.formatNodePath(nodePath, jobPrototype));
+  }
+
+
+  @Test
   public void testGetTaskGraphAttributes_Map() {
     assertEquals("label = Map; color = blue;", dotfileWriter.getTaskGraphAttributes(MRTaskType.MAP));
   }