You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by al...@apache.org on 2015/03/12 22:25:19 UTC

incubator-parquet-mr git commit: PARQUET-215 Discard records with unrecognized union members in the thrift write path

Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master b58789c5b -> 77826fda8


PARQUET-215 Discard records with unrecognized union members in the thrift write path

Fixes PARQUET-215 and adds a test case for it. Also fixes some existing tests that previously passed without exercising the code they were intended to cover: they caught exceptions and made assertions about them, but never enforced that the exception was actually thrown (and in one case, it never was).

Author: Alex Levenson <al...@twitter.com>

Closes #146 from isnotinvain/alexlevenson/unrecognized-union and squashes the following commits:

7bec4a6 [Alex Levenson] Add license header
b0d8e6c [Alex Levenson] Merge branch 'master' into alexlevenson/unrecognized-union
e152bc8 [Alex Levenson] Update comment
97232b7 [Alex Levenson] Address comments
c542199 [Alex Levenson] Go back to using boolean for isUnion
2e18dbd [Alex Levenson] Remove exclusion
0a60c46 [Alex Levenson] Support isUnion being unknown
b0dfdf9 [Alex Levenson] Fix tests
68940d7 [Alex Levenson] Discard records with unrecognized union members in the thrift write path


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/77826fda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/77826fda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/77826fda

Branch: refs/heads/master
Commit: 77826fda8751bd5c0acbca5d0c3887e9a6b10f65
Parents: b58789c
Author: Alex Levenson <al...@twitter.com>
Authored: Thu Mar 12 14:25:13 2015 -0700
Committer: Alex Levenson <al...@twitter.com>
Committed: Thu Mar 12 14:25:13 2015 -0700

----------------------------------------------------------------------
 .../parquet/scrooge/ScroogeStructConverter.java |  7 +-
 .../thrift/BufferedProtocolReadToWrite.java     | 29 +++++++--
 .../parquet/thrift/ThriftSchemaConverter.java   |  9 ++-
 .../java/parquet/thrift/struct/ThriftType.java  | 28 +++++++-
 .../parquet/thrift/TestProtocolReadToWrite.java | 45 ++++++++++++-
 .../parquet/thrift/struct/TestThriftType.java   | 67 ++++++++++++++++++++
 parquet-thrift/src/test/thrift/compat.thrift    | 33 ++++++++++
 7 files changed, 207 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/77826fda/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java b/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java
index 22bbf6a..c5bb72f 100644
--- a/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java
@@ -22,6 +22,7 @@ import com.twitter.scrooge.ThriftStructCodec;
 import com.twitter.scrooge.ThriftStructFieldInfo;
 import parquet.thrift.struct.ThriftField;
 import parquet.thrift.struct.ThriftType;
+import parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
 import parquet.thrift.struct.ThriftTypeID;
 import scala.collection.JavaConversions;
 import scala.collection.JavaConversions$;
@@ -80,7 +81,11 @@ public class ScroogeStructConverter {
     for (ThriftStructFieldInfo field : scroogeFields) {
       children.add(toThriftField(field));
     }
-    return new ThriftType.StructType(children);
+
+    StructOrUnionType structOrUnionType =
+        isUnion(companionObject.getClass()) ? StructOrUnionType.UNION : StructOrUnionType.STRUCT;
+
+    return new ThriftType.StructType(children, structOrUnionType);
   }
 
   private Iterable<ThriftStructFieldInfo> getFieldInfos(ThriftStructCodec c) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/77826fda/parquet-thrift/src/main/java/parquet/thrift/BufferedProtocolReadToWrite.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/BufferedProtocolReadToWrite.java b/parquet-thrift/src/main/java/parquet/thrift/BufferedProtocolReadToWrite.java
index 75d64bc..839f4d3 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/BufferedProtocolReadToWrite.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/BufferedProtocolReadToWrite.java
@@ -20,6 +20,8 @@ package parquet.thrift;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.*;
+
+import parquet.ParquetRuntimeException;
 import parquet.thrift.struct.ThriftField;
 import parquet.thrift.struct.ThriftType;
 import parquet.thrift.struct.ThriftType.ListType;
@@ -358,11 +360,28 @@ public class BufferedProtocolReadToWrite implements ProtocolPipe {
       final TField currentField = field;
       ThriftField expectedField;
       if ((expectedField = type.getChildById(field.id)) == null) {
-        notifyIgnoredFieldsOfRecord(field);
-        hasFieldsIgnored |= true;
-        //read the value and ignore it, NullProtocol will do nothing
-        new ProtocolReadToWrite().readOneValue(in, new NullProtocol(), field.type);
-        continue;
+
+        switch (type.getStructOrUnionType()) {
+          case STRUCT:
+            // this is an unrecognized field in a struct, not a union
+            notifyIgnoredFieldsOfRecord(field);
+            hasFieldsIgnored |= true;
+            //read the value and ignore it, NullProtocol will do nothing
+            new ProtocolReadToWrite().readOneValue(in, new NullProtocol(), field.type);
+            continue;
+          case UNION:
+            // this is a union with an unrecognized member -- this is fatal for this record
+            // in the write path, because it will be unreadable in the read path.
+            // throwing here means we will either skip this record entirely, or fail completely.
+            throw new DecodingSchemaMismatchException("Unrecognized union member with id: "
+                + field.id + " for struct:\n" + type);
+          case UNKNOWN:
+            // we should never reach here in the write path -- this only happens if the
+            // deprecated constructor of StructType is used, which should only be used in the
+            // read path.
+            throw new ParquetRuntimeException("This should never happen! "
+                + "Don't know if this field is a union, was the deprecated constructor of StructType used?\n" + type){};
+        }
       }
       buffer.add(new Action() {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/77826fda/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java b/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java
index c03f93e..735de85 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/ThriftSchemaConverter.java
@@ -22,6 +22,8 @@ import com.twitter.elephantbird.thrift.TStructDescriptor;
 import com.twitter.elephantbird.thrift.TStructDescriptor.Field;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TEnum;
+import org.apache.thrift.TUnion;
+
 import parquet.schema.MessageType;
 import parquet.thrift.projection.FieldProjectionFilter;
 import parquet.thrift.projection.PathGlobPattern;
@@ -30,6 +32,7 @@ import parquet.thrift.struct.ThriftField;
 import parquet.thrift.struct.ThriftField.Requirement;
 import parquet.thrift.struct.ThriftType;
 import parquet.thrift.struct.ThriftType.*;
+import parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
 import parquet.thrift.struct.ThriftTypeID;
 
 import java.util.ArrayList;
@@ -44,6 +47,10 @@ public class ThriftSchemaConverter {
 
   private final FieldProjectionFilter fieldProjectionFilter;
 
+  public static <T extends TBase<?,?>> StructOrUnionType structOrUnionType(Class<T> klass) {
+    return TUnion.class.isAssignableFrom(klass) ? StructOrUnionType.UNION : StructOrUnionType.STRUCT;
+  }
+
   public ThriftSchemaConverter() {
     this(new FieldProjectionFilter());
   }
@@ -93,7 +100,7 @@ public class ThriftSchemaConverter {
                         Requirement.fromType(field.getFieldMetaData().requirementType);
         children.add(toThriftField(field.getName(), field, req));
       }
-      return new StructType(children);
+      return new StructType(children, structOrUnionType(struct.getThriftClass()));
     }
 
     private ThriftField toThriftField(String name, Field field, ThriftField.Requirement requirement) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/77826fda/parquet-thrift/src/main/java/parquet/thrift/struct/ThriftType.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/parquet/thrift/struct/ThriftType.java b/parquet-thrift/src/main/java/parquet/thrift/struct/ThriftType.java
index 8a0f79c..7117bb1 100644
--- a/parquet-thrift/src/main/java/parquet/thrift/struct/ThriftType.java
+++ b/parquet-thrift/src/main/java/parquet/thrift/struct/ThriftType.java
@@ -175,9 +175,30 @@ public abstract class ThriftType {
 
     private final ThriftField[] childById;
 
+  /**
+   * Whether a struct is a union or a regular struct is not always known, because it was not always
+   * written to the metadata files.
+   *
+   * We should always know this in the write path, but may not in the read path.
+   */
+   public enum StructOrUnionType {
+      STRUCT,
+      UNION,
+      UNKNOWN
+   }
+
+    private final StructOrUnionType structOrUnionType;
+
+    @Deprecated
+    public StructType(List<ThriftField> children) {
+      this(children, null);
+    }
+
     @JsonCreator
-    public StructType(@JsonProperty("children") List<ThriftField> children) {
+    public StructType(@JsonProperty("children") List<ThriftField> children,
+                      @JsonProperty("structOrUnionType") StructOrUnionType structOrUnionType) {
       super(STRUCT);
+      this.structOrUnionType = structOrUnionType == null ? StructOrUnionType.UNKNOWN : structOrUnionType;
       this.children = children;
       int maxId = 0;
       if (children != null) {
@@ -206,6 +227,11 @@ public abstract class ThriftType {
       }
     }
 
+    @JsonProperty("structOrUnionType")
+    public StructOrUnionType getStructOrUnionType() {
+      return structOrUnionType;
+    }
+
     @Override
     public void accept(TypeVisitor visitor) {
       visitor.visit(this);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/77826fda/parquet-thrift/src/test/java/parquet/thrift/TestProtocolReadToWrite.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/parquet/thrift/TestProtocolReadToWrite.java b/parquet-thrift/src/test/java/parquet/thrift/TestProtocolReadToWrite.java
index 6a3aee4..e1d9070 100644
--- a/parquet-thrift/src/test/java/parquet/thrift/TestProtocolReadToWrite.java
+++ b/parquet-thrift/src/test/java/parquet/thrift/TestProtocolReadToWrite.java
@@ -41,6 +41,7 @@ import java.util.*;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class TestProtocolReadToWrite {
 
@@ -118,7 +119,8 @@ public class TestProtocolReadToWrite {
             ByteBuffer.wrap("a".getBytes()), new ArrayList<Byte>(), new ArrayList<Short>(), new ArrayList<Long>());
     a.write(protocol(in));
     try {
-    p.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));
+      p.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));
+      fail("this should throw");
     } catch (SkippableException e) {
       Throwable cause = e.getCause();
       assertTrue(cause instanceof DecodingSchemaMismatchException);
@@ -129,6 +131,36 @@ public class TestProtocolReadToWrite {
     assertEquals(0, countingHandler.fieldIgnoredCount);
   }
 
+  @Test
+  public void testUnrecognizedUnionMemberSchema() throws Exception {
+    CountingErrorHandler countingHandler = new CountingErrorHandler();
+    BufferedProtocolReadToWrite p = new BufferedProtocolReadToWrite(new ThriftSchemaConverter().toStructType(StructWithUnionV1.class), countingHandler);
+    final ByteArrayOutputStream in = new ByteArrayOutputStream();
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+    StructWithUnionV1 validUnion = new StructWithUnionV1("a valid struct", UnionV1.aLong(new ALong(17L)));
+    StructWithUnionV2 invalidUnion = new StructWithUnionV2("a struct with new union member",
+        UnionV2.aNewBool(new ABool(true)));
+
+    validUnion.write(protocol(in));
+    invalidUnion.write(protocol(in));
+
+    ByteArrayInputStream baos = new ByteArrayInputStream(in.toByteArray());
+
+    // first one should not throw
+    p.readOne(protocol(baos), protocol(out));
+
+    try {
+      p.readOne(protocol(baos), protocol(out));
+      fail("this should throw");
+    } catch (SkippableException e) {
+      Throwable cause = e.getCause();
+      assertEquals(DecodingSchemaMismatchException.class, cause.getClass());
+      assertTrue(cause.getMessage().startsWith("Unrecognized union member with id: 3 for struct:"));
+    }
+    assertEquals(0, countingHandler.recordCountOfMissingFields);
+    assertEquals(0, countingHandler.fieldIgnoredCount);
+  }
+
   /**
    * When enum value in data has an undefined index, it's considered as corrupted record and will be skipped.
    *
@@ -144,11 +176,18 @@ public class TestProtocolReadToWrite {
     StructWithMoreEnum extraEnumDefinedInNewDefinition = new StructWithMoreEnum(NumberEnumWithMoreValue.FOUR);
     enumDefinedInOldDefinition.write(protocol(in));
     extraEnumDefinedInNewDefinition.write(protocol(in));
+
+    ByteArrayInputStream baos = new ByteArrayInputStream(in.toByteArray());
+
+    // first should not throw
+    p.readOne(protocol(baos), protocol(out));
+
     try {
-      p.readOne(protocol(new ByteArrayInputStream(in.toByteArray())), protocol(out));
+      p.readOne(protocol(baos), protocol(out));
+      fail("this should throw");
     } catch (SkippableException e) {
       Throwable cause = e.getCause();
-      assertTrue(cause instanceof DecodingSchemaMismatchException);
+      assertEquals(DecodingSchemaMismatchException.class, cause.getClass());
       assertTrue(cause.getMessage().contains("can not find index 4 in enum"));
     }
     assertEquals(0, countingHandler.recordCountOfMissingFields);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/77826fda/parquet-thrift/src/test/java/parquet/thrift/struct/TestThriftType.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/parquet/thrift/struct/TestThriftType.java b/parquet-thrift/src/test/java/parquet/thrift/struct/TestThriftType.java
new file mode 100644
index 0000000..fee3ef8
--- /dev/null
+++ b/parquet-thrift/src/test/java/parquet/thrift/struct/TestThriftType.java
@@ -0,0 +1,67 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package parquet.thrift.struct;
+
+import java.util.LinkedList;
+
+import org.junit.Test;
+
+import parquet.thrift.struct.ThriftType.StructType;
+import parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestThriftType {
+
+  @Test
+  public void testWriteUnionInfo() throws Exception {
+    StructType st = new StructType(new LinkedList<ThriftField>(), null);
+    assertEquals("{\n"
+                +"  \"id\" : \"STRUCT\",\n"
+                +"  \"children\" : [ ],\n"
+                +"  \"structOrUnionType\" : \"UNKNOWN\"\n"
+                +"}", st.toJSON());
+
+    st = new StructType(new LinkedList<ThriftField>(), StructOrUnionType.UNION);
+    assertEquals("{\n"
+        +"  \"id\" : \"STRUCT\",\n"
+        +"  \"children\" : [ ],\n"
+        +"  \"structOrUnionType\" : \"UNION\"\n"
+        +"}", st.toJSON());
+
+    st = new StructType(new LinkedList<ThriftField>(), StructOrUnionType.STRUCT);
+    assertEquals("{\n"
+        +"  \"id\" : \"STRUCT\",\n"
+        +"  \"children\" : [ ],\n"
+        +"  \"structOrUnionType\" : \"STRUCT\"\n"
+        +"}", st.toJSON());
+  }
+
+  @Test
+  public void testParseUnionInfo() throws Exception {
+    StructType st = (StructType) StructType.fromJSON("{\"id\": \"STRUCT\", \"children\":[], \"structOrUnionType\": \"UNION\"}");
+    assertEquals(st.getStructOrUnionType(), StructOrUnionType.UNION);
+    st = (StructType) StructType.fromJSON("{\"id\": \"STRUCT\", \"children\":[], \"structOrUnionType\": \"STRUCT\"}");
+    assertEquals(st.getStructOrUnionType(), StructOrUnionType.STRUCT);
+    st = (StructType) StructType.fromJSON("{\"id\": \"STRUCT\", \"children\":[]}");
+    assertEquals(st.getStructOrUnionType(), StructOrUnionType.UNKNOWN);
+    st = (StructType) StructType.fromJSON("{\"id\": \"STRUCT\", \"children\":[], \"structOrUnionType\": \"UNKNOWN\"}");
+    assertEquals(st.getStructOrUnionType(), StructOrUnionType.UNKNOWN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/77826fda/parquet-thrift/src/test/thrift/compat.thrift
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/thrift/compat.thrift b/parquet-thrift/src/test/thrift/compat.thrift
index 574b4ad..ca4eb57 100644
--- a/parquet-thrift/src/test/thrift/compat.thrift
+++ b/parquet-thrift/src/test/thrift/compat.thrift
@@ -115,3 +115,36 @@ struct ListStructV1{
 struct ListStructV2{
   1: required list<StructV2> list1
 }
+
+struct AString {
+  1: required string s
+}
+
+struct ALong {
+  1: required i64 l
+}
+
+struct ABool {
+  1: required bool b
+}
+
+union UnionV1 {
+  1: AString aString,
+  2: ALong aLong
+}
+
+union UnionV2 {
+  1: AString aString,
+  2: ALong aLong,
+  3: ABool aNewBool
+}
+
+struct StructWithUnionV1 {  
+  1: required string name,
+  2: required UnionV1 aUnion
+}
+
+struct StructWithUnionV2 {  
+  1: required string name,
+  2: required UnionV2 aUnion
+}
\ No newline at end of file