You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/05/01 03:44:45 UTC

git commit: CRUNCH-200: Refactor the logic for handling output file relocation out of CrunchJobHooks and into PathTarget implementations

Updated Branches:
  refs/heads/master f1e877d4f -> 48cf308c8


CRUNCH-200: Refactor the logic for handling output file relocation out of CrunchJobHooks and into PathTarget implementations


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/48cf308c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/48cf308c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/48cf308c

Branch: refs/heads/master
Commit: 48cf308c82ad2e3adb95083a0d10c90e70503fd5
Parents: f1e877d
Author: Josh Wills <jw...@apache.org>
Authored: Sun Apr 28 23:27:44 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Apr 30 18:40:03 2013 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/mr/exec/CrunchJobHooks.java |   63 +-------------
 .../main/java/org/apache/crunch/io/PathTarget.java |   15 +++
 .../java/org/apache/crunch/io/PathTargetImpl.java  |   64 --------------
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |   67 ++++++++++++++-
 .../crunch/io/impl/SourcePathTargetImpl.java       |    9 ++
 .../crunch/impl/mr/exec/CrunchJobHooksTest.java    |   42 ---------
 .../apache/crunch/io/impl/CrunchJobHooksTest.java  |   43 +++++++++
 7 files changed, 134 insertions(+), 169 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
index 74bc9ac..b06847b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
@@ -19,17 +19,12 @@ package org.apache.crunch.impl.mr.exec;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
-import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.PathTarget;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -89,65 +84,9 @@ public final class CrunchJobHooks {
         // job to the output locations specified in the paths.
         FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
         for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
-          final int i = entry.getKey();
-          final Path dst = entry.getValue().getPath();
-          FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
-
-          Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
-          Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
-          Configuration conf = job.getConfiguration();
-          FileSystem dstFs = dst.getFileSystem(conf);
-          if (!dstFs.exists(dst)) {
-            dstFs.mkdirs(dst);
-          }
-          boolean sameFs = isCompatible(srcFs, dst);
-          for (Path s : srcs) {
-            Path d = getDestFile(conf, s, dst, fileNamingScheme);
-            if (sameFs) {
-              srcFs.rename(s, d);
-            } else {
-              FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration());
-            }
-          }
+          entry.getValue().handleOutputs(job.getConfiguration(), workingPath, entry.getKey(), mapOnlyJob);
         }
       }
     }
-
-    private boolean isCompatible(FileSystem fs, Path path) {
-      try {
-        fs.makeQualified(path);
-        return true;
-      } catch (IllegalArgumentException e) {
-        return false;
-      }
-    }
-    private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
-        throws IOException {
-      String outputFilename = null;
-      if (mapOnlyJob) {
-        outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
-      } else {
-        outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(src.getName()));
-      }
-      if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
-        outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
-      }
-      return new Path(dir, outputFilename);
-    }
-  }
-
-  /**
-   * Extract the partition number from a raw reducer output filename.
-   *
-   * @param reduceOutputFileName The raw reducer output file name
-   * @return The partition number encoded in the filename
-   */
-  static int extractPartitionNumber(String reduceOutputFileName) {
-    Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
-    if (matcher.find()) {
-      return Integer.parseInt(matcher.group(1), 10);
-    } else {
-      throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
index 7a35209..4f7949f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
@@ -17,6 +17,9 @@
  */
 package org.apache.crunch.io;
 
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 /**
@@ -33,4 +36,16 @@ public interface PathTarget extends MapReduceTarget {
    * @return the naming scheme to be used
    */
   FileNamingScheme getFileNamingScheme();
+  
+  /**
+   * Handles moving the output data for this target from a temporary location on the
+   * filesystem to its target path at the end of a MapReduce job.
+   *
+   * @param conf The job {@code Configuration}
+   * @param workingPath The temp directory that contains the output of the job
+   * @param index The index of this target for jobs that write multiple output files to a single directory
+   * @param mapOnlyJob Whether or not this is a map-only job
+   * @throws IOException if a filesystem operation fails while relocating the output
+   */
+  void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
deleted file mode 100644
index 0be3f9a..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-public abstract class PathTargetImpl implements PathTarget {
-
-  private final Path path;
-  private final Class<OutputFormat> outputFormatClass;
-  private final Class keyClass;
-  private final Class valueClass;
-
-  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
-    this(new Path(path), outputFormatClass, keyClass, valueClass);
-  }
-
-  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
-    this.path = path;
-    this.outputFormatClass = outputFormatClass;
-    this.keyClass = keyClass;
-    this.valueClass = valueClass;
-  }
-
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    try {
-      FileOutputFormat.setOutputPath(job, path);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    if (name == null) {
-      job.setOutputFormatClass(outputFormatClass);
-      job.setOutputKeyClass(keyClass);
-      job.setOutputValueClass(valueClass);
-    } else {
-      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
-    }
-  }
-
-  @Override
-  public Path getPath() {
-    return path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index c1c29e4..5ceb3be 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,12 +18,15 @@
 package org.apache.crunch.io.impl;
 
 import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.OutputHandler;
@@ -31,8 +34,8 @@ import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -82,11 +85,73 @@ public class FileTargetImpl implements PathTarget {
     return true;
   }
 
+  @Override
+  public void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob) throws IOException {
+    FileSystem srcFs = workingPath.getFileSystem(conf);
+    Path src = getSourcePattern(workingPath, index);
+    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); // temporary files written for this target
+    FileSystem dstFs = path.getFileSystem(conf); // the target path's own filesystem, which may not be the default one
+    if (!dstFs.exists(path)) {
+      dstFs.mkdirs(path);
+    }
+    boolean sameFs = isCompatible(srcFs, path); // decided once: rename within one filesystem, copy across two
+    for (Path s : srcs) {
+      Path d = getDestFile(conf, s, path, mapOnlyJob);
+      if (sameFs) {
+        srcFs.rename(s, d);
+      } else {
+        FileUtil.copy(srcFs, s, dstFs, d, true, true, conf); // deleteSource = true, overwrite = true
+      }
+    }
+  }
+  
+  protected Path getSourcePattern(Path workingPath, int index) { // glob that handleOutputs expands with globStatus
+    return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*");
+  }
+  
   @Override
   public Path getPath() {
     return path;
   }
+  
+  protected static boolean isCompatible(FileSystem fs, Path path) { // true when fs can qualify path without error
+    try {
+      fs.makeQualified(path);
+      return true;
+    } catch (IllegalArgumentException e) { // presumably raised when path belongs to a different filesystem - TODO confirm
+      return false;
+    }
+  }
 
+  /** Builds the path inside {@code dir} for temporary file {@code src}, named by this target's naming scheme. */
+  protected Path getDestFile(Configuration conf, Path src, Path dir, boolean mapOnlyJob) throws IOException {
+    final String srcName = src.getName();
+    final FileNamingScheme namingScheme = getFileNamingScheme();
+    String destName = mapOnlyJob
+        ? namingScheme.getMapOutputName(conf, dir)
+        : namingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(srcName));
+    // Carry the Avro extension over when the temporary file has one.
+    if (srcName.endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+      destName += org.apache.avro.mapred.AvroOutputFormat.EXT;
+    }
+    return new Path(dir, destName);
+  }
+  
+  private static final Pattern REDUCE_OUTPUT_PATTERN = Pattern.compile(".*-r-(\\d{5})"); // compiled once, not per call
+  /**
+   * Extract the partition number from a raw reducer output filename, e.g. 10 for {@code out2-r-00010.avro}.
+   * @param reduceOutputFileName The raw reducer output file name
+   * @return The partition number encoded in the filename
+   * @throws IllegalArgumentException if the name has no {@code -r-} marker followed by five digits
+   */
+  public static int extractPartitionNumber(String reduceOutputFileName) {
+    Matcher matcher = REDUCE_OUTPUT_PATTERN.matcher(reduceOutputFileName);
+    if (!matcher.find()) {
+      throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
+    }
+    return Integer.parseInt(matcher.group(1), 10);
+  }
+  
   @Override
   public FileNamingScheme getFileNamingScheme() {
     return fileNamingScheme;

http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
index c0d7ce0..a90bb7b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
@@ -17,10 +17,13 @@
  */
 package org.apache.crunch.io.impl;
 
+import java.io.IOException;
+
 import org.apache.crunch.Source;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 
@@ -47,4 +50,10 @@ public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements Path
   public FileNamingScheme getFileNamingScheme() {
     return fileNamingScheme;
   }
+
+  @Override
+  public void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob)
+      throws IOException {
+    ((PathTarget) target).handleOutputs(conf, workingPath, index, mapOnlyJob); // delegates; NOTE(review): cast assumes target is a PathTarget - confirm
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
deleted file mode 100644
index f03c3e2..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class CrunchJobHooksTest {
-
-  @Test
-  public void testExtractPartitionNumber() {
-    assertEquals(0, CrunchJobHooks.extractPartitionNumber("out1-r-00000"));
-    assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010"));
-    assertEquals(99999, CrunchJobHooks.extractPartitionNumber("out3-r-99999"));
-  }
-
-  @Test
-  public void testExtractPartitionNumber_WithSuffix() {
-    assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010.avro"));
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testExtractPartitionNumber_MapOutputFile() {
-    CrunchJobHooks.extractPartitionNumber("out1-m-00000");
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/test/java/org/apache/crunch/io/impl/CrunchJobHooksTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/impl/CrunchJobHooksTest.java b/crunch-core/src/test/java/org/apache/crunch/io/impl/CrunchJobHooksTest.java
new file mode 100644
index 0000000..705ed10
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/impl/CrunchJobHooksTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.junit.Test;
+
+public class CrunchJobHooksTest { // NOTE(review): exercises FileTargetImpl.extractPartitionNumber; the class name predates the move
+
+  @Test
+  public void testExtractPartitionNumber() {
+    assertEquals(0, FileTargetImpl.extractPartitionNumber("out1-r-00000"));
+    assertEquals(10, FileTargetImpl.extractPartitionNumber("out2-r-00010"));
+    assertEquals(99999, FileTargetImpl.extractPartitionNumber("out3-r-99999"));
+  }
+
+  @Test
+  public void testExtractPartitionNumber_WithSuffix() {
+    assertEquals(10, FileTargetImpl.extractPartitionNumber("out2-r-00010.avro")); // an extension after the digits is ignored
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExtractPartitionNumber_MapOutputFile() {
+    FileTargetImpl.extractPartitionNumber("out1-m-00000"); // "-m-" names carry no "-r-" partition marker
+  }
+}