You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/20 04:40:08 UTC

git commit: CRUNCH-418: Add a logging directory parameter for Crunch DOT plan files. Contributed by Allan Shoup.

Repository: crunch
Updated Branches:
  refs/heads/master ac4a525ad -> 189febe62


CRUNCH-418: Add a logging directory parameter for Crunch DOT plan files. Contributed by Allan Shoup.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/189febe6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/189febe6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/189febe6

Branch: refs/heads/master
Commit: 189febe62496e3e44386c3d095949a3041a6bae0
Parents: ac4a525
Author: Josh Wills <jw...@apache.org>
Authored: Thu Jun 19 19:39:13 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jun 19 19:39:43 2014 -0700

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/MRPipelineIT.java | 29 +++++++++
 .../org/apache/crunch/impl/mr/MRPipeline.java   | 67 +++++++++++++++++++-
 .../crunch/impl/mr/plan/PlanningParameters.java |  7 ++
 3 files changed, 100 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
index 25c85c8..6af3f84 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -17,16 +17,22 @@
  */
 package org.apache.crunch;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.io.Serializable;
+import java.net.URLEncoder;
 
+import com.google.common.io.Files;
+import org.apache.commons.io.filefilter.SuffixFileFilter;
 import org.apache.crunch.PipelineResult.StageResult;
 import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
@@ -79,5 +85,28 @@ public class MRPipelineIT implements Serializable {
     assertTrue(new File(outputDirA, "part-r-00000").exists());
     assertTrue(new File(outputDirB, "part-r-00000").exists());
   }
+ 
+  @Test
+  public void testWritingOfDotfile() throws IOException {
+    File dotfileDir = Files.createTempDir(); // NOTE(review): never deleted; consider a directory under tmpDir
+    Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+    pipeline.getConfiguration().set(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR, dotfileDir.getAbsolutePath());
+
+    PCollection<String> lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+    pipeline.write(
+        lines.parallelDo(IdentityFn.<String>getInstance(), Writables.strings()),
+        To.textFile(tmpDir.getFile("output").getAbsolutePath()));
+    pipeline.done();
 
+    File[] files = dotfileDir.listFiles((FileFilter)new SuffixFileFilter(".dot")); // one plan file per runAsync()
+    assertEquals(1, files.length);
+    String fileName = files[0].getName();
+    String fileNamePrefix = URLEncoder.encode(pipeline.getName(), "UTF-8"); // cut below to MRPipeline's 150-char limit
+    fileNamePrefix = (fileNamePrefix.length() < 150) ? fileNamePrefix : fileNamePrefix.substring(0, 150);
+    assertTrue("DOT file name '" + fileName + "' did not start with the pipeline name '" + fileNamePrefix + "'.",
+        fileName.startsWith(fileNamePrefix));
+    
+    String regex = ".*_\\d{4}-\\d{2}-\\d{2}_\\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3}_jobplan\\.dot"; // "_yyyy-MM-dd_HH.mm.ss.SSS_jobplan.dot"
+    assertTrue("DOT file name '" + fileName + "' did not match regex '" + regex + "'.", fileName.matches(regex));
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 01a3ead..6cfc6d0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -18,8 +18,16 @@
 package org.apache.crunch.impl.mr;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.Date;
 import java.util.Map;
 
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CachingOptions;
@@ -32,11 +40,13 @@ import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.collect.MRCollectionFactory;
 import org.apache.crunch.impl.mr.exec.MRExecutor;
 import org.apache.crunch.impl.mr.plan.MSCRPlanner;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Pipeline implementation that is executed within Hadoop MapReduce.
@@ -120,7 +130,9 @@ public class MRPipeline extends DistributedPipeline {
   
   @Override
   public MRPipelineExecution runAsync() {
-    MRPipelineExecution res = plan().execute();
+    MRExecutor mrExecutor = plan();
+    writePlanDotFile(mrExecutor.getPlanDotFile()); // before execute(); no-op unless PIPELINE_DOTFILE_OUTPUT_DIR is set
+    MRPipelineExecution res = mrExecutor.execute();
     outputTargets.clear();
     return res;
   }
@@ -141,4 +153,82 @@ public class MRPipeline extends DistributedPipeline {
     // Identical to materialization in a MapReduce context
     materialize(pcollection);
   }
+
+  /**
+   * Writes the MR job plan dot file contents to a timestamped file if the PIPELINE_DOTFILE_OUTPUT_DIR
+   * config key is set to a non-blank directory.
+   *
+   * <p>The directory may be a fully qualified URI (e.g. {@code hdfs://namenode/plans}) or a path without
+   * a scheme, which is resolved against the configured default filesystem. It is parsed with Hadoop's
+   * {@link Path} rather than {@link URI}, so plain paths that are not valid URI strings (for example,
+   * paths containing spaces) are accepted. The file is named
+   * {@code <url-encoded pipeline name>_<yyyy-MM-dd_HH.mm.ss.SSS>_jobplan.dot}.
+   *
+   * @param dotFileContents contents to be written to the dot file
+   * @throws CrunchRuntimeException if the directory is invalid or the file cannot be written or closed
+   */
+  private void writePlanDotFile(String dotFileContents) {
+    String dotFileDir = getConfiguration().get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR);
+    if (StringUtils.isBlank(dotFileDir)) {
+      return;
+    }
+    Path outputDir;
+    try {
+      outputDir = new Path(dotFileDir);
+    } catch (IllegalArgumentException e) {
+      throw new CrunchRuntimeException("Invalid dot file dir URI, job plan will not be written: " + dotFileDir, e);
+    }
+
+    FSDataOutputStream outputStream = null;
+    boolean written = false;
+    try {
+      FileSystem fs = outputDir.getFileSystem(getConfiguration());
+      SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss.SSS");
+      String filenameSuffix = String.format("_%s_jobplan.dot", dateFormat.format(new Date()));
+      // We limit the pipeline name to the first 150 characters to keep the output dotfile length less
+      // than 200, as it's not clear what the exact limits are on the filesystem we're writing to (this
+      // might be HDFS or it might be a local filesystem)
+      final int maxPipeNameLength = 150;
+      String encodedName = URLEncoder.encode(getName(), "UTF-8");
+      String filenamePrefix = truncateEncodedName(encodedName, maxPipeNameLength);
+      Path jobPlanPath = new Path(outputDir, filenamePrefix + filenameSuffix);
+      LOG.info("Writing jobplan to " + jobPlanPath);
+      outputStream = fs.create(jobPlanPath, true);
+      outputStream.write(dotFileContents.getBytes(Charsets.UTF_8));
+      written = true;
+    } catch (IOException e) {
+      throw new CrunchRuntimeException("Error writing dotfile contents to " + dotFileDir, e);
+    } finally {
+      if (outputStream != null) {
+        try {
+          outputStream.close();
+        } catch (IOException e) {
+          if (written) {
+            throw new CrunchRuntimeException("Error closing dotfile", e);
+          }
+          // A write failure is already propagating; log this one rather than masking it.
+          LOG.warn("Error closing dotfile after a failed write", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Truncates a URL-encoded string to at most {@code maxLength} characters without cutting a
+   * trailing {@code %XX} escape sequence in half.
+   */
+  private static String truncateEncodedName(String encoded, int maxLength) {
+    if (encoded.length() <= maxLength) {
+      return encoded;
+    }
+    // URLEncoder emits '%' only as the start of a three-character escape. If one starts within the
+    // last two kept positions it would be incomplete, so cut just before it instead.
+    int end = maxLength;
+    int lastPercent = encoded.lastIndexOf('%', end - 1);
+    if (lastPercent >= 0 && lastPercent > end - 3) {
+      end = lastPercent;
+    }
+    return encoded.substring(0, end);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
index cdfb46f..de89c48 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
@@ -33,6 +33,13 @@ public final class PlanningParameters {
    */
   public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile";
 
+  /**
+   * Configuration key naming a directory URI (or plain path) where MRPipeline writes the job plan of each
+   * run in <a href="http://www.graphviz.org">DOT</a> format, as a timestamped file ending in "_jobplan.dot".
+   * The dot files are only written if this configuration parameter is set.
+   */
+  public static final String PIPELINE_DOTFILE_OUTPUT_DIR = "crunch.planner.dotfile.outputdir";
+
   public static final String JOB_NAME_MAX_STACK_LENGTH = "crunch.job.name.max.stack.length";
 
   private PlanningParameters() {