You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/03/31 18:37:01 UTC

git commit: CRUNCH-371 Empty output in AvroPathPerKeyTarget

Repository: crunch
Updated Branches:
  refs/heads/master d4917a217 -> ada7e3a64


CRUNCH-371 Empty output in AvroPathPerKeyTarget

Correctly handle an empty output in AvroPathPerKeyTarget, i.e.
do not attempt to copy or create any output paths when there is nothing to output.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ada7e3a6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ada7e3a6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ada7e3a6

Branch: refs/heads/master
Commit: ada7e3a6470447dc0b90884a3d59da50b52c5315
Parents: d4917a2
Author: Gabriel Reid <gr...@apache.org>
Authored: Sat Mar 29 13:06:52 2014 +0100
Committer: Gabriel Reid <gr...@apache.org>
Committed: Mon Mar 31 18:34:53 2014 +0200

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroPathPerKeyIT.java | 25 ++++++++++++++++++++
 .../crunch/io/avro/AvroPathPerKeyTarget.java    | 12 ++++++++--
 2 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ada7e3a6/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
index 7b30a60..c1f7fa6 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.FilterFns;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
 import org.apache.crunch.test.CrunchTestSupport;
@@ -35,6 +36,7 @@ import java.io.Serializable;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable {
   @Test
@@ -68,4 +70,27 @@ public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable
     assertEquals(1, bStat.length);
     assertEquals("part-r-00000.avro", bStat[0].getPath().getName());
   }
+
+  @Test
+  public void testOutputFilePerKey_NothingToOutput() throws Exception {
+    Pipeline p = new MRPipeline(AvroPathPerKeyIT.class, tempDir.getDefaultConfiguration());
+    Path outDir = tempDir.getPath("out");
+
+    p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+        .parallelDo(new MapFn<String, Pair<String, String>>() {
+          @Override
+          public Pair<String, String> map(String input) {
+            String[] p = input.split("\t");
+            return Pair.of(p[0], p[1]);
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.strings()))
+        .filter(FilterFns.<Pair<String, String>>REJECT_ALL())
+        .groupByKey()
+        .write(new AvroPathPerKeyTarget(outDir));
+    p.done();
+
+    FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+    assertFalse(fs.exists(outDir));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/ada7e3a6/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index 6befbad..5c47b8a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -18,6 +18,8 @@
 package org.apache.crunch.io.avro;
 
 import org.apache.avro.mapred.AvroWrapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.FormatBundle;
@@ -26,8 +28,8 @@ import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.FileTargetImpl;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
-import org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat;
 import org.apache.crunch.types.avro.AvroMode;
+import org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,6 +49,8 @@ import java.io.IOException;
  */
 public class AvroPathPerKeyTarget extends FileTargetImpl {
 
+  private static final Log LOG = LogFactory.getLog(AvroPathPerKeyTarget.class);
+
   public AvroPathPerKeyTarget(String path) {
     this(new Path(path));
   }
@@ -89,7 +93,11 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
   public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
     FileSystem srcFs = workingPath.getFileSystem(conf);
     Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
-    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base), base);
+    if (!srcFs.exists(base)) {
+      LOG.warn("Nothing to copy from " + base);
+      return;
+    }
+    Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base));
     FileSystem dstFs = path.getFileSystem(conf);
     if (!dstFs.exists(path)) {
       dstFs.mkdirs(path);