You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/12/11 21:08:17 UTC
git commit: CRUNCH-310: AvroParquetFileSource with builder interface
for selecting fields and specifying a filter class.
Updated Branches:
refs/heads/master 22965cf87 -> b46c2b8b4
CRUNCH-310: AvroParquetFileSource with builder interface for selecting fields and
specifying a filter class.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b46c2b8b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b46c2b8b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b46c2b8b
Branch: refs/heads/master
Commit: b46c2b8b4decdf6825f91f68a296e7671b13a76b
Parents: 22965cf
Author: Josh Wills <jw...@apache.org>
Authored: Tue Dec 10 17:50:39 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Dec 10 22:30:57 2013 -0800
----------------------------------------------------------------------
.../parquet/AvroParquetFileSourceTargetIT.java | 122 +++++++++++++++++-
.../io/parquet/AvroParquetFileSource.java | 125 +++++++++++++++++--
.../io/parquet/AvroParquetFileSourceTarget.java | 3 +-
.../io/parquet/AvroParquetFileTarget.java | 4 +-
crunch-core/src/test/avro/person.avsc | 2 +-
5 files changed, 240 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
index b6d51f2..5c7d9e0 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
@@ -17,30 +17,40 @@
*/
package org.apache.crunch.io.parquet;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
+import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
+import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.io.avro.AvroFileSource;
import org.apache.crunch.test.Person;
-import org.apache.crunch.test.StringWrapper;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
-import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+
import parquet.avro.AvroParquetWriter;
-import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Lists;
+import parquet.column.ColumnReader;
+import parquet.filter.RecordFilter;
+import parquet.filter.UnboundRecordFilter;
@SuppressWarnings("serial")
public class AvroParquetFileSourceTargetIT implements Serializable {
@@ -110,5 +120,109 @@ public class AvroParquetFileSourceTargetIT implements Serializable {
assertEquals(Lists.newArrayList(savedRecord), Lists.newArrayList(recordList));
}
+
+ /**
+  * Writes one complete {@code Person} record to Parquet, reads it back through a source that
+  * projects only {@code age}, round-trips the result through an Avro file, and checks that
+  * exactly one record comes back with the non-projected fields unset.
+  */
+ @Test
+ public void testProjectionSpecific() throws IOException {
+   GenericRecord savedRecord = new Record(Person.SCHEMA$);
+   savedRecord.put("name", "John Doe");
+   savedRecord.put("age", 42);
+   savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+   populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+   Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+   PCollection<Person> projected = pipeline.read(
+       AvroParquetFileSource.builder(Person.class)
+           .includeField("age")
+           .build(new Path(avroFile.getAbsolutePath())));
+
+   File outputFile = tmpDir.getFile("output");
+   // Named avroTarget so it does not shadow the avroFile member used for the input path above.
+   Target avroTarget = To.avroFile(outputFile.getAbsolutePath());
+   projected.write(avroTarget);
+   pipeline.done();
+
+   Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+       tmpDir.getDefaultConfiguration());
+   PCollection<Person> ageOnly = pipeline2.read(
+       new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class)));
+
+   // Materialize into a list first: iterating an empty result would otherwise skip every
+   // assertion and let the test pass without checking anything.
+   List<Person> people = Lists.newArrayList(ageOnly.materialize());
+   assertEquals(1, people.size());
+   for (Person person : people) {
+     assertNull(person.getName());
+     // JUnit's contract is assertEquals(expected, actual).
+     assertEquals(Integer.valueOf(42), person.getAge());
+     assertNull(person.getSiblingnames());
+   }
+ }
+
+ /**
+  * Same round trip as the specific-record projection test, but with generic records read
+  * back under the projected schema. The test is expected to end with an
+  * {@link IndexOutOfBoundsException} when a second field position is requested from a
+  * record that was projected down to {@code age} only.
+  */
+ @Test(expected = IndexOutOfBoundsException.class)
+ public void testProjectionGeneric() throws IOException {
+   GenericRecord savedRecord = new Record(Person.SCHEMA$);
+   savedRecord.put("name", "John Doe");
+   savedRecord.put("age", 42);
+   savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+   populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+   AvroParquetFileSource<GenericRecord> src = AvroParquetFileSource.builder(Person.SCHEMA$)
+       .includeField("age")
+       .build(new Path(avroFile.getAbsolutePath()));
+   Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+   PCollection<GenericRecord> projected = pipeline.read(src);
+
+   File outputFile = tmpDir.getFile("output");
+   // Named avroTarget so it does not shadow the avroFile member used for the input path above.
+   Target avroTarget = To.avroFile(outputFile.getAbsolutePath());
+   projected.write(avroTarget);
+   pipeline.done();
+
+   Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+       tmpDir.getDefaultConfiguration());
+   PCollection<Record> ageOnly = pipeline2.read(
+       new AvroFileSource<Record>(new Path(outputFile.getAbsolutePath()), Avros.generics(src.getProjectedSchema())));
+
+   for (Record person : ageOnly.materialize()) {
+     // JUnit's contract is assertEquals(expected, actual).
+     assertEquals(42, person.get(0));
+     // Position 1 does not exist in the projected record; this access is what is
+     // expected to raise the IndexOutOfBoundsException declared on @Test.
+     person.get(1);
+   }
+ }
+ /**
+  * Reads the Parquet file with an {@code age} projection and {@code RejectAllFilter} set as
+  * the filter class, writes whatever comes through to an Avro file, and asserts that the
+  * Avro file holds no records.
+  */
+ @Test
+ public void testProjectionFiltered() throws IOException {
+   GenericRecord fullRecord = new Record(Person.SCHEMA$);
+   fullRecord.put("name", "John Doe");
+   fullRecord.put("age", 42);
+   fullRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+   populateGenericFile(Lists.newArrayList(fullRecord), Person.SCHEMA$);
+
+   Pipeline readPipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+   PCollection<Person> filteredPeople = readPipeline.read(
+       AvroParquetFileSource.builder(Person.class)
+           .includeField("age")
+           .filterClass(RejectAllFilter.class)
+           .build(new Path(avroFile.getAbsolutePath())));
+
+   File outputFile = tmpDir.getFile("output");
+   Target avroTarget = To.avroFile(outputFile.getAbsolutePath());
+   // Pipeline-side filter that only drops null inputs.
+   FilterFn<Person> nonNull = new FilterFn<Person>() {
+     @Override
+     public boolean accept(Person input) {
+       return input != null;
+     }
+   };
+   filteredPeople.filter(nonNull).write(avroTarget);
+   readPipeline.done();
+
+   Pipeline verifyPipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+       tmpDir.getDefaultConfiguration());
+   PCollection<Person> ageOnly = verifyPipeline.read(
+       new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class)));
+   List<Person> survivors = Lists.newArrayList(ageOnly.materialize());
+   assertTrue(survivors.isEmpty());
+ }
+
+ /**
+  * Unbound record filter whose bound form reports no match for any record. It is the filter
+  * class passed to the builder in {@code testProjectionFiltered}.
+  */
+ public static class RejectAllFilter implements UnboundRecordFilter {
+   @Override
+   public RecordFilter bind(Iterable<ColumnReader> readers) {
+     // The column readers are not consulted; a fresh filter is handed out on every bind.
+     return new NeverMatch();
+   }
+
+   /** Record filter that answers {@code false} for every record. */
+   private static final class NeverMatch implements RecordFilter {
+     @Override
+     public boolean isMatch() {
+       return false;
+     }
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
index 76e80ef..41e0d8e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
@@ -19,34 +19,80 @@ package org.apache.crunch.io.parquet;
import java.io.IOException;
import java.util.List;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.ReadableData;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.ReadableData;
import org.apache.crunch.io.impl.FileSourceImpl;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.util.ReflectionUtils;
import parquet.avro.AvroParquetInputFormat;
import parquet.avro.AvroReadSupport;
+import parquet.filter.UnboundRecordFilter;
-public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceImpl<T> implements ReadableSource<T> {
- private static <S> FormatBundle<AvroParquetInputFormat> getBundle(AvroType<S> ptype) {
- return FormatBundle.forInput(AvroParquetInputFormat.class)
- .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, ptype.getSchema().toString())
+ private final String projSchema;
+
+ /**
+ * Builds the input format bundle used to read Avro records from Parquet files.
+ *
+ * @param ptype Avro type whose schema is requested when {@code extSchema} is null
+ * @param extSchema explicit projection schema, or null to request the full {@code ptype} schema
+ * @param filterClass filter class whose name is stored under {@code parquet.read.filter}, or null for none
+ * @return the configured bundle, with combine-file input disabled
+ */
+ private static <S> FormatBundle<AvroParquetInputFormat> getBundle(
+ AvroType<S> ptype,
+ Schema extSchema,
+ Class<? extends UnboundRecordFilter> filterClass) {
+ Schema schema = extSchema == null ? ptype.getSchema() : extSchema;
+ // TODO: verify that every field of the requested schema is accounted for in the ptype schema.
+ FormatBundle<AvroParquetInputFormat> fb = FormatBundle.forInput(AvroParquetInputFormat.class)
+ .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, schema.toString())
// ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
// doesn't work with CombineFileInputFormat
.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+ // The filter is optional: the property is only written when a filter class was supplied.
+ if (filterClass != null) {
+ fb.set("parquet.read.filter", filterClass.getName());
+ }
+ return fb;
}
+ // Single path, no projection schema and no filter.
public AvroParquetFileSource(Path path, AvroType<T> ptype) {
- super(path, ptype, getBundle(ptype));
+ this(ImmutableList.of(path), ptype);
+ }
+
+ // Single path with a projection schema and no filter.
+ public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema schema) {
+ this(ImmutableList.of(path), ptype, schema);
}
+ // Several paths, no projection schema and no filter.
public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) {
- super(paths, ptype, getBundle(ptype));
+ this(paths, ptype, null, null);
+ }
+
+ // Several paths with a projection schema and no filter.
+ public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema) {
+ this(paths, ptype, schema, null);
+ }
+
+ // Several paths with a filter class and no projection schema.
+ public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype,
+ Class<? extends UnboundRecordFilter> filterClass) {
+ this(paths, ptype, null, filterClass);
+ }
+
+ // Primary constructor: every other constructor delegates here. Either of schema and
+ // filterClass may be null.
+ public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema,
+ Class<? extends UnboundRecordFilter> filterClass) {
+ super(paths, ptype, getBundle(ptype, schema, filterClass));
+ // The projection is kept as its JSON text and stays null when no schema was given.
+ // NOTE(review): presumably stored as a String because Avro's Schema is not
+ // java.io.Serializable - confirm.
+ projSchema = schema == null ? null : schema.toString();
+ }
+
+ /**
+  * Returns the projection schema this source was constructed with.
+  *
+  * @return the projection schema parsed from its stored JSON text, or {@code null} when the
+  *     source was created without a projection (the stored text is null in that case and
+  *     must not be handed to the parser)
+  */
+ public Schema getProjectedSchema() {
+   if (projSchema == null) {
+     return null;
+   }
+   return (new Schema.Parser()).parse(projSchema);
}
@Override
@@ -70,6 +116,69 @@ public class AvroParquetFileSource<T> extends FileSourceImpl<T> implements Reada
@Override
public String toString() {
- return "Parquet(" + pathsAsString() + ")";
+   // Paths always appear; the projection schema text is appended only when one was supplied.
+   StringBuilder text = new StringBuilder("Parquet(").append(pathsAsString());
+   if (projSchema == null) {
+     text.append(")");
+   } else {
+     text.append(") -> ").append(projSchema);
+   }
+   return text.toString();
+ }
+
+ /**
+  * Starts a builder for a source that reads Parquet data as the given specific record class.
+  *
+  * @param clazz the record class to read; must not be null
+  * @return a new builder bound to {@code clazz}
+  */
+ public static <T extends SpecificRecord> Builder<T> builder(Class<T> clazz) {
+   Preconditions.checkNotNull(clazz);
+   return new Builder<T>(clazz);
+ }
+
+ /**
+  * Starts a builder for a source that reads Parquet data as generic records of the given schema.
+  *
+  * @param schema the base record schema; must be non-null and of type {@code RECORD}
+  * @return a new builder bound to {@code schema}
+  * @throws NullPointerException if {@code schema} is null
+  * @throws IllegalArgumentException if {@code schema} is not a record schema
+  */
+ public static Builder<GenericRecord> builder(Schema schema) {
+   Preconditions.checkNotNull(schema);
+   Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
+       "Expected a RECORD schema but found %s", schema.getType());
+   // Parameterized instead of raw so callers keep the GenericRecord type without unchecked warnings.
+   return new Builder<GenericRecord>(schema);
+ }
+
+ /**
+  * Helper class for constructing an {@code AvroParquetFileSource} that only reads a subset of the
+  * fields defined in an Avro schema.
+  *
+  * <p>Select fields with {@link #includeField(String)} and optionally set a record filter with
+  * {@link #filterClass(Class)}. When no field has been selected, the source is built without a
+  * projection schema. A builder may be used for more than one {@code build} call.
+  */
+ public static class Builder<T extends IndexedRecord> {
+   // Null when the builder was created from a Schema (generic records).
+   private final Class<T> clazz;
+   private final Schema baseSchema;
+   // Fields of baseSchema selected so far, in selection order and without duplicates.
+   private final List<Schema.Field> fields = Lists.newArrayList();
+   private Class<? extends UnboundRecordFilter> filterClass;
+
+   private Builder(Class<T> clazz) {
+     this.clazz = clazz;
+     // The base schema is taken from a freshly created instance of the record class.
+     this.baseSchema = ReflectionUtils.newInstance(clazz, null).getSchema();
+   }
+
+   private Builder(Schema baseSchema) {
+     this.clazz = null;
+     this.baseSchema = baseSchema;
+   }
+
+   /**
+    * Adds a field of the base schema to the projection. Selecting a field that is already
+    * part of the projection has no further effect.
+    *
+    * @param fieldName name of a field in the base schema
+    * @return this builder
+    * @throws IllegalArgumentException if the base schema has no field with that name
+    */
+   public Builder<T> includeField(String fieldName) {
+     Schema.Field field = baseSchema.getField(fieldName);
+     if (field == null) {
+       throw new IllegalArgumentException("No field " + fieldName + " in schema: " + baseSchema.getName());
+     }
+     // Avro's setFields rejects duplicate field names, so a repeated selection is ignored
+     // here instead of surfacing later as a failure in build().
+     for (Schema.Field selected : fields) {
+       if (selected.name().equals(field.name())) {
+         return this;
+       }
+     }
+     fields.add(field);
+     return this;
+   }
+
+   /**
+    * Sets the record filter class for the source.
+    *
+    * @param filterClass the filter class, or null for no filter
+    * @return this builder
+    */
+   public Builder<T> filterClass(Class<? extends UnboundRecordFilter> filterClass) {
+     this.filterClass = filterClass;
+     return this;
+   }
+
+   /**
+    * Builds a source over a single path.
+    *
+    * @param path the Parquet file or directory to read
+    * @return the configured source
+    */
+   public AvroParquetFileSource<T> build(Path path) {
+     return build(ImmutableList.of(path));
+   }
+
+   /**
+    * Builds a source over the given paths, projecting onto the selected fields if any were
+    * selected.
+    *
+    * @param paths the Parquet files or directories to read
+    * @return the configured source
+    */
+   // The raw Class cast is needed because T is only known to extend IndexedRecord here.
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public AvroParquetFileSource<T> build(List<Path> paths) {
+     AvroType<T> at = clazz == null ? Avros.generics(baseSchema) : Avros.specifics((Class) clazz);
+     if (fields.isEmpty()) {
+       return new AvroParquetFileSource<T>(paths, at, filterClass);
+     }
+     // Copy the fields on every build: Avro does not accept a Field instance that is
+     // already attached to a record schema, so neither the base schema's own fields nor
+     // the copies from an earlier build() call can be passed to setFields again.
+     List<Schema.Field> copies = Lists.newArrayList();
+     for (Schema.Field f : fields) {
+       copies.add(new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultValue(), f.order()));
+     }
+     Schema projected = Schema.createRecord(
+         baseSchema.getName(),
+         baseSchema.getDoc(),
+         baseSchema.getNamespace(),
+         baseSchema.isError());
+     projected.setFields(copies);
+     return new AvroParquetFileSource<T>(paths, at, projected, filterClass);
+   }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
index 8d93eba..359a484 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTarget.java
@@ -17,13 +17,14 @@
*/
package org.apache.crunch.io.parquet;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.SequentialFileNamingScheme;
import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
import org.apache.crunch.types.avro.AvroType;
import org.apache.hadoop.fs.Path;
-public class AvroParquetFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+public class AvroParquetFileSourceTarget<T extends IndexedRecord> extends ReadableSourcePathTargetImpl<T> {
public AvroParquetFileSourceTarget(Path path, AvroType<T> atype) {
this(path, atype, SequentialFileNamingScheme.getInstance());
http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
index c67b9f1..a6a34cd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileTarget.java
@@ -91,8 +91,8 @@ public class AvroParquetFileTarget extends FileTargetImpl {
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
- if (ptype instanceof AvroType) {
- return new AvroParquetFileSourceTarget<T>(path, (AvroType<T>) ptype);
+ // Only Avro types whose Java class implements IndexedRecord get a Parquet source-target.
+ if (ptype instanceof AvroType && IndexedRecord.class.isAssignableFrom(((AvroType) ptype).getTypeClass())) {
+ // Raw constructor call: T is unbounded in this method, while AvroParquetFileSourceTarget
+ // declares T extends IndexedRecord, so the parameterized form is not available here.
+ return new AvroParquetFileSourceTarget(path, (AvroType<T>) ptype);
}
// Any other ptype is answered with null.
return null;
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/b46c2b8b/crunch-core/src/test/avro/person.avsc
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/avro/person.avsc b/crunch-core/src/test/avro/person.avsc
index babd808..e0fff30 100644
--- a/crunch-core/src/test/avro/person.avsc
+++ b/crunch-core/src/test/avro/person.avsc
@@ -22,5 +22,5 @@
"fields": [
{"name": "name", "type": ["string", "null"] },
{"name": "age", "type": "int"},
- {"name": "siblingnames", "type": {"type": "array", "items": "string"}} ]
+ {"name": "siblingnames", "type" : [{ "type": "array", "items": "string" }, "null"], "default": null } ]
}