You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/11/12 20:46:35 UTC
crunch git commit: CRUNCH-480: Differentiate between read/projection schemas and selectively enable/disable the combine file format by default
Repository: crunch
Updated Branches:
refs/heads/master 253326148 -> 2c7821fd3
CRUNCH-480: Differentiate between read/projection schemas and selectively enable/disable the combine file format by default
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2c7821fd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2c7821fd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2c7821fd
Branch: refs/heads/master
Commit: 2c7821fd353dd1618af112d27cbc3a936142d61c
Parents: 2533261
Author: Josh Wills <jw...@apache.org>
Authored: Wed Nov 12 11:08:13 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Nov 12 11:08:13 2014 -0800
----------------------------------------------------------------------
.../parquet/AvroParquetFileSourceTargetIT.java | 139 +++++++++++++++++--
.../io/parquet/AvroParquetFileSource.java | 65 ++++++---
pom.xml | 2 +-
3 files changed, 180 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
index 5c7d9e0..d75d9da 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
@@ -20,20 +20,26 @@ package org.apache.crunch.io.parquet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import com.google.common.collect.Iterables;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
import org.apache.crunch.FilterFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.io.avro.AvroFileSource;
import org.apache.crunch.test.Person;
@@ -145,14 +151,13 @@ public class AvroParquetFileSourceTargetIT implements Serializable {
PCollection<Person> ageOnly = pipeline2.read(
new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class)));
- for (Person person : ageOnly.materialize()) {
- assertNull(person.getName());
- assertEquals(person.getAge(), new Integer(42));
- assertNull(person.getSiblingnames());
- }
+ Person person = Iterables.getOnlyElement(ageOnly.materialize());
+ assertNull(person.getName());
+ assertEquals(person.getAge(), new Integer(42));
+ assertNull(person.getSiblingnames());
}
- @Test(expected = IndexOutOfBoundsException.class)
+ @Test
public void testProjectionGeneric() throws IOException {
GenericRecord savedRecord = new Record(Person.SCHEMA$);
savedRecord.put("name", "John Doe");
@@ -176,13 +181,129 @@ public class AvroParquetFileSourceTargetIT implements Serializable {
PCollection<Record> ageOnly = pipeline2.read(
new AvroFileSource<Record>(new Path(outputFile.getAbsolutePath()), Avros.generics(src.getProjectedSchema())));
- for (Record person : ageOnly.materialize()) {
- assertEquals(person.get(0), 42);
- Object notAge = person.get(1);
+ Record person = Iterables.getOnlyElement(ageOnly.materialize());
+ assertEquals(person.get(0), 42);
+ try {
+ person.get(1);
+ fail("Trying to get field outside of projection should fail");
+ } catch (IndexOutOfBoundsException e) {
+ // Expected
}
}
@Test
+ public void testCustomReadSchema_FieldSubset() throws IOException {
+ Schema readSchema = SchemaBuilder.record("PersonSubset")
+ .namespace("org.apache.crunch.test")
+ .fields()
+ .optionalString("name")
+ .endRecord();
+ GenericRecord savedRecord = new Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<GenericRecord> genericCollection = pipeline.read(
+ AvroParquetFileSource.builder(readSchema)
+ .includeField("name")
+ .build(new Path(avroFile.getAbsolutePath())));
+
+ File outputFile = tmpDir.getFile("output");
+ Target avroFile = To.avroFile(outputFile.getAbsolutePath());
+ genericCollection.write(avroFile);
+ pipeline.done();
+
+ Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+ tmpDir.getDefaultConfiguration());
+ PCollection<GenericData.Record> namedPersonRecords = pipeline2.read(
+ From.avroFile(new Path(outputFile.getAbsolutePath())));
+
+ GenericRecord personSubset = Iterables.getOnlyElement(namedPersonRecords.materialize());
+
+ assertEquals(readSchema, personSubset.getSchema());
+ assertEquals(new Utf8("John Doe"), personSubset.get("name"));
+ }
+
+ @Test
+ public void testCustomReadSchemaGeneric_FieldSuperset() throws IOException {
+ Schema readSchema = SchemaBuilder.record("PersonSuperset")
+ .namespace("org.apache.crunch.test")
+ .fields()
+ .optionalString("name")
+ .optionalInt("age")
+ .name("siblingnames").type(Person.SCHEMA$.getField("siblingnames").schema()).withDefault(null)
+ .name("employer").type().stringType().stringDefault("Acme Corp")
+ .endRecord();
+ GenericRecord savedRecord = new Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<GenericRecord> genericCollection = pipeline.read(
+ AvroParquetFileSource.builder(readSchema)
+ .build(new Path(avroFile.getAbsolutePath())));
+
+ File outputFile = tmpDir.getFile("output");
+ Target avroFile = To.avroFile(outputFile.getAbsolutePath());
+ genericCollection.write(avroFile);
+ pipeline.done();
+
+ Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+ tmpDir.getDefaultConfiguration());
+ PCollection<GenericData.Record> namedPersonRecords = pipeline2.read(
+ From.avroFile(new Path(outputFile.getAbsolutePath())));
+
+ GenericRecord personSuperset = Iterables.getOnlyElement(namedPersonRecords.materialize());
+
+ assertEquals(readSchema, personSuperset.getSchema());
+ assertEquals(new Utf8("John Doe"), personSuperset.get("name"));
+ assertEquals(42, personSuperset.get("age"));
+ assertEquals(Lists.newArrayList(new Utf8("Jimmy"), new Utf8("Jane")), personSuperset.get("siblingnames"));
+ assertEquals(new Utf8("Acme Corp"), personSuperset.get("employer"));
+ }
+
+ @Test
+ public void testCustomReadSchemaWithProjection() throws IOException {
+ Schema readSchema = SchemaBuilder.record("PersonSubsetWithProjection")
+ .namespace("org.apache.crunch.test")
+ .fields()
+ .optionalString("name")
+ .optionalInt("age")
+ .endRecord();
+ GenericRecord savedRecord = new Record(Person.SCHEMA$);
+ savedRecord.put("name", "John Doe");
+ savedRecord.put("age", 42);
+ savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+ populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+ Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+ PCollection<GenericRecord> genericCollection = pipeline.read(
+ AvroParquetFileSource.builder(readSchema)
+ .includeField("name")
+ .build(new Path(avroFile.getAbsolutePath())));
+
+ File outputFile = tmpDir.getFile("output");
+ Target avroFile = To.avroFile(outputFile.getAbsolutePath());
+ genericCollection.write(avroFile);
+ pipeline.done();
+
+ Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+ tmpDir.getDefaultConfiguration());
+ PCollection<GenericData.Record> namedPersonRecords = pipeline2.read(
+ From.avroFile(new Path(outputFile.getAbsolutePath())));
+
+ GenericRecord personSubset = Iterables.getOnlyElement(namedPersonRecords.materialize());
+
+ assertEquals(readSchema, personSubset.getSchema());
+ assertEquals(new Utf8("John Doe"), personSubset.get("name"));
+ assertNull(personSubset.get("age"));
+ }
+
+ @Test
public void testProjectionFiltered() throws IOException {
GenericRecord savedRecord = new Record(Person.SCHEMA$);
savedRecord.put("name", "John Doe");
http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
index 41e0d8e..ffca414 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
@@ -38,29 +38,37 @@ import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
import parquet.avro.AvroParquetInputFormat;
import parquet.avro.AvroReadSupport;
import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.ParquetInputSplit;
public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceImpl<T> implements ReadableSource<T> {
+ private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
+
private final String projSchema;
private static <S> FormatBundle<AvroParquetInputFormat> getBundle(
AvroType<S> ptype,
- Schema extSchema,
+ Schema projSchema,
Class<? extends UnboundRecordFilter> filterClass) {
- Schema schema = extSchema == null ? ptype.getSchema() : extSchema;
- // Need to check that all fields are accounted for in ptype schema...
FormatBundle<AvroParquetInputFormat> fb = FormatBundle.forInput(AvroParquetInputFormat.class)
- .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, schema.toString())
- // ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
- // doesn't work with CombineFileInputFormat
- .set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+ .set(AVRO_READ_SCHEMA, ptype.getSchema().toString());
+
+ if (projSchema != null) {
+ fb.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, projSchema.toString());
+ }
if (filterClass != null) {
fb.set("parquet.read.filter", filterClass.getName());
}
+ if (!FileSplit.class.isAssignableFrom(ParquetInputSplit.class)) {
+ // Older ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
+ // doesn't work with CombineFileInputFormat
+ fb.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+ }
return fb;
}
@@ -68,16 +76,32 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm
this(ImmutableList.of(path), ptype);
}
- public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema schema) {
- this(ImmutableList.of(path), ptype, schema);
+ /**
+ * Read the Parquet data at the given path using the schema of the {@code AvroType}, and projecting
+ * a subset of the columns from this schema via the separately given {@code Schema}.
+ *
+ * @param path the path of the file to read
+ * @param ptype the AvroType to use in reading the file
+ * @param projSchema the subset of columns from the input schema to read
+ */
+ public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema projSchema) {
+ this(ImmutableList.of(path), ptype, projSchema);
}
public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) {
this(paths, ptype, null, null);
}
-
- public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema) {
- this(paths, ptype, schema, null);
+
+ /**
+ * Read the Parquet data at the given paths using the schema of the {@code AvroType}, and projecting
+ * a subset of the columns from this schema via the separately given {@code Schema}.
+ *
+ * @param paths the list of paths to read
+ * @param ptype the AvroType to use in reading the file
+ * @param projSchema the subset of columns from the input schema to read
+ */
+ public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema projSchema) {
+ this(paths, ptype, projSchema, null);
}
public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype,
@@ -85,10 +109,19 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm
this(paths, ptype, null, filterClass);
}
- public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema,
+ /**
+ * Read the Parquet data at the given paths using the schema of the {@code AvroType}, projecting
+ * a subset of the columns from this schema via the separately given {@code Schema}, and using
+ * the filter class to select the input records.
+ *
+ * @param paths the list of paths to read
+ * @param ptype the AvroType to use in reading the file
+ * @param projSchema the subset of columns from the input schema to read
+ */
+ public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema projSchema,
Class<? extends UnboundRecordFilter> filterClass) {
- super(paths, ptype, getBundle(ptype, schema, filterClass));
- projSchema = schema == null ? null : schema.toString();
+ super(paths, ptype, getBundle(ptype, projSchema, filterClass));
+ this.projSchema = projSchema == null ? null : projSchema.toString();
}
public Schema getProjectedSchema() {
@@ -181,4 +214,4 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 569c2e3..1237347 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@ under the License.
<commons-cli.version>1.2</commons-cli.version>
<avro.version>1.7.7</avro.version>
<hive.version>0.13.1</hive.version>
- <parquet.version>1.3.2</parquet.version>
+ <parquet.version>1.4.3</parquet.version>
<javassist.version>3.16.1-GA</javassist.version>
<jackson.version>1.8.8</jackson.version>
<protobuf-java.version>2.5.0</protobuf-java.version>