You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2018/03/30 18:36:09 UTC

crunch git commit: CRUNCH-668: Support globbing patterns in From#avroFile

Repository: crunch
Updated Branches:
  refs/heads/master 4df441907 -> 5ef1c4ed2


CRUNCH-668: Support globbing patterns in From#avroFile

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5ef1c4ed
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5ef1c4ed
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5ef1c4ed

Branch: refs/heads/master
Commit: 5ef1c4ed2a09c2a4672450518be71db029f91433
Parents: 4df4419
Author: Clément MATHIEU <cl...@unportant.info>
Authored: Tue Mar 27 17:55:15 2018 +0200
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Mar 30 10:31:44 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/io/From.java    | 30 +++++++--
 .../java/org/apache/crunch/io/FromTest.java     | 69 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5ef1c4ed/crunch-core/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java
index f15f309..7b0c9dc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/From.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java
@@ -347,19 +347,41 @@ public class From {
     DataFileReader reader = null;
     try {
       FileSystem fs = path.getFileSystem(conf);
+
       if (!fs.isFile(path)) {
-        FileStatus[] fstat = fs.listStatus(path, new PathFilter() {
+        PathFilter ignoreHidden = new PathFilter() {
           @Override
           public boolean accept(Path path) {
             String name = path.getName();
             return !name.startsWith("_") && !name.startsWith(".");
           }
-        });
-        if (fstat == null || fstat.length == 0) {
+        };
+
+        FileStatus[] globStatus = fs.globStatus(path, ignoreHidden);
+        if (globStatus == null) {
           throw new IllegalArgumentException("No valid files found in directory: " + path);
         }
-        path = fstat[0].getPath();
+
+        Path newPath = null;
+        for (FileStatus status : globStatus) {
+          if (status.isFile()) {
+              newPath = status.getPath();
+              break;
+          } else {
+            FileStatus[] listStatus = fs.listStatus(path, ignoreHidden);
+            if (listStatus != null && listStatus.length > 0) {
+                newPath = listStatus[0].getPath();
+                break;
+            }
+          }
+        }
+
+        if (newPath == null) {
+          throw new IllegalArgumentException("No valid files found in directory: " + path);
+        }
+        path = newPath;
       }
+
       reader = new DataFileReader(new FsInput(path, conf), new GenericDatumReader<GenericRecord>());
       return reader.getSchema();
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/5ef1c4ed/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java b/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java
index 06ef6cd..ca2f6e4 100644
--- a/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/io/FromTest.java
@@ -17,15 +17,33 @@
  */
 package org.apache.crunch.io;
 
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
 import com.google.common.collect.ImmutableList;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.crunch.Source;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class FromTest {
 
+  @Rule
+  public TemporaryFolder tmp = new TemporaryFolder();
+
   @Test(expected=IllegalArgumentException.class)
   public void testAvroFile_EmptyPathListNotAllowed() {
     From.avroFile(ImmutableList.<Path>of());
@@ -45,4 +63,55 @@ public class FromTest {
   public void testSequenceFile_EmptyPathListNotAllowed() {
     From.sequenceFile(ImmutableList.<Path>of(), LongWritable.class, Text.class);
   }
+
+  @Test
+  public void testAvroFile_GlobWithSchemaInferenceIsSupported() throws IOException {
+    Schema schema = SchemaBuilder.record("record")
+            .fields()
+            .endRecord();
+    // Write an Avro data file named "1", holding one empty record, into the temporary folder.
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
+      writer.create(schema, tmp.newFile("1"));
+      writer.append(new GenericData.Record(schema));
+    }
+    // Only a glob path is passed (no schema), so the source type must be derived from the file.
+    Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString() + "/*"));
+    // JUnit's assertEquals takes (expected, actual): the type built from the known schema goes first.
+    assertEquals(Avros.generics(schema), source.getType());
+  }
+
+  @Test
+  public void testAvroFile_DirectoryWithSchemaInferenceIsSupported() throws IOException {
+    Schema schema = SchemaBuilder.record("record")
+            .fields()
+            .endRecord();
+    // Write an Avro data file named "1", holding one empty record, into the temporary folder.
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
+      writer.create(schema, tmp.newFile("1"));
+      writer.append(new GenericData.Record(schema));
+    }
+    // The directory itself is passed (no glob, no schema); the type must come from a file inside it.
+    Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString()));
+    // JUnit's assertEquals takes (expected, actual): the type built from the known schema goes first.
+    assertEquals(Avros.generics(schema), source.getType());
+  }
+
+  @Test
+  public void testAvroFile_FileWithSchemaInferenceIsSupported() throws IOException {
+    Schema schema = SchemaBuilder.record("record")
+            .fields()
+            .endRecord();
+    // Write an Avro data file named "1", holding one empty record, into the temporary folder.
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+    try (DataFileWriter<GenericRecord> writer = new DataFileWriter<>(datumWriter)) {
+      writer.create(schema, tmp.newFile("1"));
+      writer.append(new GenericData.Record(schema));
+    }
+    // The concrete file path is passed (no schema), so the type must be derived from that file.
+    Source<GenericData.Record> source = From.avroFile(new Path(tmp.getRoot().toString(), "1"));
+    // JUnit's assertEquals takes (expected, actual): the type built from the known schema goes first.
+    assertEquals(Avros.generics(schema), source.getType());
+  }
 }
\ No newline at end of file