You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/03/31 18:37:01 UTC
git commit: CRUNCH-371 Empty output in AvroPathPerKeyTarget
Repository: crunch
Updated Branches:
refs/heads/master d4917a217 -> ada7e3a64
CRUNCH-371 Empty output in AvroPathPerKeyTarget
Correctly handle an empty output in AvroPathPerKeyTarget, i.e.
do not attempt to copy/create any output paths.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ada7e3a6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ada7e3a6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ada7e3a6
Branch: refs/heads/master
Commit: ada7e3a6470447dc0b90884a3d59da50b52c5315
Parents: d4917a2
Author: Gabriel Reid <gr...@apache.org>
Authored: Sat Mar 29 13:06:52 2014 +0100
Committer: Gabriel Reid <gr...@apache.org>
Committed: Mon Mar 31 18:34:53 2014 +0200
----------------------------------------------------------------------
.../apache/crunch/io/avro/AvroPathPerKeyIT.java | 25 ++++++++++++++++++++
.../crunch/io/avro/AvroPathPerKeyTarget.java | 12 ++++++++--
2 files changed, 35 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/ada7e3a6/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
index 7b30a60..c1f7fa6 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroPathPerKeyIT.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.FilterFns;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.test.CrunchTestSupport;
@@ -35,6 +36,7 @@ import java.io.Serializable;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable {
@Test
@@ -68,4 +70,27 @@ public class AvroPathPerKeyIT extends CrunchTestSupport implements Serializable
assertEquals(1, bStat.length);
assertEquals("part-r-00000.avro", bStat[0].getPath().getName());
}
+
+ @Test
+ public void testOutputFilePerKey_NothingToOutput() throws Exception {
+ Pipeline p = new MRPipeline(AvroPathPerKeyIT.class, tempDir.getDefaultConfiguration());
+ Path outDir = tempDir.getPath("out");
+
+ p.read(From.textFile(tempDir.copyResourceFileName("docs.txt")))
+ .parallelDo(new MapFn<String, Pair<String, String>>() {
+ @Override
+ public Pair<String, String> map(String input) {
+ String[] p = input.split("\t");
+ return Pair.of(p[0], p[1]);
+ }
+ }, Avros.tableOf(Avros.strings(), Avros.strings()))
+ .filter(FilterFns.<Pair<String, String>>REJECT_ALL())
+ .groupByKey()
+ .write(new AvroPathPerKeyTarget(outDir));
+ p.done();
+
+ FileSystem fs = outDir.getFileSystem(tempDir.getDefaultConfiguration());
+ assertFalse(fs.exists(outDir));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/ada7e3a6/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
index 6befbad..5c47b8a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroPathPerKeyTarget.java
@@ -18,6 +18,8 @@
package org.apache.crunch.io.avro;
import org.apache.avro.mapred.AvroWrapper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.FormatBundle;
@@ -26,8 +28,8 @@ import org.apache.crunch.io.SequentialFileNamingScheme;
import org.apache.crunch.io.impl.FileTargetImpl;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
-import org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat;
import org.apache.crunch.types.avro.AvroMode;
+import org.apache.crunch.types.avro.AvroPathPerKeyOutputFormat;
import org.apache.crunch.types.avro.AvroType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -47,6 +49,8 @@ import java.io.IOException;
*/
public class AvroPathPerKeyTarget extends FileTargetImpl {
+ private static final Log LOG = LogFactory.getLog(AvroPathPerKeyTarget.class);
+
public AvroPathPerKeyTarget(String path) {
this(new Path(path));
}
@@ -89,7 +93,11 @@ public class AvroPathPerKeyTarget extends FileTargetImpl {
public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
Path base = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index);
- Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base), base);
+ if (!srcFs.exists(base)) {
+ LOG.warn("Nothing to copy from " + base);
+ return;
+ }
+ Path[] keys = FileUtil.stat2Paths(srcFs.listStatus(base));
FileSystem dstFs = path.getFileSystem(conf);
if (!dstFs.exists(path)) {
dstFs.mkdirs(path);