You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2018/02/02 23:38:36 UTC
crunch git commit: Expose combine file split file path via Hadoop config
Repository: crunch
Updated Branches:
refs/heads/master 41b201a9e -> 8121bdf5a
Expose combine file split file path via Hadoop config
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8121bdf5
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8121bdf5
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8121bdf5
Branch: refs/heads/master
Commit: 8121bdf5a8a292b796fb3dc07f14e96a8f06d5a7
Parents: 41b201a
Author: Ben Roling <be...@cerner.com>
Authored: Wed Jan 24 10:40:18 2018 -0600
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Feb 2 15:10:18 2018 -0800
----------------------------------------------------------------------
.../org/apache/crunch/io/CombineFileIT.java | 54 +++++++++++++++++++-
.../crunch/impl/mr/run/CrunchRecordReader.java | 10 +++-
2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/8121bdf5/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
index d0d61f9..4c8189f 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/CombineFileIT.java
@@ -18,18 +18,26 @@
package org.apache.crunch.io;
import com.google.common.io.Files;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.test.Tests;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.junit.Rule;
import org.junit.Test;
import java.io.File;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class CombineFileIT {
@Rule
@@ -37,7 +45,7 @@ public class CombineFileIT {
@Test
public void testCombine() throws Exception {
- File srcFiles = tmpDir.getFile("srcs");
+ final File srcFiles = tmpDir.getFile("srcs");
File outputFiles = tmpDir.getFile("out");
assertTrue(srcFiles.mkdir());
File src1 = tmpDir.copyResourceFile(Tests.resource(this, "src1.txt"));
@@ -47,9 +55,51 @@ public class CombineFileIT {
MRPipeline p = new MRPipeline(CombineFileIT.class, tmpDir.getDefaultConfiguration());
PCollection<String> in = p.readTextFile(srcFiles.getAbsolutePath());
- in.write(To.textFile(outputFiles.getAbsolutePath()));
+ PCollection<Pair<String, String>> out = in.parallelDo(
+ new IdentityPlusPathFn(srcFiles), Avros.pairs(Avros.strings(), Avros.strings()));
+ out.write(To.textFile(outputFiles.getAbsolutePath()));
p.done();
assertEquals(4, outputFiles.listFiles().length);
+
+ // verify "crunch.split.file" is being handled correctly
+ FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
+ Path qualifiedSourcePath = fs.makeQualified(new Path(srcFiles.getAbsolutePath()));
+ Iterable<Pair<String, String>> materialized = out.materialize();
+ for (Pair<String, String> pair : materialized) {
+ Path path = new Path(pair.first());
+ String text = pair.second();
+ assertEquals(qualifiedSourcePath, path.getParent());
+ String fileName = path.getName();
+
+ // make sure filename is correct for each record
+ String[] parts = text.split(",");
+ switch (fileName) {
+ case "src1.txt":
+ assertEquals("1", parts[1].substring(0, 1));
+ break;
+ case "src2.txt":
+ assertEquals("2", parts[1].substring(0, 1));
+ break;
+ default:
+ fail("unexpected filename: " + fileName);
+ }
+ }
+
}
+ private static class IdentityPlusPathFn extends DoFn<String, Pair<String, String>> {
+ private final File srcFiles;
+
+ public IdentityPlusPathFn(File srcFiles) {
+ this.srcFiles = srcFiles;
+ }
+
+ @Override
+ public void process(String input, Emitter<Pair<String, String>> emitter) {
+ String filePath = getConfiguration().get("crunch.split.file");
+ assertNotNull(filePath);
+
+ emitter.emit(Pair.of(filePath, input));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/8121bdf5/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index d4175a6..da4bc33 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -37,6 +37,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
private TaskAttemptContext context;
private int idx;
private long progress;
+ private Configuration rootConf;
public CrunchRecordReader(InputSplit inputSplit, final TaskAttemptContext context) throws IOException,
InterruptedException {
@@ -44,6 +45,7 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
if (crunchSplit.get() instanceof CombineFileSplit) {
combineFileSplit = (CombineFileSplit) crunchSplit.get();
}
+ rootConf = context.getConfiguration();
crunchSplit.setConf(context.getConfiguration());
this.context = new TaskAttemptContextImpl(crunchSplit.getConf(), context.getTaskAttemptID());
initNextRecordReader();
@@ -70,7 +72,13 @@ class CrunchRecordReader<K, V> extends RecordReader<K, V> {
InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils.newInstance(
crunchSplit.getInputFormatClass(),
crunchSplit.getConf());
- this.curReader = inputFormat.createRecordReader(getDelegateSplit(), context);
+
+ InputSplit inputSplit = getDelegateSplit();
+ if (inputSplit instanceof FileSplit)
+ {
+ rootConf.set("crunch.split.file", ((FileSplit) inputSplit).getPath().toString());
+ }
+ this.curReader = inputFormat.createRecordReader(inputSplit, context);
return true;
}