You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@crunch.apache.org by jw...@apache.org on 2018/03/30 18:36:09 UTC
crunch git commit: CRUNCH-668: Support globbing patterns in From#avroFile
Repository: crunch
Updated Branches:
refs/heads/master 4df441907 -> 5ef1c4ed2
CRUNCH-668: Support globbing patterns in From#avroFile
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5ef1c4ed
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5ef1c4ed
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5ef1c4ed
Branch: refs/heads/master
Commit: 5ef1c4ed2a09c2a4672450518be71db029f91433
Parents: 4df4419
Author: Clément MATHIEU <cl...@unportant.info>
Authored: Tue Mar 27 17:55:15 2018 +0200
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Mar 30 10:31:44 2018 -0700
----------------------------------------------------------------------
.../main/java/org/apache/crunch/io/From.java | 30 +++++++--
.../java/org/apache/crunch/io/FromTest.java | 69 ++++++++++++++++++++
2 files changed, 95 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/5ef1c4ed/crunch-core/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java
index f15f309..7b0c9dc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java
@@ -347,19 +347,41 @@ public class From {
DataFileReader reader = null;
try {
FileSystem fs = path.getFileSystem(conf);
+
if (!fs.isFile(path)) {
- FileStatus[] fstat = fs.listStatus(path, new PathFilter() {
+ PathFilter ignoreHidden = new PathFilter() {
@Override
public boolean accept(Path path) {
String name = path.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
- });
- if (fstat == null || fstat.length == 0) {
+ };
+
+ FileStatus[] globStatus = fs.globStatus(path, ignoreHidden);
+ if (globStatus == null) {
throw new IllegalArgumentException("No valid files found in directory: " + path);
}
- path = fstat[0].getPath();
+
+ Path newPath = null;
+ for (FileStatus status : globStatus) {
+ if (status.isFile()) {
+ newPath = status.getPath();
+ break;
+ } else {
+ FileStatus[] listStatus = fs.listStatus(path, ignoreHidden);
+ if (listStatus != null && listStatus.length > 0) {
+ newPath = listStatus[0].getPath();
+ break;
+ }
+ }
+ }
+
+ if (newPath == null) {
+ throw new IllegalArgumentException("No valid files found in directory: " + path);
+ }
+ path = newPath;
}
+
reader = new DataFileReader(new FsInput(path, conf), new GenericDatumReader<GenericRecord>());
return reader.getSchema();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/5ef1c4ed/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java b/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java
index 06ef6cd..ca2f6e4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java
@@ -17,15 +17,33 @@
*/
package org.apache.crunch.io;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
import com.google.common.collect.ImmutableList;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.crunch.Source;
+import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class FromTest {
+ @Rule
+ public final TemporaryFolder tmp = new TemporaryFolder(); // scratch directory for the Avro files the tests below create via tmp.newFile
+
@Test(expected=IllegalArgumentException.class)
public void testAvroFile_EmptyPathListNotAllowed() {
From.avroFile(ImmutableList.<Path>of());
@@ -45,4 +63,55 @@ public class FromTest {
public void testSequenceFile_EmptyPathListNotAllowed() {
From.sequenceFile(ImmutableList.<Path>of(), LongWritable.class, Text.class);
}
+
+ @Test
+ public void testAvroFile_GlobWithSchemaInferenceIsSupported() throws IOException {
+ Schema schema = SchemaBuilder.record("record")
+ .fields()
+ .endRecord();
+ // Write a one-record Avro file named "1" into the temporary directory so there is a schema to infer.
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
+ writer.create(schema, tmp.newFile("1"));
+ writer.append(new GenericData.Record(schema));
+ }
+ // A glob ("<dir>/*") matching that file must resolve to it for schema inference.
+ Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString() + "/*"));
+ // assertEquals takes (expected, actual): the type built from the written schema is the expected value.
+ assertEquals(Avros.generics(schema), source.getType());
+ }
+
+ @Test
+ public void testAvroFile_DirectoryWithSchemaInferenceIsSupported() throws IOException {
+ Schema schema = SchemaBuilder.record("record")
+ .fields()
+ .endRecord();
+ // Write a one-record Avro file named "1" into the temporary directory so there is a schema to infer.
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
+ writer.create(schema, tmp.newFile("1"));
+ writer.append(new GenericData.Record(schema));
+ }
+ // Pointing From.avroFile at the directory itself must infer the schema of the file inside it.
+ Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString()));
+ // assertEquals takes (expected, actual): the type built from the written schema is the expected value.
+ assertEquals(Avros.generics(schema), source.getType());
+ }
+
+ @Test
+ public void testAvroFile_FileWithSchemaInferenceIsSupported() throws IOException {
+ Schema schema = SchemaBuilder.record("record")
+ .fields()
+ .endRecord();
+ // Write a one-record Avro file named "1" into the temporary directory so there is a schema to infer.
+ DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
+ writer.create(schema, tmp.newFile("1"));
+ writer.append(new GenericData.Record(schema));
+ }
+ // Pointing From.avroFile directly at the file must infer that file's schema.
+ Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString(), "1"));
+ // assertEquals takes (expected, actual): the type built from the written schema is the expected value.
+ assertEquals(Avros.generics(schema), source.getType());
+ }
}
\ No newline at end of file