Posted to commits@parquet.apache.org by al...@apache.org on 2015/04/30 08:18:54 UTC

parquet-mr git commit: PARQUET-227 Enforce that unions have only 1 set value, tolerate bad records in read path

Repository: parquet-mr
Updated Branches:
  refs/heads/master b287d35fe -> 9993450ad


PARQUET-227 Enforce that unions have only 1 set value, tolerate bad records in read path

See https://issues.apache.org/jira/browse/PARQUET-227
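
For readers who want to try the new read-path tolerance, here is a minimal sketch
(using the parquet.read.bad.record.threshold key that this commit introduces in
UnmaterializableRecordCounter; by default no errors are tolerated):

    import org.apache.hadoop.conf.Configuration;

    public class BadRecordToleranceExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // tolerate up to 10% of records failing to materialize;
        // the default threshold is 0, so any bad record fails the read
        conf.setFloat("parquet.read.bad.record.threshold", 0.1f);
        // pass this conf to the job or reader that performs the read
      }
    }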

Author: Alex Levenson <al...@twitter.com>

Closes #153 from isnotinvain/alexlevenson/double-union and squashes the following commits:

ef4d36f [Alex Levenson] fix package names
e201deb [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
01694fa [Alex Levenson] Forgot a break in a switch statement
2f31321 [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
9292274 [Alex Levenson] Add in ShouldNeverHappenException which I forgot to check in
8d61515 [Alex Levenson] Address first round of comments
4d71bcb [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
8f9334c [Alex Levenson] Some cleanup and fixes
8153bc9 [Alex Levenson] Enforce that unions have only 1 set value, tolerate bad records in read path


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/9993450a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/9993450a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/9993450a

Branch: refs/heads/master
Commit: 9993450ad1f023e0e2b59291361d0b3b9f0e1c8d
Parents: b287d35
Author: Alex Levenson <al...@twitter.com>
Authored: Wed Apr 29 23:18:47 2015 -0700
Committer: Alex Levenson <al...@twitter.com>
Committed: Wed Apr 29 23:18:47 2015 -0700

----------------------------------------------------------------------
 .../parquet/io/api/RecordMaterializer.java      |  27 +++
 .../parquet/ShouldNeverHappenException.java     |  40 ++++
 .../hadoop/InternalParquetRecordReader.java     |  16 +-
 .../hadoop/UnmaterializableRecordCounter.java   |  87 ++++++++
 .../scrooge/TestCorruptScroogeRecords.java      |  69 ++++++
 parquet-scrooge/src/test/thrift/test.thrift     |  34 +++
 .../hadoop/thrift/ThriftReadSupport.java        |  15 ++
 .../thrift/BufferedProtocolReadToWrite.java     | 113 +++++++---
 .../apache/parquet/thrift/ParquetProtocol.java  |   4 +-
 .../parquet/thrift/ParquetReadProtocol.java     |   4 +
 .../parquet/thrift/ThriftRecordConverter.java   |   5 +-
 .../hadoop/thrift/TestCorruptThriftRecords.java | 213 +++++++++++++++++++
 .../hadoop/thrift/TestInputOutputFormat.java    |   2 +-
 .../parquet/thrift/TestProtocolReadToWrite.java |  60 ++++++
 parquet-thrift/src/test/thrift/compat.thrift    |  11 +
 15 files changed, 665 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
index 98e4d50..9aee114 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.io.api;
 
+import org.apache.parquet.io.ParquetDecodingException;
+
 /**
  * Top-level class which should be implemented in order to materialize objects from
  * a stream of Parquet data.
@@ -33,6 +35,7 @@ abstract public class RecordMaterializer<T> {
 
   /**
    * @return the result of the conversion
+   * @throws RecordMaterializationException to signal that a record cannot be materialized, but can be skipped
    */
   abstract public T getCurrentRecord();
 
@@ -45,4 +48,28 @@ abstract public class RecordMaterializer<T> {
    * @return the root converter for this tree
    */
   abstract public GroupConverter getRootConverter();
+
+  /**
+   * This exception signals that the current record cannot be converted from parquet columns to a materialized
+   * record, but can be skipped if requested. This exception should be used to signal errors like a union with no
+   * set values, or an error in converting parquet primitive values to a materialized record. It should not
+   * be used to signal unrecoverable errors, like a data column being corrupt or unreadable.
+   */
+  public static class RecordMaterializationException extends ParquetDecodingException {
+    public RecordMaterializationException() {
+      super();
+    }
+
+    public RecordMaterializationException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public RecordMaterializationException(String message) {
+      super(message);
+    }
+
+    public RecordMaterializationException(Throwable cause) {
+      super(cause);
+    }
+  }
 }
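
A sketch of how a materializer can use the new exception (SketchMaterializer is a
hypothetical subclass for illustration, not part of this commit):

    import org.apache.parquet.io.api.GroupConverter;
    import org.apache.parquet.io.api.RecordMaterializer;

    class SketchMaterializer extends RecordMaterializer<String> {
      private final GroupConverter root; // a real implementation builds a converter tree
      private String current;            // set by the converters as columns are consumed

      SketchMaterializer(GroupConverter root) {
        this.root = root;
      }

      @Override
      public String getCurrentRecord() {
        if (current == null) {
          // recoverable per-record failure: the reader may skip this record
          throw new RecordMaterializationException("record could not be converted");
        }
        return current;
      }

      @Override
      public GroupConverter getRootConverter() {
        return root;
      }
    }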

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-common/src/main/java/org/apache/parquet/ShouldNeverHappenException.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/ShouldNeverHappenException.java b/parquet-common/src/main/java/org/apache/parquet/ShouldNeverHappenException.java
new file mode 100644
index 0000000..4174bc5
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/ShouldNeverHappenException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+/**
+ * Used in code blocks that should be unreachable, but the compiler does
+ * not know this, for example in the default clause of an exhaustive switch statement.
+ */
+public class ShouldNeverHappenException extends ParquetRuntimeException {
+  public ShouldNeverHappenException() {
+  }
+
+  public ShouldNeverHappenException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public ShouldNeverHappenException(String message) {
+    super(message);
+  }
+
+  public ShouldNeverHappenException(Throwable cause) {
+    super(cause);
+  }
+}
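
The intended usage is the default clause of a switch that already covers every
case, as BufferedProtocolReadToWrite does later in this commit. A minimal sketch:

    import org.apache.parquet.ShouldNeverHappenException;

    class StructOrUnionDispatch {
      enum Kind { STRUCT, UNION }

      static String describe(Kind kind) {
        switch (kind) {
          case STRUCT: return "struct";
          case UNION:  return "union";
          default:
            // unreachable for the enum above, but the compiler cannot prove it
            throw new ShouldNeverHappenException("Unrecognized kind: " + kind);
        }
      }
    }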

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index d40e87f..6ff4eac 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -24,8 +24,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-
 import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
@@ -43,6 +43,7 @@ import org.apache.parquet.io.ColumnIOFactory;
 import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -80,6 +81,7 @@ class InternalParquetRecordReader<T> {
   private long totalCountLoadedSoFar = 0;
 
   private Path file;
+  private UnmaterializableRecordCounter unmaterializableRecordCounter;
 
   /**
    * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
@@ -179,6 +181,7 @@ class InternalParquetRecordReader<T> {
     for (BlockMetaData block : blocks) {
       total += block.getRowCount();
     }
+    this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
     LOG.info("RecordReader initialized will read a total of " + total + " records.");
   }
 
@@ -206,8 +209,17 @@ class InternalParquetRecordReader<T> {
 
       try {
         checkRead();
-        currentValue = recordReader.read();
         current ++;
+
+        try {
+          currentValue = recordReader.read();
+        } catch (RecordMaterializationException e) {
+          // this might throw, but it's fatal if it does.
+          unmaterializableRecordCounter.incErrors(e);
+          if (DEBUG) LOG.debug("skipping a corrupt record");
+          continue;
+        }
+
         if (recordReader.shouldSkipCurrentRecord()) {
           // this record is being filtered via the filter2 package
           if (DEBUG) LOG.debug("skipping record");

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
new file mode 100644
index 0000000..c4de8f3
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Log;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+
+// Essentially taken from:
+// https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java#L124
+
+/**
+ * Tracks the number of records that cannot be materialized and throws ParquetDecodingException
+ * if the rate of errors crosses a limit.<p> These errors are meant to be recoverable
+ * record conversion errors, such as a union missing a value or a schema mismatch.
+ * This mechanism is not meant to recover from corruption in the parquet columns themselves.
+ *
+ * The intention is to skip over very rare file corruption, or bugs where the write path
+ * has allowed invalid records into the file, while still catching large numbers of
+ * failures. It is off by default: no errors are tolerated.
+ */
+public class UnmaterializableRecordCounter {
+
+  /* Tolerated fraction of bad records */
+  public static final String BAD_RECORD_THRESHOLD_CONF_KEY = "parquet.read.bad.record.threshold";
+
+  private static final Log LOG = Log.getLog(UnmaterializableRecordCounter.class);
+
+  private static final float DEFAULT_THRESHOLD =  0f;
+
+  private long numErrors;
+
+  private final double errorThreshold; // max fraction of errors allowed
+  private final long totalNumRecords; // how many records are we going to see total?
+
+  public UnmaterializableRecordCounter(Configuration conf, long totalNumRecords) {
+    this(
+        conf.getFloat(BAD_RECORD_THRESHOLD_CONF_KEY, DEFAULT_THRESHOLD),
+        totalNumRecords
+     );
+  }
+
+  public UnmaterializableRecordCounter(double errorThreshold, long totalNumRecords) {
+    this.errorThreshold = errorThreshold;
+    this.totalNumRecords = totalNumRecords;
+    numErrors = 0;
+  }
+
+  public void incErrors(RecordMaterializationException cause) throws ParquetDecodingException {
+    numErrors++;
+
+    LOG.warn(String.format("Error while reading an input record (%s out of %s): ",
+        numErrors, totalNumRecords), cause);
+
+    if (numErrors > 0 && errorThreshold <= 0) { // no errors are tolerated
+      throw new ParquetDecodingException("Error while decoding records", cause);
+    }
+
+    double errRate = numErrors/(double)totalNumRecords;
+
+    if (errRate > errorThreshold) {
+      String message = String.format("Decoding error rate of at least %s/%s crosses configured threshold of %s",
+          numErrors, totalNumRecords, errorThreshold);
+      LOG.error(message);
+      throw new ParquetDecodingException(message, cause);
+    }
+  }
+}
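
A small sketch of the counter's behavior in isolation (values are illustrative):

    import org.apache.parquet.hadoop.UnmaterializableRecordCounter;
    import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;

    class CounterSketch {
      public static void main(String[] args) {
        // allow up to 10% of 1000 records to fail materialization
        UnmaterializableRecordCounter counter = new UnmaterializableRecordCounter(0.1, 1000);
        counter.incErrors(new RecordMaterializationException("bad union"));
        // 1/1000 = 0.001 <= 0.1, so this logs a warning and returns;
        // once numErrors/totalNumRecords exceeds 0.1 (the 101st error),
        // incErrors throws ParquetDecodingException and the read fails
      }
    }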

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-scrooge/src/test/java/org/apache/parquet/scrooge/TestCorruptScroogeRecords.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/test/java/org/apache/parquet/scrooge/TestCorruptScroogeRecords.java b/parquet-scrooge/src/test/java/org/apache/parquet/scrooge/TestCorruptScroogeRecords.java
new file mode 100644
index 0000000..377134c
--- /dev/null
+++ b/parquet-scrooge/src/test/java/org/apache/parquet/scrooge/TestCorruptScroogeRecords.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.scrooge;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
+import org.apache.parquet.hadoop.thrift.TestCorruptThriftRecords;
+import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
+import org.apache.parquet.scrooge.test.StructWithUnionV2;
+import org.apache.parquet.scrooge.test.StructWithUnionV2$;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCorruptScroogeRecords extends TestCorruptThriftRecords {
+
+  @Override
+  public void setupJob(Job job, Path path) throws Exception {
+    job.setInputFormatClass(ParquetScroogeInputFormat.class);
+    ParquetScroogeInputFormat.setInputPaths(job, path);
+    ParquetScroogeInputFormat.setThriftClass(job.getConfiguration(), StructWithUnionV2.class);
+
+
+    ThriftReadSupport.setRecordConverterClass(job.getConfiguration(), ScroogeRecordConverter.class);
+
+    job.setMapperClass(ReadMapper.class);
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+  }
+
+  @Override
+  protected void assertEqualsExcepted(List<org.apache.parquet.thrift.test.compat.StructWithUnionV2> expected, List<Object> found) throws Exception {
+    List<StructWithUnionV2> scroogeExpected = new ArrayList<StructWithUnionV2>();
+    for (org.apache.parquet.thrift.test.compat.StructWithUnionV2 tbase : expected) {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      TProtocol out = new Factory().getProtocol(new TIOStreamTransport(baos));
+      tbase.write(out);
+      TProtocol in = new Factory().getProtocol(new TIOStreamTransport(new ByteArrayInputStream(baos.toByteArray())));
+      scroogeExpected.add(StructWithUnionV2$.MODULE$.decode(in));
+    }
+    assertEquals(scroogeExpected, found);
+   }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-scrooge/src/test/thrift/test.thrift
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/test/thrift/test.thrift b/parquet-scrooge/src/test/thrift/test.thrift
index 34cb7a1..bfb71f6 100644
--- a/parquet-scrooge/src/test/thrift/test.thrift
+++ b/parquet-scrooge/src/test/thrift/test.thrift
@@ -172,3 +172,37 @@ struct StringAndBinary {
   1: required string s;
   2: required binary b;
 }
+
+struct AString {
+  1: required string s
+}
+
+struct ALong {
+  1: required i64 l
+}
+
+struct ABool {
+  1: required bool b
+}
+
+union UnionV2 {
+  1: AString aString,
+  2: ALong aLong,
+  3: ABool aNewBool
+}
+
+struct StructWithUnionV2 {  
+  1: required string name,
+  2: required UnionV2 aUnion
+}
+
+struct AStructThatLooksLikeUnionV2 {
+  1: optional AString aString,
+  2: optional ALong aLong,
+  3: optional ABool aNewBool
+}
+
+struct StructWithAStructThatLooksLikeUnionV2 {  
+  1: required string name,
+  2: required AStructThatLooksLikeUnionV2 aNotQuiteUnion
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
index 49b4eac..871f817 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -67,12 +67,27 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
    * implementation creates standard Apache Thrift {@link TBase} objects; to support alternatives, such
   * as <a href="http://github.com/twitter/scrooge">Twitter's Scrooge</a>, a custom converter can be specified
    * (for example, ScroogeRecordConverter from parquet-scrooge).
+   *
+   * @deprecated use {@link #setRecordConverterClass(Configuration, Class)} below
    */
+  @Deprecated
   public static void setRecordConverterClass(JobConf conf,
       Class<?> klass) {
+    setRecordConverterClass((Configuration) conf, klass);
+  }
+
+  /**
+   * A {@link ThriftRecordConverter} builds an object by working with {@link TProtocol}. The default
+   * implementation creates standard Apache Thrift {@link TBase} objects; to support alternatives, such
+   * as <a href="http://github.com/twitter/scrooge">Twitter's Scrooge</a>, a custom converter can be specified
+   * (for example, ScroogeRecordConverter from parquet-scrooge).
+   */
+  public static void setRecordConverterClass(Configuration conf,
+                                             Class<?> klass) {
     conf.set(RECORD_CONVERTER_CLASS_KEY, klass.getName());
   }
 
+
   /**
    * used from hadoop
    * the configuration must contain a "parquet.thrift.read.class" setting
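
For reference, the new Configuration-based overload is called the same way the
Scrooge test earlier in this diff does (ConverterSetup is a hypothetical helper):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
    import org.apache.parquet.scrooge.ScroogeRecordConverter;

    class ConverterSetup {
      static void configure(Job job) {
        // Job#getConfiguration() returns a Configuration, so this resolves to
        // the new overload rather than the deprecated JobConf one
        ThriftReadSupport.setRecordConverterClass(job.getConfiguration(), ScroogeRecordConverter.class);
      }
    }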

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
index 45c9bf6..70bd003 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
@@ -18,22 +18,31 @@
  */
 package org.apache.parquet.thrift;
 
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.*;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TMap;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TSet;
+import org.apache.thrift.protocol.TStruct;
+import org.apache.thrift.protocol.TType;
 
 import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.ShouldNeverHappenException;
 import org.apache.parquet.thrift.struct.ThriftField;
 import org.apache.parquet.thrift.struct.ThriftType;
 import org.apache.parquet.thrift.struct.ThriftType.ListType;
 import org.apache.parquet.thrift.struct.ThriftType.MapType;
 import org.apache.parquet.thrift.struct.ThriftType.SetType;
 import org.apache.parquet.thrift.struct.ThriftType.StructType;
+import org.apache.parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
 import org.apache.parquet.thrift.struct.ThriftTypeID;
 
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
 /**
 * Class to read from one protocol into a buffer and then write to another one.
 * When there is an exception during reading, it is rethrown as a skippable exception.
@@ -354,34 +363,17 @@ public class BufferedProtocolReadToWrite implements ProtocolPipe {
         return "(";
       }
     });
+
     TField field;
     boolean hasFieldsIgnored = false;
+    int childFieldsPresent = 0;
     while ((field = in.readFieldBegin()).type != TType.STOP) {
       final TField currentField = field;
       ThriftField expectedField;
       if ((expectedField = type.getChildById(field.id)) == null) {
-
-        switch (type.getStructOrUnionType()) {
-          case STRUCT:
-            // this is an unrecognized field in a struct, not a union
-            notifyIgnoredFieldsOfRecord(field);
-            hasFieldsIgnored |= true;
-            //read the value and ignore it, NullProtocol will do nothing
-            new ProtocolReadToWrite().readOneValue(in, new NullProtocol(), field.type);
-            continue;
-          case UNION:
-            // this is a union with an unrecognized member -- this is fatal for this record
-            // in the write path, because it will be unreadable in the read path.
-            // throwing here means we will either skip this record entirely, or fail completely.
-            throw new DecodingSchemaMismatchException("Unrecognized union member with id: "
-                + field.id + " for struct:\n" + type);
-          case UNKNOWN:
-            // we should never reach here in the write path -- this only happens if the
-            // deprecated constructor of StructType is used, which should only be used in the
-            // read path.
-            throw new ParquetRuntimeException("This should never happen! "
-                + "Don't know if this field is a union, was the deprecated constructor of StructType used?\n" + type){};
-        }
+        handleUnrecognizedField(field, type, in);
+        hasFieldsIgnored |= true;
+        continue;
       }
       buffer.add(new Action() {
         @Override
@@ -394,15 +386,80 @@ public class BufferedProtocolReadToWrite implements ProtocolPipe {
           return "f=" + currentField.id + "<t=" + typeName(currentField.type) + ">: ";
         }
       });
-      hasFieldsIgnored |= readOneValue(in, field.type, buffer, expectedField.getType());
+      boolean wasIgnored = readOneValue(in, field.type, buffer, expectedField.getType());
+      if (!wasIgnored) {
+        childFieldsPresent++;
+      }
+      hasFieldsIgnored |= wasIgnored;
       in.readFieldEnd();
       buffer.add(FIELD_END);
     }
+
+    // check that the union had exactly one (no more, no less) child field set.
+    assertUnionHasExactlyOneChild(type, childFieldsPresent);
+
     in.readStructEnd();
     buffer.add(STRUCT_END);
     return hasFieldsIgnored;
   }
 
+  private void handleUnrecognizedField(TField field, StructType type, TProtocol in) throws TException {
+    switch (type.getStructOrUnionType()) {
+      case STRUCT:
+        // this is an unrecognized field in a struct, not a union
+        notifyIgnoredFieldsOfRecord(field);
+        //read the value and ignore it, NullProtocol will do nothing
+        new ProtocolReadToWrite().readOneValue(in, new NullProtocol(), field.type);
+        break;
+      case UNION:
+        // this is a union with an unrecognized member -- this is fatal for this record
+        // in the write path, because it will be unreadable in the read path.
+        // throwing here means we will either skip this record entirely, or fail completely.
+        throw new DecodingSchemaMismatchException("Unrecognized union member with id: "
+            + field.id + " for struct:\n" + type);
+      case UNKNOWN:
+        throw unknownStructOrUnion(type);
+      default:
+        throw unrecognizedStructOrUnion(type.getStructOrUnionType());
+    }
+  }
+
+  private void assertUnionHasExactlyOneChild(StructType type, int childFieldsPresent) {
+    switch (type.getStructOrUnionType()) {
+      case STRUCT:
+        // nothing to do
+        break;
+      case UNION:
+        // childFieldsPresent must == 1
+        if (childFieldsPresent != 1) {
+
+          if (childFieldsPresent == 0) {
+            throw new DecodingSchemaMismatchException("Cannot write a TUnion with no set value in :\n" + type);
+          } else {
+            throw new DecodingSchemaMismatchException("Cannot write a TUnion with more than 1 set value in :\n" + type);
+          }
+
+        }
+        break;
+      case UNKNOWN:
+        throw unknownStructOrUnion(type);
+      default:
+        throw unrecognizedStructOrUnion(type.getStructOrUnionType());
+    }
+  }
+
+  private static ShouldNeverHappenException unrecognizedStructOrUnion(StructOrUnionType type) {
+    return new ShouldNeverHappenException("Unrecognized StructOrUnionType: " + type);
+  }
+
+  // we should never reach here in the write path -- this only happens if the
+  // deprecated constructor of StructType is used, which should only be used in the
+  // read path.
+  private static ShouldNeverHappenException unknownStructOrUnion(StructType type) {
+    return new ShouldNeverHappenException("This should never happen! "
+        + "Don't know if this field is a union, was the deprecated constructor of StructType used?\n" + type);
+  }
+
   private boolean readOneMap(TProtocol in, List<Action> buffer, MapType mapType) throws TException {
     final TMap map = in.readMapBegin();
     buffer.add(new Action() {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java
index 50f09cf..0151cde 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java
@@ -61,11 +61,11 @@ public abstract class ParquetProtocol extends TProtocol {
     this.name = name;
   }
 
-  private UnsupportedOperationException exception() {
+  private TException exception() {
     String message = name == null ?
         "in " + getClassInfo() :
         "when we expected " + name + " in " + getClassInfo();
-    return new UnsupportedOperationException(new Exception().getStackTrace()[1].getMethodName() + " was called " + message);
+    return new TException(new UnsupportedOperationException(new Exception().getStackTrace()[1].getMethodName() + " was called " + message));
   }
 
   /** WRITE */

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
index c7872af..d3b496a 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
@@ -54,6 +54,10 @@ class ParquetReadProtocol extends ParquetProtocol {
     this.events.addAll(events);
   }
 
+  public void clear() {
+    this.events.clear();
+  }
+
   private TProtocol next() {
     return events.removeFirst();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
index d0daa35..ec0f4ff 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
@@ -33,7 +33,6 @@ import org.apache.thrift.protocol.TSet;
 import org.apache.thrift.protocol.TStruct;
 import org.apache.thrift.protocol.TType;
 
-import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
@@ -843,7 +842,9 @@ public class ThriftRecordConverter<T> extends RecordMaterializer<T> {
       rootEvents.clear();
       return thriftReader.readOneRecord(protocol);
     } catch (TException e) {
-      throw new ParquetDecodingException("Could not read thrift object from protocol", e);
+      protocol.clear();
+      rootEvents.clear();
+      throw new RecordMaterializationException("Could not read thrift object from protocol", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
new file mode 100644
index 0000000..c31aa9c
--- /dev/null
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.thrift;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.parquet.hadoop.UnmaterializableRecordCounter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.thrift.ThriftParquetWriter;
+import org.apache.parquet.thrift.test.compat.ABool;
+import org.apache.parquet.thrift.test.compat.ALong;
+import org.apache.parquet.thrift.test.compat.AString;
+import org.apache.parquet.thrift.test.compat.AStructThatLooksLikeUnionV2;
+import org.apache.parquet.thrift.test.compat.StructWithAStructThatLooksLikeUnionV2;
+import org.apache.parquet.thrift.test.compat.StructWithUnionV2;
+import org.apache.parquet.thrift.test.compat.UnionV2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.hadoop.thrift.TestInputOutputFormat.waitForJob;
+
+public class TestCorruptThriftRecords {
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  public static class ReadMapper<T> extends Mapper<Void, T, Void, Void> {
+    public static List<Object> records;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+      records = new ArrayList<Object>();
+    }
+
+    @Override
+    protected void map(Void key, T value, Context context) throws IOException, InterruptedException {
+      records.add(value);
+    }
+  }
+
+  public static StructWithAStructThatLooksLikeUnionV2 makeValid(int i) {
+    AStructThatLooksLikeUnionV2 validUnion = new AStructThatLooksLikeUnionV2();
+    switch (i % 3) {
+      case 0:
+        validUnion.setALong(new ALong(17L));
+        break;
+      case 1:
+        validUnion.setANewBool(new ABool(false));
+        break;
+      case 2:
+        validUnion.setAString(new AString("bar"));
+        break;
+    }
+    return new StructWithAStructThatLooksLikeUnionV2("foo" + i, validUnion);
+  }
+
+  public static StructWithUnionV2 makeExpectedValid(int i) {
+    UnionV2 validUnion = new UnionV2();
+    switch (i % 3) {
+      case 0:
+        validUnion.setALong(new ALong(17L));
+        break;
+      case 1:
+        validUnion.setANewBool(new ABool(false));
+        break;
+      case 2:
+        validUnion.setAString(new AString("bar"));
+        break;
+    }
+    return new StructWithUnionV2("foo" + i, validUnion);
+  }
+
+  public static StructWithAStructThatLooksLikeUnionV2 makeInvalid(int i) {
+    AStructThatLooksLikeUnionV2 invalid = new AStructThatLooksLikeUnionV2();
+    if (i % 2 == 0) {
+      // sometimes write too many
+      invalid.setALong(new ALong(18L));
+      invalid.setANewBool(new ABool(false));
+    } else {
+      // sometimes write too few
+    }
+    return new StructWithAStructThatLooksLikeUnionV2("foo" + i, invalid);
+  }
+
+  protected void setupJob(Job job, Path path) throws Exception {
+    job.setInputFormatClass(ParquetThriftInputFormat.class);
+    ParquetThriftInputFormat.setInputPaths(job, path);
+    ParquetThriftInputFormat.setThriftClass(job.getConfiguration(), StructWithUnionV2.class);
+
+    job.setMapperClass(ReadMapper.class);
+    job.setNumReduceTasks(0);
+    job.setOutputFormatClass(NullOutputFormat.class);
+  }
+
+  protected void assertEqualsExcepted(List<StructWithUnionV2> expected, List<Object> found) throws Exception {
+    assertEquals(expected, found);
+  }
+
+  private Path writeFileWithCorruptRecords(int numCorrupt, List<StructWithUnionV2> collectExpectedRecords) throws Exception {
+    // generate a file with records that are corrupt according to thrift
+    // by writing some structs that when interpreted as unions will be
+    // unreadable
+    Path outputPath = new Path(new File(tempDir.getRoot(), "corrupt_out").getAbsolutePath());
+    ParquetWriter<StructWithAStructThatLooksLikeUnionV2> writer = new ThriftParquetWriter<StructWithAStructThatLooksLikeUnionV2>(
+        outputPath,
+        StructWithAStructThatLooksLikeUnionV2.class,
+        CompressionCodecName.UNCOMPRESSED
+    );
+
+    int numRecords = 0;
+
+    for (int i = 0; i < 100; i++) {
+      StructWithAStructThatLooksLikeUnionV2 valid  = makeValid(numRecords);
+      StructWithUnionV2 expected = makeExpectedValid(numRecords);
+      numRecords++;
+      collectExpectedRecords.add(expected);
+      writer.write(valid);
+    }
+
+    for (int i = 0; i < numCorrupt; i++) {
+      writer.write(makeInvalid(numRecords++));
+    }
+
+    for (int i = 0; i < 100; i++) {
+      StructWithAStructThatLooksLikeUnionV2 valid  = makeValid(numRecords);
+      StructWithUnionV2 expected = makeExpectedValid(numRecords);
+      numRecords++;
+      collectExpectedRecords.add(expected);
+      writer.write(valid);
+    }
+
+    writer.close();
+
+    return outputPath;
+  }
+
+  private void readFile(Path path, Configuration conf, String name) throws Exception {
+    Job job = new Job(conf, name);
+    setupJob(job, path);
+    waitForJob(job);
+  }
+
+  @Test
+  public void testDefaultsToNoTolerance() throws Exception {
+    ArrayList<StructWithUnionV2> expected = new ArrayList<StructWithUnionV2>();
+    try {
+      readFile(writeFileWithCorruptRecords(1, expected), new Configuration(), "testDefaultsToNoTolerance");
+      fail("This should throw");
+    } catch (RuntimeException e) {
+      // still should have actually read all the valid records
+      assertEquals(100, ReadMapper.records.size());
+      assertEqualsExcepted(expected.subList(0, 100), ReadMapper.records);
+    }
+  }
+
+  @Test
+  public void testCanTolerateBadRecords() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setFloat(UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY, 0.1f);
+
+    List<StructWithUnionV2> expected = new ArrayList<StructWithUnionV2>();
+
+    readFile(writeFileWithCorruptRecords(4, expected), conf, "testCanTolerateBadRecords");
+    assertEquals(200, ReadMapper.records.size());
+    assertEqualsExcepted(expected, ReadMapper.records);
+  }
+
+  @Test
+  public void testThrowsWhenTooManyBadRecords() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setFloat(UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY, 0.1f);
+
+    ArrayList<StructWithUnionV2> expected = new ArrayList<StructWithUnionV2>();
+
+    try {
+      readFile(writeFileWithCorruptRecords(300, expected), conf, "testThrowsWhenTooManyBadRecords");
+      fail("This should throw");
+    } catch (RuntimeException e) {
+      // still should have actually read all the valid records
+      assertEquals(100, ReadMapper.records.size());
+      assertEqualsExcepted(expected.subList(0, 100), ReadMapper.records);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
index 69a2d31..0835cdb 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
@@ -242,7 +242,7 @@ public class TestInputOutputFormat {
     waitForJob(job);
   }
 
-  private void waitForJob(Job job) throws Exception {
+  public static void waitForJob(Job job) throws Exception {
     job.submit();
     while (!job.isComplete()) {
       LOG.debug("waiting for job " + job.getJobName());

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java
index a1b734f..e7be3ea 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java
@@ -161,6 +161,66 @@ public class TestProtocolReadToWrite {
     assertEquals(0, countingHandler.fieldIgnoredCount);
   }
 
+  @Test
+  public void testUnionWithExtraOrNoValues() throws Exception {
+    CountingErrorHandler countingHandler = new CountingErrorHandler();
+    BufferedProtocolReadToWrite p = new BufferedProtocolReadToWrite(new ThriftSchemaConverter().toStructType(StructWithUnionV2.class), countingHandler);
+    ByteArrayOutputStream in = new ByteArrayOutputStream();
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    StructWithUnionV2 validUnion = new StructWithUnionV2("a valid struct", UnionV2.aLong(new ALong(17L)));
+
+    StructWithAStructThatLooksLikeUnionV2 allMissing = new StructWithAStructThatLooksLikeUnionV2("all missing",
+        new AStructThatLooksLikeUnionV2());
+
+    AStructThatLooksLikeUnionV2 extra = new AStructThatLooksLikeUnionV2();
+    extra.setALong(new ALong(18L));
+    extra.setANewBool(new ABool(false));
+
+    StructWithAStructThatLooksLikeUnionV2 hasExtra = new StructWithAStructThatLooksLikeUnionV2("has extra",
+        new AStructThatLooksLikeUnionV2(extra));
+
+    validUnion.write(protocol(in));
+    allMissing.write(protocol(in));
+
+    ByteArrayInputStream baos = new ByteArrayInputStream(in.toByteArray());
+
+    // first one should not throw
+    p.readOne(protocol(baos), protocol(out));
+
+    try {
+      p.readOne(protocol(baos), protocol(out));
+      fail("this should throw");
+    } catch (SkippableException e) {
+      Throwable cause = e.getCause();
+      assertEquals(DecodingSchemaMismatchException.class, cause.getClass());
+      assertTrue(cause.getMessage().startsWith("Cannot write a TUnion with no set value in"));
+    }
+    assertEquals(0, countingHandler.recordCountOfMissingFields);
+    assertEquals(0, countingHandler.fieldIgnoredCount);
+
+    in = new ByteArrayOutputStream();
+    validUnion.write(protocol(in));
+    hasExtra.write(protocol(in));
+
+    baos = new ByteArrayInputStream(in.toByteArray());
+
+    // first one should not throw
+    p.readOne(protocol(baos), protocol(out));
+
+    try {
+      p.readOne(protocol(baos), protocol(out));
+      fail("this should throw");
+    } catch (SkippableException e) {
+      Throwable cause = e.getCause();
+      assertEquals(DecodingSchemaMismatchException.class, cause.getClass());
+      assertTrue(cause.getMessage().startsWith("Cannot write a TUnion with more than 1 set value in"));
+    }
+    assertEquals(0, countingHandler.recordCountOfMissingFields);
+    assertEquals(0, countingHandler.fieldIgnoredCount);
+  }
+
+
   /**
   * When an enum value in the data has an undefined index, it is considered a corrupted record and will be skipped.
    *

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/test/thrift/compat.thrift
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/thrift/compat.thrift b/parquet-thrift/src/test/thrift/compat.thrift
index b5d7331..2bd8a8c 100644
--- a/parquet-thrift/src/test/thrift/compat.thrift
+++ b/parquet-thrift/src/test/thrift/compat.thrift
@@ -148,3 +148,14 @@ struct StructWithUnionV2 {
   1: required string name,
   2: required UnionV2 aUnion
 }
+
+struct AStructThatLooksLikeUnionV2 {
+  1: optional AString aString,
+  2: optional ALong aLong,
+  3: optional ABool aNewBool
+}
+
+struct StructWithAStructThatLooksLikeUnionV2 {  
+  1: required string name,
+  2: required AStructThatLooksLikeUnionV2 aNotQuiteUnion
+}