You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/12/11 21:08:17 UTC

git commit: CRUNCH-310: AvroParquetFileSource with builder interface for selecting fields and specifying a filter class.

Updated Branches:
  refs/heads/master 22965cf87 -> b46c2b8b4


CRUNCH-310: AvroParquetFileSource with builder interface for selecting fields and
specifying a filter class.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b46c2b8b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b46c2b8b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b46c2b8b

Branch: refs/heads/master
Commit: b46c2b8b4decdf6825f91f68a296e7671b13a76b
Parents: 22965cf
Author: Josh Wills <jw...@apache.org>
Authored: Tue Dec 10 17:50:39 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Dec 10 22:30:57 2013 -0800

----------------------------------------------------------------------
 .../parquet/AvroParquetFileSourceTargetIT.java  | 122 +++++++++++++++++-
 .../io/parquet/AvroParquetFileSource.java       | 125 +++++++++++++++++--
 .../io/parquet/AvroParquetFileSourceTarget.java |   3 +-
 .../io/parquet/AvroParquetFileTarget.java       |   4 +-
 crunch-core/src/test/avro/person.avsc           |   2 +-
 5 files changed, 240 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
index b6d51f2..5c7d9e0 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
@@ -17,30 +17,40 @@
  */
 package org.apache.crunch.io.parquet;
 
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.FilterFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.io.avro.AvroFileSource;
 import org.apache.crunch.test.Person;
-import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+
 import parquet.avro.AvroParquetWriter;
 
-import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Lists;
+import parquet.column.ColumnReader;
+import parquet.filter.RecordFilter;
+import parquet.filter.UnboundRecordFilter;
 
 @SuppressWarnings("serial")
 public class AvroParquetFileSourceTargetIT implements Serializable {
@@ -110,5 +120,109 @@ public class AvroParquetFileSourceTargetIT implements Serializable {
 
     assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
   }
+  
+  @Test
+  public void testProjectionSpecific() throws IOException {
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+    // Read only the "age" column into specific Person records.
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(
+        AvroParquetFileSource.builder(Person.class)
+            .includeField("age")
+            .build(new Path(avroFile.getAbsolutePath())));
+    // Round-trip through an Avro file so the projected records can be inspected.
+    File outputFile = tmpDir.getFile("output");
+    Target outputTarget = To.avroFile(outputFile.getAbsolutePath());
+    genericCollection.write(outputTarget);
+    pipeline.done();
+    // Read the round-tripped records back using the full Person schema.
+    Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+        tmpDir.getDefaultConfiguration());
+    PCollection<Person> ageOnly = pipeline2.read(
+        new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class)));
+    List<Person> people = Lists.newArrayList(ageOnly.materialize());
+    assertEquals(1, people.size()); // without this, an empty result would pass vacuously
+    Person person = people.get(0);
+    assertNull(person.getName());
+    assertEquals(Integer.valueOf(42), person.getAge());
+    assertNull(person.getSiblingnames());
+  }
+
+  @Test
+  public void testProjectionGeneric() throws IOException {
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+    // Project the generic Person schema down to "age" only.
+    AvroParquetFileSource<GenericRecord> src = AvroParquetFileSource.builder(Person.SCHEMA$)
+        .includeField("age")
+        .build(new Path(avroFile.getAbsolutePath()));
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<GenericRecord> genericCollection = pipeline.read(src);
+
+    File outputFile = tmpDir.getFile("output");
+    Target outputTarget = To.avroFile(outputFile.getAbsolutePath());
+    genericCollection.write(outputTarget);
+    pipeline.done();
+    // Read the output back with the projected schema exposed by the source.
+    Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+        tmpDir.getDefaultConfiguration());
+    PCollection<Record> ageOnly = pipeline2.read(
+        new AvroFileSource<Record>(new Path(outputFile.getAbsolutePath()), Avros.generics(src.getProjectedSchema())));
+
+    List<Record> people = Lists.newArrayList(ageOnly.materialize());
+    assertEquals(1, people.size());
+    assertEquals(1, people.get(0).getSchema().getFields().size()); // only "age" was projected
+    assertEquals(Integer.valueOf(42), people.get(0).get(0));
+  }
 
+  @Test
+  public void testProjectionFiltered() throws IOException {
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+    // Project to "age" and reject every record with RejectAllFilter.
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Person> genericCollection = pipeline.read(
+        AvroParquetFileSource.builder(Person.class)
+            .includeField("age")
+            .filterClass(RejectAllFilter.class)
+            .build(new Path(avroFile.getAbsolutePath())));
+
+    File outputFile = tmpDir.getFile("output");
+    Target outputTarget = To.avroFile(outputFile.getAbsolutePath());
+    genericCollection.filter(new FilterFn<Person>() {
+      @Override
+      public boolean accept(Person input) {
+        return input != null; // NOTE(review): presumably drops nulls surfaced for rejected rows; confirm
+      }
+    }).write(outputTarget);
+    pipeline.done();
+    // Every record was rejected, so the written output must be empty.
+    Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+        tmpDir.getDefaultConfiguration());
+    PCollection<Person> ageOnly = pipeline2.read(
+        new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class)));
+    assertTrue(Lists.newArrayList(ageOnly.materialize()).isEmpty());
+  }
+  /** Parquet record filter that matches no record; used by testProjectionFiltered. */
+  public static class RejectAllFilter implements UnboundRecordFilter {
+    @Override
+    public RecordFilter bind(Iterable<ColumnReader> readers) {
+      return new RecordFilter() {
+        @Override
+        public boolean isMatch() {
+          return false;
+        }
+      };
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
index 76e80ef..41e0d8e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
@@ -19,34 +19,80 @@ package org.apache.crunch.io.parquet;
 
 import java.io.IOException;
 import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.ReadableData;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.ReadableData;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.util.ReflectionUtils;
 import parquet.avro.AvroParquetInputFormat;
 import parquet.avro.AvroReadSupport;
+import parquet.filter.UnboundRecordFilter;
 
-public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceImpl<T> implements ReadableSource<T> {
 
-  private static <S> FormatBundle<AvroParquetInputFormat> getBundle(AvroType<S> ptype) {
-    return FormatBundle.forInput(AvroParquetInputFormat.class)
-        .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, ptype.getSchema().toString())
+  private final String projSchema; // schema.toString() of the projection; null when none was given
+  /** Builds the input bundle: requested Avro schema, optional record filter, combining disabled. */
+  private static <S> FormatBundle<AvroParquetInputFormat> getBundle(
+      AvroType<S> ptype,
+      Schema extSchema,
+      Class<? extends UnboundRecordFilter> filterClass) {
+    Schema schema = extSchema == null ? ptype.getSchema() : extSchema; // explicit projection wins
+    // TODO: check that every field of extSchema is also present in ptype's schema.
+    FormatBundle<AvroParquetInputFormat> fb = FormatBundle.forInput(AvroParquetInputFormat.class)
+        .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, schema.toString())
         // ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
         // doesn't work with CombineFileInputFormat
         .set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+    if (filterClass != null) {
+      fb.set("parquet.read.filter", filterClass.getName()); // NOTE(review): likely ParquetInputFormat.UNBOUND_RECORD_FILTER
+    }
+    return fb;
   }
 
   public AvroParquetFileSource(Path path, AvroType<T> ptype) {
-    super(path, ptype, getBundle(ptype));
+    this(ImmutableList.of(path), ptype); // one path, ptype schema, no filter
+  }
+  /** Reads one path, using {@code schema} as the requested Avro projection. */
+  public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema schema) {
+    this(ImmutableList.of(path), ptype, schema);
   }
 
   public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) {
-    super(paths, ptype, getBundle(ptype));
+    this(paths, ptype, null, null);
+  }
+  /** Reads {@code paths}, using {@code schema} as the requested Avro projection. */
+  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema) {
+    this(paths, ptype, schema, null);
+  }
+  /** Reads {@code paths} with the ptype schema and the given Parquet record filter class. */
+  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype,
+                               Class<? extends UnboundRecordFilter> filterClass) {
+    this(paths, ptype, null, filterClass);
+  }
+  /** Main constructor; a null {@code schema} requests the ptype schema, a null filter sets none. */
+  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema,
+                               Class<? extends UnboundRecordFilter> filterClass) {
+    super(paths, ptype, getBundle(ptype, schema, filterClass));
+    projSchema = schema == null ? null : schema.toString();
+  }
+  /** Returns the explicitly requested projection schema, or {@code null} if none was given. */
+  public Schema getProjectedSchema() {
+    return projSchema == null ? null : new Schema.Parser().parse(projSchema);
   }
 
   @Override
@@ -70,6 +116,69 @@ public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements Reada
 
   @Override
   public String toString() {
-    return "Parquet(" + pathsAsString() + ")";
+    return "Parquet(" + pathsAsString() + (projSchema != null ? ") -> " + projSchema : ")");
+  }
+  /** Returns a builder for specific records of {@code clazz}, which must not be null. */
+  public static <T extends SpecificRecord> Builder<T> builder(Class<T> clazz) {
+    return new Builder<T>(Preconditions.checkNotNull(clazz));
+  }
+  /** Returns a builder for generic records; {@code schema} must be a non-null RECORD schema. */
+  public static Builder<GenericRecord> builder(Schema schema) {
+    Preconditions.checkNotNull(schema);
+    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()), "Expected a RECORD schema, got %s", schema.getType());
+    return new Builder<GenericRecord>(schema);
+  }
+
+  /**
+   * Helper class for constructing an {@code AvroParquetFileSource} that reads only a subset of the
+   * fields defined in an Avro schema and/or applies a Parquet {@code UnboundRecordFilter}.
+   */
+  public static class Builder<T extends IndexedRecord> {
+    private Class<T> clazz; // null when built from a Schema (generic records)
+    private final Schema baseSchema;
+    private final List<Schema.Field> fields = Lists.newArrayList();
+    private Class<? extends UnboundRecordFilter> filterClass;
+    /** Specific-record builder; the base schema comes from a new instance of {@code clazz}. */
+    private Builder(Class<T> clazz) {
+      this.clazz = clazz;
+      this.baseSchema = ReflectionUtils.newInstance(clazz, null).getSchema();
+    }
+    /** Generic-record builder over {@code baseSchema}. */
+    private Builder(Schema baseSchema) {
+      this.baseSchema = baseSchema;
+    }
+    /** Adds a base-schema field to the projection; throws IllegalArgumentException if absent. */
+    public Builder<T> includeField(String fieldName) {
+      Schema.Field field = baseSchema.getField(fieldName);
+      if (field == null) {
+        throw new IllegalArgumentException("No field " + fieldName + " in schema: " + baseSchema.getName());
+      }
+      fields.add(field); // copied in build(), so one builder can build more than one source
+      return this;
+    }
+    /** Sets the filter class, passed to Parquet through the "parquet.read.filter" property. */
+    public Builder<T> filterClass(Class<? extends UnboundRecordFilter> filterClass) {
+      this.filterClass = filterClass;
+      return this;
+    }
+    /** Builds a source that reads a single path. */
+    public AvroParquetFileSource<T> build(Path path) {
+      return build(ImmutableList.of(path));
+    }
+    /** Builds a source; a projection schema is created only if at least one field was included. */
+    public AvroParquetFileSource<T> build(List<Path> paths) {
+      AvroType<T> at = clazz == null ? Avros.generics(baseSchema) : Avros.specifics((Class) clazz);
+      if (fields.isEmpty()) {
+        return new AvroParquetFileSource<T>(paths, at, filterClass);
+      }
+      Schema projected = Schema.createRecord(baseSchema.getName(), baseSchema.getDoc(),
+          baseSchema.getNamespace(), baseSchema.isError());
+      List<Schema.Field> copies = Lists.newArrayList(); // Avro's setFields rejects reused Fields
+      for (Schema.Field f : fields) {
+        copies.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultValue(), f.order()));
+      }
+      projected.setFields(copies);
+      return new AvroParquetFileSource<T>(paths, at, projected, filterClass);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
index 8d93eba..359a484 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
@@ -17,13 +17,14 @@
  */
 package org.apache.crunch.io.parquet;
 
+import org.apache.avro.generic.IndexedRecord;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.SequentialFileNamingScheme;
 import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.hadoop.fs.Path;
 
-public class AvroParquetFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+public class AvroParquetFileSourceTarget<T extends IndexedRecord> extends ReadableSourcePathTargetImpl<T> {
 
   public AvroParquetFileSourceTarget(Path path, AvroType<T> atype) {
     this(path, atype, SequentialFileNamingScheme.getInstance());

http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
index c67b9f1..a6a34cd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -91,8 +91,8 @@ public class AvroParquetFileTarget extends FileTargetImpl {
 
   @Override
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-    if (ptype instanceof AvroType) {
-      return new AvroParquetFileSourceTarget<T>(path, (AvroType<T>) ptype);
+    if (ptype instanceof AvroType && IndexedRecord.class.isAssignableFrom(((AvroType<?>) ptype).getTypeClass())) {
+      return new AvroParquetFileSourceTarget(path, (AvroType<T>) ptype); // raw: IndexedRecord bound checked above
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/test/avro/person.avsc
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/avro/person.avsc b/crunch-core/src/test/avro/person.avsc
index babd808..e0fff30 100644
--- a/crunch-core/src/test/avro/person.avsc
+++ b/crunch-core/src/test/avro/person.avsc
@@ -22,5 +22,5 @@
 "fields": [
   {"name": "name", "type": ["string", "null"] },
   {"name": "age", "type": "int"},
-  {"name": "siblingnames", "type": {"type": "array", "items": "string"}} ]
+  {"name": "siblingnames", "type" : [{ "type": "array", "items": "string" },  "null"], "default": null } ]
 }