You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/12/12 07:43:06 UTC
git commit: CRUNCH-311: Add support for file renaming to AvroPathPerKeyTarget.
Updated Branches:
refs/heads/master e5a360512 -> 677c26914
CRUNCH-311: Add support for file renaming to AvroPathPerKeyTarget.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/677c2691
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/677c2691
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/677c2691
Branch: refs/heads/master
Commit: 677c269142d16aae8205cee52770aca47f638487
Parents: e5a3605
Author: Josh Wills <jw...@apache.org>
Authored: Wed Dec 11 21:01:32 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Dec 11 21:01:32 2013 -0800
----------------------------------------------------------------------
.../crunch/io/avro/AvroPathPerKeyTarget.java | 28 +++++++++++---------
1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/677c2691/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index c6be679..6befbad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.avro;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.io.SequentialFileNamingScheme;
@@ -51,7 +52,11 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
}
public AvroPathPerKeyTarget(Path path) {
- super(path, AvroPathPerKeyOutputFormat.class, SequentialFileNamingScheme.getInstance());
+ this(path, SequentialFileNamingScheme.getInstance());
+ }
+
+ public AvroPathPerKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
+ super(path, AvroPathPerKeyOutputFormat.class, fileNamingScheme);
}
@Override
@@ -83,20 +88,19 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
@Override
public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
- Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
+ Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
+ Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base), base);
FileSystem dstFs = path.getFileSystem(conf);
- boolean sameFs = isCompatible(srcFs, path);
if (!dstFs.exists(path)) {
- if (sameFs) {
- srcFs.rename(src, path);
- } else {
- dstFs.mkdirs(path);
- FileUtil.copy(srcFs, src, dstFs, path, true, true, conf);
- }
- } else {
- Path[] srcs = FileUtil.stat2Paths(srcFs.listStatus(src));
+ dstFs.mkdirs(path);
+ }
+ boolean sameFs = isCompatible(srcFs, path);
+ for (Path key : keys) {
+ Path[] srcs = FileUtil.stat2Paths(srcFs.listStatus(key), key);
+ Path targetPath = new Path(path, key.getName());
+ dstFs.mkdirs(targetPath);
for (Path s : srcs) {
- Path d = new Path(path, s.getName());
+ Path d = getDestFile(conf, s, targetPath, s.getName().contains("-m-"));
if (sameFs) {
srcFs.rename(s, d);
} else {