You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/20 04:40:08 UTC
git commit: CRUNCH-418: Add a logging directory parameter for Crunch
DOT plan files. Contributed by Allan Shoup.
Repository: crunch
Updated Branches:
refs/heads/master ac4a525ad -> 189febe62
CRUNCH-418: Add a logging directory parameter for Crunch DOT plan files. Contributed by Allan Shoup.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/189febe6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/189febe6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/189febe6
Branch: refs/heads/master
Commit: 189febe62496e3e44386c3d095949a3041a6bae0
Parents: ac4a525
Author: Josh Wills <jw...@apache.org>
Authored: Thu Jun 19 19:39:13 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jun 19 19:39:43 2014 -0700
----------------------------------------------------------------------
.../it/java/org/apache/crunch/MRPipelineIT.java | 29 +++++++++
.../org/apache/crunch/impl/mr/MRPipeline.java | 67 +++++++++++++++++++-
.../crunch/impl/mr/plan/PlanningParameters.java | 7 ++
3 files changed, 100 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
index 25c85c8..6af3f84 100644
--- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -17,16 +17,22 @@
*/
package org.apache.crunch;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
+import java.io.FileFilter;
import java.io.IOException;
import java.io.Serializable;
+import java.net.URLEncoder;
+import com.google.common.io.Files;
+import org.apache.commons.io.filefilter.SuffixFileFilter;
import org.apache.crunch.PipelineResult.StageResult;
import org.apache.crunch.fn.FilterFns;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.io.To;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
@@ -79,5 +85,28 @@ public class MRPipelineIT implements Serializable {
assertTrue(new File(outputDirA, "part-r-00000").exists());
assertTrue(new File(outputDirB, "part-r-00000").exists());
}
+
+ @Test
+ public void testWritingOfDotfile() throws IOException { // expects one well-named .dot plan file when the output dir key is set
+ File dotfileDir = Files.createTempDir(); // scratch directory that should receive the plan file
+ Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+ pipeline.getConfiguration().set(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR, dotfileDir.getAbsolutePath());
+
+ PCollection<String> lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"));
+ pipeline.write(
+ lines.parallelDo(IdentityFn.<String>getInstance(), Writables.strings()),
+ To.textFile(tmpDir.getFile("output").getAbsolutePath()));
+ pipeline.done(); // NOTE(review): presumably triggers the run, and with it the dot file write -- confirm
+ File[] files = dotfileDir.listFiles((FileFilter)new SuffixFileFilter(".dot")); // keep only *.dot entries
+ assertEquals(1, files.length); // exactly one plan file is expected in the directory
+ String fileName = files[0].getName();
+ String fileNamePrefix = URLEncoder.encode(pipeline.getName(), "UTF-8"); // same encoding MRPipeline.writePlanDotFile applies
+ fileNamePrefix = (fileNamePrefix.length() < 150) ? fileNamePrefix : fileNamePrefix.substring(0, 150); // mirrors its 150-character cap
+ assertTrue("DOT file name '" + fileName + "' did not start with the pipeline name '" + fileNamePrefix + "'.",
+ fileName.startsWith(fileNamePrefix));
+
+ String regex = ".*_\\d{4}-\\d{2}-\\d{2}_\\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3}_jobplan\\.dot"; // "_yyyy-MM-dd_HH.mm.ss.SSS_jobplan.dot" suffix
+ assertTrue("DOT file name '" + fileName + "' did not match regex '" + regex + "'.", fileName.matches(regex));
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 01a3ead..6cfc6d0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -18,8 +18,16 @@
package org.apache.crunch.impl.mr;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.Map;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CachingOptions;
@@ -32,11 +40,13 @@ import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.collect.MRCollectionFactory;
import org.apache.crunch.impl.mr.exec.MRExecutor;
import org.apache.crunch.impl.mr.plan.MSCRPlanner;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/**
* Pipeline implementation that is executed within Hadoop MapReduce.
@@ -120,7 +130,9 @@ public class MRPipeline extends DistributedPipeline {
@Override
public MRPipelineExecution runAsync() {
- MRPipelineExecution res = plan().execute();
+ MRExecutor mrExecutor = plan(); // hold on to the executor so its plan can be written out before execution starts
+ writePlanDotFile(mrExecutor.getPlanDotFile()); // does nothing unless PIPELINE_DOTFILE_OUTPUT_DIR is configured
+ MRPipelineExecution res = mrExecutor.execute();
outputTargets.clear();
return res;
}
@@ -141,4 +153,53 @@ public class MRPipeline extends DistributedPipeline {
// Identical to materialization in a MapReduce context
materialize(pcollection);
}
+
+ /**
+ * Writes the MR job plan dot file contents to a timestamped file if the PIPELINE_DOTFILE_OUTPUT_DIR
+ * config key is set with an output directory. Does nothing when the key is not set.
+ *
+ * <p>The configured value is parsed as a URI, which is used to obtain the target FileSystem. The
+ * file is named {@code <name>_<timestamp>_jobplan.dot}, where {@code <name>} is the URL-encoded
+ * pipeline name cut to at most 150 characters and {@code <timestamp>} uses the pattern
+ * {@code yyyy-MM-dd_HH.mm.ss.SSS}. The contents are written as UTF-8 bytes.
+ *
+ * @param dotFileContents contents to be written to the dot file
+ * @throws CrunchRuntimeException if the configured value is not a valid URI, if writing fails, or
+ * if closing the stream fails and no earlier failure occurred; any other RuntimeException
+ * raised while writing is rethrown unchanged
+ */
+ private void writePlanDotFile(String dotFileContents) {
+ String dotFileDir = getConfiguration().get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR);
+ if (dotFileDir != null) {
+ FSDataOutputStream outputStream = null;
+ Exception thrownException = null; // remembers a primary failure so that a close() error cannot replace it
+ try {
+ URI uri = new URI(dotFileDir);
+ FileSystem fs = FileSystem.get(uri, getConfiguration());
+ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss.SSS");
+ String filenameSuffix = String.format("_%s_jobplan.dot", dateFormat.format(new Date()));
+ String encodedName = URLEncoder.encode(getName(), "UTF-8");
+ // We limit the pipeline name to the first 150 characters to keep the output dotfile length less
+ // than 200, as it's not clear what the exact limits are on the filesystem we're writing to (this
+ // might be HDFS or it might be a local filesystem)
+ final int maxPipeNameLength = 150;
+ String filenamePrefix = encodedName.substring(0, Math.min(maxPipeNameLength, encodedName.length()));
+ Path jobPlanPath = new Path(uri.getPath(), filenamePrefix + filenameSuffix); // only the URI's path part is used; fs came from the full URI
+ LOG.info("Writing jobplan to " + jobPlanPath);
+ outputStream = fs.create(jobPlanPath, true); // 'true' is the overwrite flag of FileSystem.create(Path, boolean)
+ outputStream.write(dotFileContents.getBytes(Charsets.UTF_8));
+ } catch (URISyntaxException e) {
+ thrownException = e;
+ throw new CrunchRuntimeException("Invalid dot file dir URI, job plan will not be written: " + dotFileDir, e);
+ } catch (IOException e) {
+ thrownException = e;
+ throw new CrunchRuntimeException("Error writing dotfile contents to " + dotFileDir, e);
+ } catch (RuntimeException e) { // caught only to record that a failure is already propagating
+ thrownException = e;
+ throw e;
+ } finally {
+ if (outputStream != null) {
+ try {
+ outputStream.close();
+ } catch (IOException e) {
+ if (thrownException == null) // a close failure is dropped when an earlier exception is already being thrown
+ throw new CrunchRuntimeException("Error closing dotfile", e);
+ }
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/189febe6/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
index cdfb46f..de89c48 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java
@@ -33,6 +33,13 @@ public final class PlanningParameters {
*/
public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile";
+ /**
+ * Configuration key holding the URI of a directory into which MapReduce pipeline job plans are written in
+ * <a href="http://www.graphviz.org">DOT</a> format. The dot files are only written if this configuration
+ * parameter is set.
+ */
+ public static final String PIPELINE_DOTFILE_OUTPUT_DIR = "crunch.planner.dotfile.outputdir";
+
public static final String JOB_NAME_MAX_STACK_LENGTH = "crunch.job.name.max.stack.length";
private PlanningParameters() {