You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/12/12 07:43:06 UTC

git commit: CRUNCH-311: Add support for file renaming to AvroPathPerKeyTarget.

Updated Branches:
  refs/heads/master e5a360512 -> 677c26914


CRUNCH-311: Add support for file renaming to AvroPathPerKeyTarget.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/677c2691
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/677c2691
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/677c2691

Branch: refs/heads/master
Commit: 677c269142d16aae8205cee52770aca47f638487
Parents: e5a3605
Author: Josh Wills <jw...@apache.org>
Authored: Wed Dec 11 21:01:32 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Dec 11 21:01:32 2013 -0800

----------------------------------------------------------------------
 .../crunch/io/avro/AvroPathPerKeyTarget.java    | 28 +++++++++++---------
 1 file changed, 16 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/677c2691/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index c6be679..6befbad 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -19,6 +19,7 @@ package org.apache.crunch.io.avro;
 
 import org.apache.avro.mapred.AvroWrapper;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.SequentialFileNamingScheme;
@@ -51,7 +52,11 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
   }
 
   public AvroPathPerKeyTarget(Path path) {
-    super(path, AvroPathPerKeyOutputFormat.class, SequentialFileNamingScheme.getInstance());
+    this(path, SequentialFileNamingScheme.getInstance());
+  }
+
+  public AvroPathPerKeyTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, AvroPathPerKeyOutputFormat.class, fileNamingScheme);
   }
 
   @Override
@@ -83,20 +88,19 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
   @Override
   public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
-    Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
+    Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
+    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base), base);
     FileSystem dstFs = path.getFileSystem(conf);
-    boolean sameFs = isCompatible(srcFs, path);
     if (!dstFs.exists(path)) {
-      if (sameFs) {
-        srcFs.rename(src, path);
-      } else {
-        dstFs.mkdirs(path);
-        FileUtil.copy(srcFs, src, dstFs, path, true, true, conf);
-      }
-    } else {
-      Path[] srcs = FileUtil.stat2Paths(srcFs.listStatus(src));
+      dstFs.mkdirs(path);
+    }
+    boolean sameFs = isCompatible(srcFs, path);
+    for (Path key : keys) {
+      Path[] srcs = FileUtil.stat2Paths(srcFs.listStatus(key), key);
+      Path targetPath = new Path(path, key.getName());
+      dstFs.mkdirs(targetPath);
       for (Path s : srcs) {
-        Path d = new Path(path, s.getName());
+        Path d = getDestFile(conf, s, targetPath, s.getName().contains("-m-"));
         if (sameFs) {
           srcFs.rename(s, d);
         } else {