You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/05/01 03:44:45 UTC
git commit: CRUNCH-200: Refactor the logic for handling output file
relocation out of CrunchJobHooks and into PathTarget implementations
Updated Branches:
refs/heads/master f1e877d4f -> 48cf308c8
CRUNCH-200: Refactor the logic for handling output file relocation out of CrunchJobHooks and into PathTarget implementations
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/48cf308c
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/48cf308c
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/48cf308c
Branch: refs/heads/master
Commit: 48cf308c82ad2e3adb95083a0d10c90e70503fd5
Parents: f1e877d
Author: Josh Wills <jw...@apache.org>
Authored: Sun Apr 28 23:27:44 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Apr 30 18:40:03 2013 -0700
----------------------------------------------------------------------
.../apache/crunch/impl/mr/exec/CrunchJobHooks.java | 63 +-------------
.../main/java/org/apache/crunch/io/PathTarget.java | 15 +++
.../java/org/apache/crunch/io/PathTargetImpl.java | 64 --------------
.../org/apache/crunch/io/impl/FileTargetImpl.java | 67 ++++++++++++++-
.../crunch/io/impl/SourcePathTargetImpl.java | 9 ++
.../crunch/impl/mr/exec/CrunchJobHooksTest.java | 42 ---------
.../apache/crunch/io/impl/CrunchJobHooksTest.java | 43 +++++++++
7 files changed, 134 insertions(+), 169 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
index 74bc9ac..b06847b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java
@@ -19,17 +19,12 @@ package org.apache.crunch.impl.mr.exec;
import java.io.IOException;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
-import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
-import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.PathTarget;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -89,65 +84,9 @@ public final class CrunchJobHooks {
// job to the output locations specified in the paths.
- FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration());
for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) {
- final int i = entry.getKey();
- final Path dst = entry.getValue().getPath();
- FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme();
-
- Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*");
- Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
- Configuration conf = job.getConfiguration();
- FileSystem dstFs = dst.getFileSystem(conf);
- if (!dstFs.exists(dst)) {
- dstFs.mkdirs(dst);
- }
- boolean sameFs = isCompatible(srcFs, dst);
- for (Path s : srcs) {
- Path d = getDestFile(conf, s, dst, fileNamingScheme);
- if (sameFs) {
- srcFs.rename(s, d);
- } else {
- FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration());
- }
- }
+ // Each PathTarget relocates its own output files out of workingPath.
+ entry.getValue().handleOutputs(job.getConfiguration(), workingPath, entry.getKey(), mapOnlyJob);
}
}
}
-
- private boolean isCompatible(FileSystem fs, Path path) {
- try {
- fs.makeQualified(path);
- return true;
- } catch (IllegalArgumentException e) {
- return false;
- }
- }
- private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme)
- throws IOException {
- String outputFilename = null;
- if (mapOnlyJob) {
- outputFilename = fileNamingScheme.getMapOutputName(conf, dir);
- } else {
- outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(src.getName()));
- }
- if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
- outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
- }
- return new Path(dir, outputFilename);
- }
- }
-
- /**
- * Extract the partition number from a raw reducer output filename.
- *
- * @param reduceOutputFileName The raw reducer output file name
- * @return The partition number encoded in the filename
- */
- static int extractPartitionNumber(String reduceOutputFileName) {
- Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
- if (matcher.find()) {
- return Integer.parseInt(matcher.group(1), 10);
- } else {
- throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
- }
}
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
index 7a35209..4f7949f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
@@ -17,6 +17,9 @@
*/
package org.apache.crunch.io;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
/**
@@ -33,4 +36,16 @@ public interface PathTarget extends MapReduceTarget {
* @return the naming scheme to be used
*/
FileNamingScheme getFileNamingScheme();
+
+ /**
+ * Handles moving the output data for this target from a temporary location on the
+ * filesystem to its target path at the end of a MapReduce job.
+ *
+ * @param conf The job {@code Configuration}
+ * @param workingPath The temp directory that contains the output of the job
+ * @param index The index of this target for jobs that write multiple output files to a single directory
+ * @param mapOnlyJob Whether or not this is a map-only job
+ * @throws IOException if the output files cannot be moved to the target path
+ */
+ void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
deleted file mode 100644
index 0be3f9a..0000000
--- a/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-public abstract class PathTargetImpl implements PathTarget {
-
- private final Path path;
- private final Class<OutputFormat> outputFormatClass;
- private final Class keyClass;
- private final Class valueClass;
-
- public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
- this(new Path(path), outputFormatClass, keyClass, valueClass);
- }
-
- public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
- this.path = path;
- this.outputFormatClass = outputFormatClass;
- this.keyClass = keyClass;
- this.valueClass = valueClass;
- }
-
- @Override
- public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
- try {
- FileOutputFormat.setOutputPath(job, path);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- if (name == null) {
- job.setOutputFormatClass(outputFormatClass);
- job.setOutputKeyClass(keyClass);
- job.setOutputValueClass(valueClass);
- } else {
- CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
- }
- }
-
- @Override
- public Path getPath() {
- return path;
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index c1c29e4..5ceb3be 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -18,12 +18,15 @@
package org.apache.crunch.io.impl;
import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
+import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.OutputHandler;
@@ -31,8 +34,8 @@ import org.apache.crunch.io.PathTarget;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
@@ -82,11 +85,89 @@ public class FileTargetImpl implements PathTarget {
return true;
}
+ @Override
+ public void handleOutputs(Configuration conf, Path workingPath, int index,
+ boolean mapOnlyJob) throws IOException {
+ FileSystem srcFs = workingPath.getFileSystem(conf);
+ Path src = getSourcePattern(workingPath, index);
+ Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
+ // Resolve the destination FileSystem from the target path itself: FileSystem.get(conf)
+ // returns the default FileSystem, which is wrong when the target lives on another one.
+ FileSystem dstFs = path.getFileSystem(conf);
+ if (!dstFs.exists(path)) {
+ dstFs.mkdirs(path);
+ }
+ // Rename only works within one FileSystem; across filesystems, copy and delete the source.
+ boolean sameFs = isCompatible(srcFs, path);
+ for (Path s : srcs) {
+ Path d = getDestFile(conf, s, path, mapOnlyJob);
+ if (sameFs) {
+ // FileSystem.rename reports failure via its return value; don't lose output silently.
+ if (!srcFs.rename(s, d)) {
+ throw new IOException("Failed to rename " + s + " to " + d);
+ }
+ } else {
+ FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
+ }
+ }
+ }
+
+ /** Returns the glob matching this target's temporary output files under {@code workingPath}. */
+ protected Path getSourcePattern(Path workingPath, int index) {
+ return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*");
+ }
+
@Override
public Path getPath() {
return path;
}
+
+ /** Returns true if {@code path} can be qualified against {@code fs}, i.e. it belongs to that FileSystem. */
+ protected static boolean isCompatible(FileSystem fs, Path path) {
+ try {
+ fs.makeQualified(path);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
+ /** Computes the final location of {@code src} inside {@code dir} using this target's naming scheme. */
+ protected Path getDestFile(Configuration conf, Path src, Path dir, boolean mapOnlyJob)
+ throws IOException {
+ String outputFilename;
+ if (mapOnlyJob) {
+ outputFilename = getFileNamingScheme().getMapOutputName(conf, dir);
+ } else {
+ outputFilename = getFileNamingScheme().getReduceOutputName(conf, dir, extractPartitionNumber(src.getName()));
+ }
+ // Carry the Avro file extension of the source file over to the destination name.
+ if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) {
+ outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT;
+ }
+ return new Path(dir, outputFilename);
+ }
+
+ /** Matches the {@code -r-NNNNN} partition suffix of a raw reducer output file name. */
+ private static final Pattern REDUCE_OUTPUT_NAME_PATTERN = Pattern.compile(".*-r-(\\d{5})");
+
+ /**
+ * Extract the partition number from a raw reducer output filename.
+ *
+ * @param reduceOutputFileName The raw reducer output file name
+ * @return The partition number encoded in the filename
+ * @throws IllegalArgumentException if the name contains no {@code -r-NNNNN} partition suffix
+ */
+ public static int extractPartitionNumber(String reduceOutputFileName) {
+ Matcher matcher = REDUCE_OUTPUT_NAME_PATTERN.matcher(reduceOutputFileName);
+ if (matcher.find()) {
+ return Integer.parseInt(matcher.group(1), 10);
+ } else {
+ throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
+ }
+ }
+
@Override
public FileNamingScheme getFileNamingScheme() {
return fileNamingScheme;
http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
index c0d7ce0..a90bb7b 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
@@ -17,10 +17,13 @@
*/
package org.apache.crunch.io.impl;
+import java.io.IOException;
+
import org.apache.crunch.Source;
import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -47,4 +50,10 @@ public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements Path
public FileNamingScheme getFileNamingScheme() {
return fileNamingScheme;
}
+
+ @Override
+ public void handleOutputs(Configuration conf, Path workingPath, int index, boolean mapOnlyJob)
+ throws IOException {
+ ((PathTarget) target).handleOutputs(conf, workingPath, index, mapOnlyJob); // Delegates to the wrapped target. NOTE(review): assumes `target` is always a PathTarget (constructor not visible), and the delegate names files with its own naming scheme rather than this.fileNamingScheme — confirm the two always match.
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
deleted file mode 100644
index f03c3e2..0000000
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.exec;
-
-import static org.junit.Assert.assertEquals;
-
-import org.junit.Test;
-
-public class CrunchJobHooksTest {
-
- @Test
- public void testExtractPartitionNumber() {
- assertEquals(0, CrunchJobHooks.extractPartitionNumber("out1-r-00000"));
- assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010"));
- assertEquals(99999, CrunchJobHooks.extractPartitionNumber("out3-r-99999"));
- }
-
- @Test
- public void testExtractPartitionNumber_WithSuffix() {
- assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010.avro"));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testExtractPartitionNumber_MapOutputFile() {
- CrunchJobHooks.extractPartitionNumber("out1-m-00000");
- }
-}
http://git-wip-us.apache.org/repos/asf/crunch/blob/48cf308c/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java b/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
new file mode 100644
index 0000000..705ed10
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/io/impl/FileTargetImplTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/** Tests for {@link FileTargetImpl#extractPartitionNumber(String)}. */
+public class FileTargetImplTest {
+
+ @Test
+ public void testExtractPartitionNumber() {
+ assertEquals(0, FileTargetImpl.extractPartitionNumber("out1-r-00000"));
+ assertEquals(10, FileTargetImpl.extractPartitionNumber("out2-r-00010"));
+ assertEquals(99999, FileTargetImpl.extractPartitionNumber("out3-r-99999"));
+ }
+
+ @Test
+ public void testExtractPartitionNumber_WithSuffix() {
+ assertEquals(10, FileTargetImpl.extractPartitionNumber("out2-r-00010.avro"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testExtractPartitionNumber_MapOutputFile() {
+ FileTargetImpl.extractPartitionNumber("out1-m-00000");
+ }
+}