You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/07/17 21:59:27 UTC

parquet-mr git commit: PARQUET-651: Improve Avro's isElementType check.

Repository: parquet-mr
Updated Branches:
  refs/heads/master a421d952e -> 626014eaf


PARQUET-651: Improve Avro's isElementType check.

The Avro implementation needs to check whether the read schema that is
passed by the user (or automatically converted from the file schema)
expects an extra 1-field layer to be returned, which matches the
previous behavior of Avro when reading a 3-level list. Before this
commit, the check was done by testing the structure of the expected list
element type against the repeated group's schema. If they matched, then
Avro assumed that the user expected an extra layer. However, for records
that happened to match (1-field records with a field named "element")
the check could be wrong and would cause exceptions later.

This commit updates the check to convert the file's element schema to
Avro and compare the compatibility of that schema with what was passed
by the user. This checks the entire tree from the element down and gets
the answer right based on the element and its children, not just the
field names on the element.

Author: Ryan Blue <bl...@apache.org>

Closes #352 from rdblue/PARQUET-651-improve-is-element-type-check and squashes the following commits:

ad9c1ee [Ryan Blue] PARQUET-651: Undo accidental default setting change.
1efa248 [Ryan Blue] PARQUET-651: Improve Avro's isElementType check.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/626014ea
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/626014ea
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/626014ea

Branch: refs/heads/master
Commit: 626014eaf093fc2e3b53f5ad00c425bc209e1428
Parents: a421d95
Author: Ryan Blue <bl...@apache.org>
Authored: Sun Jul 17 14:59:20 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Sun Jul 17 14:59:20 2016 -0700

----------------------------------------------------------------------
 .../parquet/avro/AvroRecordConverter.java       |  23 ++-
 .../parquet/avro/AvroSchemaConverter.java       |  15 ++
 .../parquet/avro/TestArrayCompatibility.java    | 148 ++++++++++++++++++-
 3 files changed, 179 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/626014ea/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 10bb29b..c0d6dc2 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -42,9 +42,11 @@ import org.apache.avro.AvroTypeException;
 import org.apache.avro.Conversion;
 import org.apache.avro.LogicalType;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.reflect.AvroIgnore;
 import org.apache.avro.reflect.AvroName;
+import org.apache.avro.reflect.AvroSchema;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.Stringable;
 import org.apache.avro.specific.SpecificData;
@@ -52,6 +54,7 @@ import org.apache.avro.util.ClassUtils;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.avro.AvroConverters.FieldStringConverter;
 import org.apache.parquet.avro.AvroConverters.FieldStringableConverter;
+import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator;
 import org.apache.parquet.io.InvalidRecordException;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
@@ -59,6 +62,8 @@ import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
 
+import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility;
 import static org.apache.parquet.schema.Type.Repetition.REPEATED;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 
@@ -827,6 +832,14 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
     }
   }
 
+  // Converter used to test whether a requested schema is a 2-level schema.
+  // It converts the file's repeated type assuming that the file uses 2-level
+  // lists, and the result is checked for compatibility with the requested
+  // element type. It always converts assuming 2-level lists because 2-level
+  // and 3-level lists can't be mixed.
+  private static final AvroSchemaConverter CONVERTER =
+      new AvroSchemaConverter(true);
+
   /**
    * Returns whether the given type is the element type of a list or is a
    * synthetic group with one field that is the element type. This is
@@ -849,13 +862,11 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
       return true;
     } else if (elementSchema != null &&
         elementSchema.getType() == Schema.Type.RECORD) {
-      Set<String> fieldNames = new HashSet<String>();
-      for (Schema.Field field : elementSchema.getFields()) {
-        fieldNames.add(field.name());
+      Schema schemaFromRepeated = CONVERTER.convert(repeatedType.asGroupType());
+      if (checkReaderWriterCompatibility(elementSchema, schemaFromRepeated)
+          .getType() == COMPATIBLE) {
+        return true;
       }
-      // The repeated type must be the element type because it matches the
-      // structure of the Avro element's schema.
-      return fieldNames.contains(repeatedType.asGroupType().getFieldName(0));
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/626014ea/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
index 6b9b94c..70b6525 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -64,6 +64,17 @@ public class AvroSchemaConverter {
     this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
   }
 
+  /**
+   * Constructor used by {@link AvroRecordConverter#isElementType}, which always
+   * uses the 2-level list conversion.
+   *
+   * @param assumeRepeatedIsListElement whether to assume 2-level lists
+   */
+  AvroSchemaConverter(boolean assumeRepeatedIsListElement) {
+    this.assumeRepeatedIsListElement = assumeRepeatedIsListElement;
+    this.writeOldListStructure = WRITE_OLD_LIST_STRUCTURE_DEFAULT;
+  }
+
   public AvroSchemaConverter(Configuration conf) {
     this.assumeRepeatedIsListElement = conf.getBoolean(
         ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
@@ -220,6 +231,10 @@ public class AvroSchemaConverter {
     return convertFields(parquetSchema.getName(), parquetSchema.getFields());
   }
 
+  Schema convert(GroupType parquetSchema) {
+    return convertFields(parquetSchema.getName(), parquetSchema.getFields());
+  }
+
   private Schema convertFields(String name, List<Type> parquetFields) {
     List<Schema.Field> fields = new ArrayList<Schema.Field>();
     for (Type parquetType : parquetFields) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/626014ea/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
index 29264f0..aa577ab 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -27,6 +27,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -44,10 +46,13 @@ import static org.apache.parquet.avro.AvroTestUtil.record;
 
 public class TestArrayCompatibility extends DirectWriterTest {
 
+  public static final Configuration OLD_BEHAVIOR_CONF = new Configuration();
   public static final Configuration NEW_BEHAVIOR_CONF = new Configuration();
 
   @BeforeClass
   public static void setupNewBehaviorConfiguration() {
+    OLD_BEHAVIOR_CONF.setBoolean(
+        AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, true);
     NEW_BEHAVIOR_CONF.setBoolean(
         AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
   }
@@ -1035,9 +1040,143 @@ public class TestArrayCompatibility extends DirectWriterTest {
     assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
   }
 
+  @Test
+  public void testListOfSingleElementStructsWithElementField()
+      throws Exception {
+    Path test = writeDirect(
+        "message ListOfSingleElementStructsWithElementField {" +
+            "  optional group list_of_structs (LIST) {" +
+            "    repeated group list {" +
+            "      required group element {" +
+            "        required float element;" +
+            "      }" +
+            "    }" +
+            "  }" +
+            "}",
+        new DirectWriter() {
+          @Override
+          public void write(RecordConsumer rc) {
+            rc.startMessage();
+            rc.startField("list_of_structs", 0);
+
+            rc.startGroup();
+            rc.startField("list", 0); // start writing array contents
+
+            // write a non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            // the inner element field
+            rc.startGroup();
+            rc.startField("element", 0);
+            rc.addFloat(33.0F);
+            rc.endField("element", 0);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            // write a second non-null element
+            rc.startGroup(); // array level
+            rc.startField("element", 0);
+
+            // the inner element field
+            rc.startGroup();
+            rc.startField("element", 0);
+            rc.addFloat(34.0F);
+            rc.endField("element", 0);
+            rc.endGroup();
+
+            rc.endField("element", 0);
+            rc.endGroup(); // array level
+
+            rc.endField("list", 0); // finished writing array contents
+            rc.endGroup();
+
+            rc.endField("list_of_structs", 0);
+            rc.endMessage();
+          }
+        });
+
+    Schema structWithElementField = record("element",
+        field("element", primitive(Schema.Type.FLOAT)));
+
+    // old behavior - assume that the repeated type is the element type
+    Schema elementRecord = record("list",
+        field("element", structWithElementField));
+    Schema oldSchema = record("ListOfSingleElementStructsWithElementField",
+        optionalField("list_of_structs", array(elementRecord)));
+    GenericRecord oldRecord = instance(oldSchema,
+        "list_of_structs", Arrays.asList(
+            instance(elementRecord, "element",
+                instance(structWithElementField, "element", 33.0F)),
+            instance(elementRecord, "element",
+                instance(structWithElementField, "element", 34.0F))));
+
+    // check the schema
+    ParquetFileReader reader = ParquetFileReader
+        .open(new Configuration(), test);
+    MessageType fileSchema = reader.getFileMetaData().getSchema();
+    Assert.assertEquals("Converted schema should assume 2-layer structure",
+        oldSchema,
+        new AvroSchemaConverter(OLD_BEHAVIOR_CONF).convert(fileSchema));
+
+    // the old behavior reader should return the 2-layer structure
+    assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord);
+
+    Schema newSchema = record("ListOfSingleElementStructsWithElementField",
+        optionalField("list_of_structs", array(structWithElementField)));
+    GenericRecord newRecord = instance(newSchema,
+        "list_of_structs", Arrays.asList(
+            instance(structWithElementField, "element", 33.0F),
+            instance(structWithElementField, "element", 34.0F)));
+
+    // check the schema
+    Assert.assertEquals("Converted schema should assume 3-layer structure",
+        newSchema,
+        new AvroSchemaConverter(NEW_BEHAVIOR_CONF).convert(fileSchema));
+    assertReaderContains(newBehaviorReader(test), newSchema, newRecord);
+
+    // check that this works with compatible nested schemas
+
+    Schema structWithDoubleElementField = record("element",
+        field("element", primitive(Schema.Type.DOUBLE)));
+
+    Schema doubleElementRecord = record("list",
+        field("element", structWithDoubleElementField));
+    Schema oldDoubleSchema = record(
+        "ListOfSingleElementStructsWithElementField",
+        optionalField("list_of_structs", array(doubleElementRecord)));
+    GenericRecord oldDoubleRecord = instance(oldDoubleSchema,
+        "list_of_structs", Arrays.asList(
+            instance(doubleElementRecord, "element",
+                instance(structWithDoubleElementField, "element", 33.0)),
+            instance(doubleElementRecord, "element",
+                instance(structWithDoubleElementField, "element", 34.0))));
+    assertReaderContains(oldBehaviorReader(test, oldDoubleSchema),
+        oldDoubleSchema, oldDoubleRecord);
+
+    Schema newDoubleSchema = record(
+        "ListOfSingleElementStructsWithElementField",
+        optionalField("list_of_structs", array(structWithDoubleElementField)));
+    GenericRecord newDoubleRecord = instance(newDoubleSchema,
+        "list_of_structs", Arrays.asList(
+            instance(structWithDoubleElementField, "element", 33.0),
+            instance(structWithDoubleElementField, "element", 34.0)));
+    assertReaderContains(newBehaviorReader(test, newDoubleSchema),
+        newDoubleSchema, newDoubleRecord);
+  }
+
   public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
       Path path) throws IOException {
-    return new AvroParquetReader<T>(path);
+    return new AvroParquetReader<T>(OLD_BEHAVIOR_CONF, path);
+  }
+
+  public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(
+      Path path, Schema expectedSchema) throws IOException {
+    Configuration conf = new Configuration(OLD_BEHAVIOR_CONF);
+    AvroReadSupport.setAvroReadSchema(conf, expectedSchema);
+    return new AvroParquetReader<T>(conf, path);
   }
 
   public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
@@ -1045,6 +1184,13 @@ public class TestArrayCompatibility extends DirectWriterTest {
     return new AvroParquetReader<T>(NEW_BEHAVIOR_CONF, path);
   }
 
+  public <T extends IndexedRecord> AvroParquetReader<T> newBehaviorReader(
+      Path path, Schema expectedSchema) throws IOException {
+    Configuration conf = new Configuration(NEW_BEHAVIOR_CONF);
+    AvroReadSupport.setAvroReadSchema(conf, expectedSchema);
+    return new AvroParquetReader<T>(conf, path);
+  }
+
   public <T extends IndexedRecord> void assertReaderContains(
       AvroParquetReader<T> reader, Schema expectedSchema, T... expectedRecords)
       throws IOException {