You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/11/12 20:46:35 UTC

crunch git commit: CRUNCH-480: Differentiate between read/projection schemas and selectively enable/disable the combine file format by default

Repository: crunch
Updated Branches:
  refs/heads/master 253326148 -> 2c7821fd3


CRUNCH-480: Differentiate between read/projection schemas and selectively enable/disable the combine file format by default


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2c7821fd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2c7821fd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2c7821fd

Branch: refs/heads/master
Commit: 2c7821fd353dd1618af112d27cbc3a936142d61c
Parents: 2533261
Author: Josh Wills <jw...@apache.org>
Authored: Wed Nov 12 11:08:13 2014 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Nov 12 11:08:13 2014 -0800

----------------------------------------------------------------------
 .../parquet/AvroParquetFileSourceTargetIT.java  | 139 +++++++++++++++++--
 .../io/parquet/AvroParquetFileSource.java       |  65 ++++++---
 pom.xml                                         |   2 +-
 3 files changed, 180 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
index 5c7d9e0..d75d9da 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/parquet/AvroParquetFileSourceTargetIT.java
@@ -20,20 +20,26 @@ package org.apache.crunch.io.parquet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 
+import com.google.common.collect.Iterables;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.Target;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
 import org.apache.crunch.io.To;
 import org.apache.crunch.io.avro.AvroFileSource;
 import org.apache.crunch.test.Person;
@@ -145,14 +151,13 @@ public class AvroParquetFileSourceTargetIT implements Serializable {
     PCollection<Person> ageOnly = pipeline2.read(
         new AvroFileSource<Person>(new Path(outputFile.getAbsolutePath()), Avros.specifics(Person.class)));
 
-    for (Person person : ageOnly.materialize()) {
-      assertNull(person.getName());
-      assertEquals(person.getAge(), new Integer(42));
-      assertNull(person.getSiblingnames());
-    }
+    Person person = Iterables.getOnlyElement(ageOnly.materialize());
+    assertNull(person.getName());
+    assertEquals(person.getAge(), new Integer(42));
+    assertNull(person.getSiblingnames());
   }
 
-  @Test(expected = IndexOutOfBoundsException.class)
+  @Test
   public void testProjectionGeneric() throws IOException {
     GenericRecord savedRecord = new Record(Person.SCHEMA$);
     savedRecord.put("name", "John Doe");
@@ -176,13 +181,129 @@ public class AvroParquetFileSourceTargetIT implements Serializable {
     PCollection<Record> ageOnly = pipeline2.read(
         new AvroFileSource<Record>(new Path(outputFile.getAbsolutePath()), Avros.generics(src.getProjectedSchema())));
 
-    for (Record person : ageOnly.materialize()) {
-      assertEquals(person.get(0), 42);
-      Object notAge = person.get(1);
+    Record person = Iterables.getOnlyElement(ageOnly.materialize());
+    assertEquals(person.get(0), 42);
+    try {
+      person.get(1);
+      fail("Trying to get field outside of projection should fail");
+    } catch (IndexOutOfBoundsException e) {
+      // Expected
     }
   }
 
   @Test
+  public void testCustomReadSchema_FieldSubset() throws IOException {
+    Schema readSchema = SchemaBuilder.record("PersonSubset")
+        .namespace("org.apache.crunch.test")
+        .fields()
+        .optionalString("name")
+        .endRecord();
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<GenericRecord> genericCollection = pipeline.read(
+        AvroParquetFileSource.builder(readSchema)
+            .includeField("name")
+            .build(new Path(avroFile.getAbsolutePath())));
+
+    File outputFile = tmpDir.getFile("output");
+    Target avroFile = To.avroFile(outputFile.getAbsolutePath());
+    genericCollection.write(avroFile);
+    pipeline.done();
+
+    Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+        tmpDir.getDefaultConfiguration());
+    PCollection<GenericData.Record> namedPersonRecords = pipeline2.read(
+        From.avroFile(new Path(outputFile.getAbsolutePath())));
+
+    GenericRecord personSubset = Iterables.getOnlyElement(namedPersonRecords.materialize());
+
+    assertEquals(readSchema, personSubset.getSchema());
+    assertEquals(new Utf8("John Doe"), personSubset.get("name"));
+  }
+
+  @Test
+  public void testCustomReadSchemaGeneric_FieldSuperset() throws IOException {
+    Schema readSchema = SchemaBuilder.record("PersonSuperset")
+        .namespace("org.apache.crunch.test")
+        .fields()
+        .optionalString("name")
+        .optionalInt("age")
+        .name("siblingnames").type(Person.SCHEMA$.getField("siblingnames").schema()).withDefault(null)
+        .name("employer").type().stringType().stringDefault("Acme Corp")
+        .endRecord();
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<GenericRecord> genericCollection = pipeline.read(
+        AvroParquetFileSource.builder(readSchema)
+            .build(new Path(avroFile.getAbsolutePath())));
+
+    File outputFile = tmpDir.getFile("output");
+    Target avroFile = To.avroFile(outputFile.getAbsolutePath());
+    genericCollection.write(avroFile);
+    pipeline.done();
+
+    Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+        tmpDir.getDefaultConfiguration());
+    PCollection<GenericData.Record> namedPersonRecords = pipeline2.read(
+        From.avroFile(new Path(outputFile.getAbsolutePath())));
+
+    GenericRecord personSuperset = Iterables.getOnlyElement(namedPersonRecords.materialize());
+
+    assertEquals(readSchema, personSuperset.getSchema());
+    assertEquals(new Utf8("John Doe"), personSuperset.get("name"));
+    assertEquals(42, personSuperset.get("age"));
+    assertEquals(Lists.newArrayList(new Utf8("Jimmy"), new Utf8("Jane")), personSuperset.get("siblingnames"));
+    assertEquals(new Utf8("Acme Corp"), personSuperset.get("employer"));
+  }
+
+  @Test
+  public void testCustomReadSchemaWithProjection() throws IOException {
+    Schema readSchema = SchemaBuilder.record("PersonSubsetWithProjection")
+        .namespace("org.apache.crunch.test")
+        .fields()
+        .optionalString("name")
+        .optionalInt("age")
+        .endRecord();
+    GenericRecord savedRecord = new Record(Person.SCHEMA$);
+    savedRecord.put("name", "John Doe");
+    savedRecord.put("age", 42);
+    savedRecord.put("siblingnames", Lists.newArrayList("Jimmy", "Jane"));
+    populateGenericFile(Lists.newArrayList(savedRecord), Person.SCHEMA$);
+
+    Pipeline pipeline = new MRPipeline(AvroParquetFileSourceTargetIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<GenericRecord> genericCollection = pipeline.read(
+        AvroParquetFileSource.builder(readSchema)
+            .includeField("name")
+            .build(new Path(avroFile.getAbsolutePath())));
+
+    File outputFile = tmpDir.getFile("output");
+    Target avroFile = To.avroFile(outputFile.getAbsolutePath());
+    genericCollection.write(avroFile);
+    pipeline.done();
+
+    Pipeline pipeline2 = new MRPipeline(AvroParquetFileSourceTargetIT.class,
+        tmpDir.getDefaultConfiguration());
+    PCollection<GenericData.Record> namedPersonRecords = pipeline2.read(
+        From.avroFile(new Path(outputFile.getAbsolutePath())));
+
+    GenericRecord personSubset = Iterables.getOnlyElement(namedPersonRecords.materialize());
+
+    assertEquals(readSchema, personSubset.getSchema());
+    assertEquals(new Utf8("John Doe"), personSubset.get("name"));
+    assertNull(personSubset.get("age"));
+  }
+
+  @Test
   public void testProjectionFiltered() throws IOException {
     GenericRecord savedRecord = new Record(Person.SCHEMA$);
     savedRecord.put("name", "John Doe");

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
index 41e0d8e..ffca414 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/parquet/AvroParquetFileSource.java
@@ -38,29 +38,37 @@ import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 import parquet.avro.AvroParquetInputFormat;
 import parquet.avro.AvroReadSupport;
 import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.ParquetInputSplit;
 
 public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceImpl<T> implements ReadableSource<T> {
 
+  private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
+
   private final String projSchema;
 
   private static <S> FormatBundle<AvroParquetInputFormat> getBundle(
       AvroType<S> ptype,
-      Schema extSchema,
+      Schema projSchema,
       Class<? extends UnboundRecordFilter> filterClass) {
-    Schema schema = extSchema == null ? ptype.getSchema() : extSchema;
-    // Need to check that all fields are accounted for in ptype schema...
     FormatBundle<AvroParquetInputFormat> fb = FormatBundle.forInput(AvroParquetInputFormat.class)
-        .set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, schema.toString())
-        // ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
-        // doesn't work with CombineFileInputFormat
-        .set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+        .set(AVRO_READ_SCHEMA, ptype.getSchema().toString());
+
+    if (projSchema != null) {
+      fb.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, projSchema.toString());
+    }
     if (filterClass != null) {
       fb.set("parquet.read.filter", filterClass.getName());
     }
+    if (!FileSplit.class.isAssignableFrom(ParquetInputSplit.class)) {
+      // Older ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
+      // doesn't work with CombineFileInputFormat
+      fb.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
+    }
     return fb;
   }
 
@@ -68,16 +76,32 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm
     this(ImmutableList.of(path), ptype);
   }
 
-  public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema schema) {
-    this(ImmutableList.of(path), ptype, schema);
+  /**
+   * Read the Parquet data at the given path using the schema of the {@code AvroType}, and projecting
+   * a subset of the columns from this schema via the separately given {@code Schema}.
+   *
+   * @param path the path of the file to read
+   * @param ptype the AvroType to use in reading the file
+   * @param projSchema the subset of columns from the input schema to read
+   */
+  public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema projSchema) {
+    this(ImmutableList.of(path), ptype, projSchema);
   }
 
   public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) {
     this(paths, ptype, null, null);
   }
-  
-  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema) {
-    this(paths, ptype, schema, null);
+
+  /**
+   * Read the Parquet data at the given paths using the schema of the {@code AvroType}, and projecting
+   * a subset of the columns from this schema via the separately given {@code Schema}.
+   *
+   * @param paths the list of paths to read
+   * @param ptype the AvroType to use in reading the file
+   * @param projSchema the subset of columns from the input schema to read
+   */
+  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema projSchema) {
+    this(paths, ptype, projSchema, null);
   }
 
   public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype,
@@ -85,10 +109,19 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm
     this(paths, ptype, null, filterClass);
   }
 
-  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema schema,
+  /**
+   * Read the Parquet data at the given paths using the schema of the {@code AvroType}, projecting
+   * a subset of the columns from this schema via the separately given {@code Schema}, and using
+   * the filter class to select the input records.
+   *
+   * @param paths the list of paths to read
+   * @param ptype the AvroType to use in reading the file
+   * @param projSchema the subset of columns from the input schema to read
+   */
+  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema projSchema,
                                Class<? extends UnboundRecordFilter> filterClass) {
-    super(paths, ptype, getBundle(ptype, schema, filterClass));
-    projSchema = schema == null ? null : schema.toString();
+    super(paths, ptype, getBundle(ptype, projSchema, filterClass));
+    this.projSchema = projSchema == null ? null : projSchema.toString();
   }
 
   public Schema getProjectedSchema() {
@@ -181,4 +214,4 @@ public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceIm
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/2c7821fd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 569c2e3..1237347 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@ under the License.
     <commons-cli.version>1.2</commons-cli.version>
     <avro.version>1.7.7</avro.version>
     <hive.version>0.13.1</hive.version>
-    <parquet.version>1.3.2</parquet.version>
+    <parquet.version>1.4.3</parquet.version>
     <javassist.version>3.16.1-GA</javassist.version>
     <jackson.version>1.8.8</jackson.version>
     <protobuf-java.version>2.5.0</protobuf-java.version>