You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by al...@apache.org on 2015/04/30 08:18:54 UTC
parquet-mr git commit: PARQUET-227 Enforce that unions have only 1
set value, tolerate bad records in read path
Repository: parquet-mr
Updated Branches:
refs/heads/master b287d35fe -> 9993450ad
PARQUET-227 Enforce that unions have only 1 set value, tolerate bad records in read path
See https://issues.apache.org/jira/browse/PARQUET-227
Author: Alex Levenson <al...@twitter.com>
Closes #153 from isnotinvain/alexlevenson/double-union and squashes the following commits:
ef4d36f [Alex Levenson] fix package names
e201deb [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
01694fa [Alex Levenson] Forgot a break in a switch statement
2f31321 [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
9292274 [Alex Levenson] Add in ShouldNeverHappenException which I forgot to check in
8d61515 [Alex Levenson] Address first round of comments
4d71bcb [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
8f9334c [Alex Levenson] Some cleanup and fixes
8153bc9 [Alex Levenson] Enforce that unions have only 1 set value, tolerate bad records in read path
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/9993450a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/9993450a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/9993450a
Branch: refs/heads/master
Commit: 9993450ad1f023e0e2b59291361d0b3b9f0e1c8d
Parents: b287d35
Author: Alex Levenson <al...@twitter.com>
Authored: Wed Apr 29 23:18:47 2015 -0700
Committer: Alex Levenson <al...@twitter.com>
Committed: Wed Apr 29 23:18:47 2015 -0700
----------------------------------------------------------------------
.../parquet/io/api/RecordMaterializer.java | 27 +++
.../parquet/ShouldNeverHappenException.java | 40 ++++
.../hadoop/InternalParquetRecordReader.java | 16 +-
.../hadoop/UnmaterializableRecordCounter.java | 87 ++++++++
.../scrooge/TestCorruptScroogeRecords.java | 69 ++++++
parquet-scrooge/src/test/thrift/test.thrift | 34 +++
.../hadoop/thrift/ThriftReadSupport.java | 15 ++
.../thrift/BufferedProtocolReadToWrite.java | 113 +++++++---
.../apache/parquet/thrift/ParquetProtocol.java | 4 +-
.../parquet/thrift/ParquetReadProtocol.java | 4 +
.../parquet/thrift/ThriftRecordConverter.java | 5 +-
.../hadoop/thrift/TestCorruptThriftRecords.java | 213 +++++++++++++++++++
.../hadoop/thrift/TestInputOutputFormat.java | 2 +-
.../parquet/thrift/TestProtocolReadToWrite.java | 60 ++++++
parquet-thrift/src/test/thrift/compat.thrift | 11 +
15 files changed, 665 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
index 98e4d50..9aee114 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/RecordMaterializer.java
@@ -18,6 +18,8 @@
*/
package org.apache.parquet.io.api;
+import org.apache.parquet.io.ParquetDecodingException;
+
/**
* Top-level class which should be implemented in order to materialize objects from
* a stream of Parquet data.
@@ -33,6 +35,7 @@ abstract public class RecordMaterializer<T> {
/**
* @return the result of the conversion
+ * @throws RecordMaterializationException to signal that a record cannot be materialized, but can be skipped
*/
abstract public T getCurrentRecord();
@@ -45,4 +48,28 @@ abstract public class RecordMaterializer<T> {
* @return the root converter for this tree
*/
abstract public GroupConverter getRootConverter();
+
+ /**
+ * This exception signals that the current record cannot be converted from parquet columns to a materialized
+ * record, but can be skipped if requested. This exception should be used to signal errors like a union with no
+ * set values, or an error in converting parquet primitive values to a materialized record. It should not
+ * be used to signal unrecoverable errors, like a data column being corrupt or unreadable.
+ */
+ public static class RecordMaterializationException extends ParquetDecodingException {
+ public RecordMaterializationException() {
+ super();
+ }
+
+ public RecordMaterializationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RecordMaterializationException(String message) {
+ super(message);
+ }
+
+ public RecordMaterializationException(Throwable cause) {
+ super(cause);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-common/src/main/java/org/apache/parquet/ShouldNeverHappenException.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/ShouldNeverHappenException.java b/parquet-common/src/main/java/org/apache/parquet/ShouldNeverHappenException.java
new file mode 100644
index 0000000..4174bc5
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/ShouldNeverHappenException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+/**
+ * Used in code blocks that should be unreachable, but the compiler does
+ * not know this, for example the default clause of an exhaustive switch statement.
+ */
+public class ShouldNeverHappenException extends ParquetRuntimeException {
+ public ShouldNeverHappenException() {
+ }
+
+ public ShouldNeverHappenException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public ShouldNeverHappenException(String message) {
+ super(message);
+ }
+
+ public ShouldNeverHappenException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index d40e87f..6ff4eac 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -24,8 +24,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-
import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -43,6 +43,7 @@ import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -80,6 +81,7 @@ class InternalParquetRecordReader<T> {
private long totalCountLoadedSoFar = 0;
private Path file;
+ private UnmaterializableRecordCounter unmaterializableRecordCounter;
/**
* @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
@@ -179,6 +181,7 @@ class InternalParquetRecordReader<T> {
for (BlockMetaData block : blocks) {
total += block.getRowCount();
}
+ this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
LOG.info("RecordReader initialized will read a total of " + total + " records.");
}
@@ -206,8 +209,17 @@ class InternalParquetRecordReader<T> {
try {
checkRead();
- currentValue = recordReader.read();
current ++;
+
+ try {
+ currentValue = recordReader.read();
+ } catch (RecordMaterializationException e) {
+ // this might throw, but it's fatal if it does.
+ unmaterializableRecordCounter.incErrors(e);
+ if (DEBUG) LOG.debug("skipping a corrupt record");
+ continue;
+ }
+
if (recordReader.shouldSkipCurrentRecord()) {
// this record is being filtered via the filter2 package
if (DEBUG) LOG.debug("skipping record");
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
new file mode 100644
index 0000000..c4de8f3
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/UnmaterializableRecordCounter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Log;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
+
+// Essentially taken from:
+// https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java#L124
+
+/**
+ * Tracks number of records that cannot be materialized and throws ParquetDecodingException
+ * if the rate of errors crosses a limit.<p> These types of errors are meant
+ * to be recoverable record conversion errors, such as a union missing a value, or schema
+ * mismatch and so on. It's not meant to recover from corruptions in the parquet
+ * columns themselves.
+ *
+ * The intention is to skip over very rare file corruption or bugs where
+ * the write path has allowed invalid records into the file, but still catch large
+ * numbers of failures. Not turned on by default (by default, no errors are tolerated).
+ */
+public class UnmaterializableRecordCounter {
+
+ /* Tolerated fraction of bad records (e.g. 0.01 tolerates 1%); 0 or less means no errors are tolerated */
+ public static final String BAD_RECORD_THRESHOLD_CONF_KEY = "parquet.read.bad.record.threshold";
+
+ private static final Log LOG = Log.getLog(UnmaterializableRecordCounter.class);
+
+ private static final float DEFAULT_THRESHOLD = 0f;
+
+ private long numErrors;
+
+ private final double errorThreshold; // max fraction of errors allowed
+ private final long totalNumRecords; // how many records are we going to see total?
+
+ public UnmaterializableRecordCounter(Configuration conf, long totalNumRecords) {
+ this(
+ conf.getFloat(BAD_RECORD_THRESHOLD_CONF_KEY, DEFAULT_THRESHOLD),
+ totalNumRecords
+ );
+ }
+
+ public UnmaterializableRecordCounter(double errorThreshold, long totalNumRecords) {
+ this.errorThreshold = errorThreshold;
+ this.totalNumRecords = totalNumRecords;
+ numErrors = 0;
+ }
+
+ public void incErrors(RecordMaterializationException cause) throws ParquetDecodingException {
+ numErrors++;
+
+ LOG.warn(String.format("Error while reading an input record (%s out of %s): ",
+ numErrors, totalNumRecords), cause);
+
+ if (numErrors > 0 && errorThreshold <= 0) { // no errors are tolerated
+ throw new ParquetDecodingException("Error while decoding records", cause);
+ }
+
+ double errRate = numErrors/(double)totalNumRecords;
+
+ if (errRate > errorThreshold) {
+ String message = String.format("Decoding error rate of at least %s/%s crosses configured threshold of %s",
+ numErrors, totalNumRecords, errorThreshold);
+ LOG.error(message);
+ throw new ParquetDecodingException(message, cause);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-scrooge/src/test/java/org/apache/parquet/scrooge/TestCorruptScroogeRecords.java
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/test/java/org/apache/parquet/scrooge/TestCorruptScroogeRecords.java b/parquet-scrooge/src/test/java/org/apache/parquet/scrooge/TestCorruptScroogeRecords.java
new file mode 100644
index 0000000..377134c
--- /dev/null
+++ b/parquet-scrooge/src/test/java/org/apache/parquet/scrooge/TestCorruptScroogeRecords.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.scrooge;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TIOStreamTransport;
+
+import org.apache.parquet.hadoop.thrift.TestCorruptThriftRecords;
+import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
+import org.apache.parquet.scrooge.test.StructWithUnionV2;
+import org.apache.parquet.scrooge.test.StructWithUnionV2$;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCorruptScroogeRecords extends TestCorruptThriftRecords {
+
+ @Override
+ public void setupJob(Job job, Path path) throws Exception {
+ job.setInputFormatClass(ParquetScroogeInputFormat.class);
+ ParquetScroogeInputFormat.setInputPaths(job, path);
+ ParquetScroogeInputFormat.setThriftClass(job.getConfiguration(), StructWithUnionV2.class);
+
+
+ ThriftReadSupport.setRecordConverterClass(job.getConfiguration(), ScroogeRecordConverter.class);
+
+ job.setMapperClass(ReadMapper.class);
+ job.setNumReduceTasks(0);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ }
+
+ @Override
+ protected void assertEqualsExcepted(List<org.apache.parquet.thrift.test.compat.StructWithUnionV2> expected, List<Object> found) throws Exception {
+ List<StructWithUnionV2> scroogeExpected = new ArrayList<StructWithUnionV2>();
+ for (org.apache.parquet.thrift.test.compat.StructWithUnionV2 tbase : expected) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ TProtocol out = new Factory().getProtocol(new TIOStreamTransport(baos));
+ tbase.write(out);
+ TProtocol in = new Factory().getProtocol(new TIOStreamTransport(new ByteArrayInputStream(baos.toByteArray())));
+ scroogeExpected.add(StructWithUnionV2$.MODULE$.decode(in));
+ }
+ assertEquals(scroogeExpected, found);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-scrooge/src/test/thrift/test.thrift
----------------------------------------------------------------------
diff --git a/parquet-scrooge/src/test/thrift/test.thrift b/parquet-scrooge/src/test/thrift/test.thrift
index 34cb7a1..bfb71f6 100644
--- a/parquet-scrooge/src/test/thrift/test.thrift
+++ b/parquet-scrooge/src/test/thrift/test.thrift
@@ -172,3 +172,37 @@ struct StringAndBinary {
1: required string s;
2: required binary b;
}
+
+struct AString {
+ 1: required string s
+}
+
+struct ALong {
+ 1: required i64 l
+}
+
+struct ABool {
+ 1: required bool b
+}
+
+union UnionV2 {
+ 1: AString aString,
+ 2: ALong aLong,
+ 3: ABool aNewBool
+}
+
+struct StructWithUnionV2 {
+ 1: required string name,
+ 2: required UnionV2 aUnion
+}
+
+struct AStructThatLooksLikeUnionV2 {
+ 1: optional AString aString,
+ 2: optional ALong aLong,
+ 3: optional ABool aNewBool
+}
+
+struct StructWithAStructThatLooksLikeUnionV2 {
+ 1: required string name,
+ 2: required AStructThatLooksLikeUnionV2 aNotQuiteUnion
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
index 49b4eac..871f817 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -67,12 +67,27 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
* implementation creates standard Apache Thrift {@link TBase} objects; to support alternatives, such
* as <a href="http://github.com/twitter/scrooge">Twiter's Scrooge</a>, a custom converter can be specified
* (for example, ScroogeRecordConverter from parquet-scrooge).
+ *
+ * @deprecated use {@link #setRecordConverterClass(Configuration, Class)} below
*/
+ @Deprecated
public static void setRecordConverterClass(JobConf conf,
Class<?> klass) {
+ setRecordConverterClass((Configuration) conf, klass);
+ }
+
+ /**
+ * A {@link ThriftRecordConverter} builds an object by working with {@link TProtocol}. The default
+ * implementation creates standard Apache Thrift {@link TBase} objects; to support alternatives, such
+ * as <a href="http://github.com/twitter/scrooge">Twitter's Scrooge</a>, a custom converter can be specified
+ * (for example, ScroogeRecordConverter from parquet-scrooge).
+ */
+ public static void setRecordConverterClass(Configuration conf,
+ Class<?> klass) {
conf.set(RECORD_CONVERTER_CLASS_KEY, klass.getName());
}
+
/**
* used from hadoop
* the configuration must contain a "parquet.thrift.read.class" setting
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
index 45c9bf6..70bd003 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java
@@ -18,22 +18,31 @@
*/
package org.apache.parquet.thrift;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.*;
+import org.apache.thrift.protocol.TField;
+import org.apache.thrift.protocol.TList;
+import org.apache.thrift.protocol.TMap;
+import org.apache.thrift.protocol.TMessage;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TSet;
+import org.apache.thrift.protocol.TStruct;
+import org.apache.thrift.protocol.TType;
import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.ShouldNeverHappenException;
import org.apache.parquet.thrift.struct.ThriftField;
import org.apache.parquet.thrift.struct.ThriftType;
import org.apache.parquet.thrift.struct.ThriftType.ListType;
import org.apache.parquet.thrift.struct.ThriftType.MapType;
import org.apache.parquet.thrift.struct.ThriftType.SetType;
import org.apache.parquet.thrift.struct.ThriftType.StructType;
+import org.apache.parquet.thrift.struct.ThriftType.StructType.StructOrUnionType;
import org.apache.parquet.thrift.struct.ThriftTypeID;
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
/**
* Class to read from one protocol in a buffer and then write to another one
* When there is an exception during reading, it's a skippable exception.
@@ -354,34 +363,17 @@ public class BufferedProtocolReadToWrite implements ProtocolPipe {
return "(";
}
});
+
TField field;
boolean hasFieldsIgnored = false;
+ int childFieldsPresent = 0;
while ((field = in.readFieldBegin()).type != TType.STOP) {
final TField currentField = field;
ThriftField expectedField;
if ((expectedField = type.getChildById(field.id)) == null) {
-
- switch (type.getStructOrUnionType()) {
- case STRUCT:
- // this is an unrecognized field in a struct, not a union
- notifyIgnoredFieldsOfRecord(field);
- hasFieldsIgnored |= true;
- //read the value and ignore it, NullProtocol will do nothing
- new ProtocolReadToWrite().readOneValue(in, new NullProtocol(), field.type);
- continue;
- case UNION:
- // this is a union with an unrecognized member -- this is fatal for this record
- // in the write path, because it will be unreadable in the read path.
- // throwing here means we will either skip this record entirely, or fail completely.
- throw new DecodingSchemaMismatchException("Unrecognized union member with id: "
- + field.id + " for struct:\n" + type);
- case UNKNOWN:
- // we should never reach here in the write path -- this only happens if the
- // deprecated constructor of StructType is used, which should only be used in the
- // read path.
- throw new ParquetRuntimeException("This should never happen! "
- + "Don't know if this field is a union, was the deprecated constructor of StructType used?\n" + type){};
- }
+ handleUnrecognizedField(field, type, in);
+ hasFieldsIgnored |= true;
+ continue;
}
buffer.add(new Action() {
@Override
@@ -394,15 +386,80 @@ public class BufferedProtocolReadToWrite implements ProtocolPipe {
return "f=" + currentField.id + "<t=" + typeName(currentField.type) + ">: ";
}
});
- hasFieldsIgnored |= readOneValue(in, field.type, buffer, expectedField.getType());
+ boolean wasIgnored = readOneValue(in, field.type, buffer, expectedField.getType());
+ if (!wasIgnored) {
+ childFieldsPresent++;
+ }
+ hasFieldsIgnored |= wasIgnored;
in.readFieldEnd();
buffer.add(FIELD_END);
}
+
+ // check that a union has exactly 1 (no more, no less) child field set.
+ assertUnionHasExactlyOneChild(type, childFieldsPresent);
+
in.readStructEnd();
buffer.add(STRUCT_END);
return hasFieldsIgnored;
}
+ private void handleUnrecognizedField(TField field, StructType type, TProtocol in) throws TException {
+ switch (type.getStructOrUnionType()) {
+ case STRUCT:
+ // this is an unrecognized field in a struct, not a union
+ notifyIgnoredFieldsOfRecord(field);
+ //read the value and ignore it, NullProtocol will do nothing
+ new ProtocolReadToWrite().readOneValue(in, new NullProtocol(), field.type);
+ break;
+ case UNION:
+ // this is a union with an unrecognized member -- this is fatal for this record
+ // in the write path, because it will be unreadable in the read path.
+ // throwing here means we will either skip this record entirely, or fail completely.
+ throw new DecodingSchemaMismatchException("Unrecognized union member with id: "
+ + field.id + " for struct:\n" + type);
+ case UNKNOWN:
+ throw unknownStructOrUnion(type);
+ default:
+ throw unrecognizedStructOrUnion(type.getStructOrUnionType());
+ }
+ }
+
+ private void assertUnionHasExactlyOneChild(StructType type, int childFieldsPresent) {
+ switch (type.getStructOrUnionType()) {
+ case STRUCT:
+ // nothing to do
+ break;
+ case UNION:
+ // childFieldsPresent must == 1
+ if (childFieldsPresent != 1) {
+
+ if (childFieldsPresent == 0) {
+ throw new DecodingSchemaMismatchException("Cannot write a TUnion with no set value in :\n" + type);
+ } else {
+ throw new DecodingSchemaMismatchException("Cannot write a TUnion with more than 1 set value in :\n" + type);
+ }
+
+ }
+ break;
+ case UNKNOWN:
+ throw unknownStructOrUnion(type);
+ default:
+ throw unrecognizedStructOrUnion(type.getStructOrUnionType());
+ }
+ }
+
+ private static ShouldNeverHappenException unrecognizedStructOrUnion(StructOrUnionType type) {
+ return new ShouldNeverHappenException("Unrecognized StructOrUnionType: " + type);
+ }
+
+ // we should never reach here in the write path -- this only happens if the
+ // deprecated constructor of StructType is used, which should only be used in the
+ // read path.
+ private static ShouldNeverHappenException unknownStructOrUnion(StructType type) {
+ return new ShouldNeverHappenException("This should never happen! "
+ + "Don't know if this field is a union, was the deprecated constructor of StructType used?\n" + type);
+ }
+
private boolean readOneMap(TProtocol in, List<Action> buffer, MapType mapType) throws TException {
final TMap map = in.readMapBegin();
buffer.add(new Action() {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java
index 50f09cf..0151cde 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetProtocol.java
@@ -61,11 +61,11 @@ public abstract class ParquetProtocol extends TProtocol {
this.name = name;
}
- private UnsupportedOperationException exception() {
+ private TException exception() {
String message = name == null ?
"in " + getClassInfo() :
"when we expected " + name + " in " + getClassInfo();
- return new UnsupportedOperationException(new Exception().getStackTrace()[1].getMethodName() + " was called " + message);
+ return new TException(new UnsupportedOperationException(new Exception().getStackTrace()[1].getMethodName() + " was called " + message));
}
/** WRITE */
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
index c7872af..d3b496a 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetReadProtocol.java
@@ -54,6 +54,10 @@ class ParquetReadProtocol extends ParquetProtocol {
this.events.addAll(events);
}
+ public void clear() {
+ this.events.clear();
+ }
+
private TProtocol next() {
return events.removeFirst();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
index d0daa35..ec0f4ff 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftRecordConverter.java
@@ -33,7 +33,6 @@ import org.apache.thrift.protocol.TSet;
import org.apache.thrift.protocol.TStruct;
import org.apache.thrift.protocol.TType;
-import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
@@ -843,7 +842,9 @@ public class ThriftRecordConverter<T> extends RecordMaterializer<T> {
rootEvents.clear();
return thriftReader.readOneRecord(protocol);
} catch (TException e) {
- throw new ParquetDecodingException("Could not read thrift object from protocol", e);
+ protocol.clear();
+ rootEvents.clear();
+ throw new RecordMaterializationException("Could not read thrift object from protocol", e);
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
new file mode 100644
index 0000000..c31aa9c
--- /dev/null
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestCorruptThriftRecords.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.thrift;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.parquet.hadoop.UnmaterializableRecordCounter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.thrift.ThriftParquetWriter;
+import org.apache.parquet.thrift.test.compat.ABool;
+import org.apache.parquet.thrift.test.compat.ALong;
+import org.apache.parquet.thrift.test.compat.AString;
+import org.apache.parquet.thrift.test.compat.AStructThatLooksLikeUnionV2;
+import org.apache.parquet.thrift.test.compat.StructWithAStructThatLooksLikeUnionV2;
+import org.apache.parquet.thrift.test.compat.StructWithUnionV2;
+import org.apache.parquet.thrift.test.compat.UnionV2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.apache.parquet.hadoop.thrift.TestInputOutputFormat.waitForJob;
+
+public class TestCorruptThriftRecords {
+
+ @Rule
+ public final TemporaryFolder tempDir = new TemporaryFolder();
+
+ public static class ReadMapper<T> extends Mapper<Void, T, Void, Void> {
+ public static List<Object> records;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ records = new ArrayList<Object>();
+ }
+
+ @Override
+ protected void map(Void key, T value, Context context) throws IOException, InterruptedException {
+ records.add(value);
+ }
+ }
+
+ public static StructWithAStructThatLooksLikeUnionV2 makeValid(int i) {
+ AStructThatLooksLikeUnionV2 validUnion = new AStructThatLooksLikeUnionV2();
+ switch (i % 3) {
+ case 0:
+ validUnion.setALong(new ALong(17L));
+ break;
+ case 1:
+ validUnion.setANewBool(new ABool(false));
+ break;
+ case 2:
+ validUnion.setAString(new AString("bar"));
+ break;
+ }
+ return new StructWithAStructThatLooksLikeUnionV2("foo" + i, validUnion);
+ }
+
+ public static StructWithUnionV2 makeExpectedValid(int i) {
+ UnionV2 validUnion = new UnionV2();
+ switch (i % 3) {
+ case 0:
+ validUnion.setALong(new ALong(17L));
+ break;
+ case 1:
+ validUnion.setANewBool(new ABool(false));
+ break;
+ case 2:
+ validUnion.setAString(new AString("bar"));
+ break;
+ }
+ return new StructWithUnionV2("foo" + i, validUnion);
+ }
+
+ public static StructWithAStructThatLooksLikeUnionV2 makeInvalid(int i) {
+ AStructThatLooksLikeUnionV2 invalid = new AStructThatLooksLikeUnionV2();
+ if (i % 2 == 0) {
+ // even i: set two fields, one more than a union allows
+ invalid.setALong(new ALong(18l));
+ invalid.setANewBool(new ABool(false));
+ } else {
+ // odd i: set no fields at all, one fewer than a union requires
+ }
+ return new StructWithAStructThatLooksLikeUnionV2("foo" + i, invalid);
+ }
+
+ protected void setupJob(Job job, Path path) throws Exception {
+ job.setInputFormatClass(ParquetThriftInputFormat.class);
+ ParquetThriftInputFormat.setInputPaths(job, path);
+ ParquetThriftInputFormat.setThriftClass(job.getConfiguration(), StructWithUnionV2.class);
+
+ job.setMapperClass(ReadMapper.class);
+ job.setNumReduceTasks(0);
+ job.setOutputFormatClass(NullOutputFormat.class);
+ }
+
+ protected void assertEqualsExcepted(List<StructWithUnionV2> expected, List<Object> found) throws Exception {
+ assertEquals(expected, found);
+ }
+
+ private Path writeFileWithCorruptRecords(int numCorrupt, List<StructWithUnionV2> collectExpectedRecords) throws Exception {
+ // generate a file with records that are corrupt according to thrift
+ // by writing some structs that when interpreted as unions will be
+ // unreadable
+ Path outputPath = new Path(new File(tempDir.getRoot(), "corrupt_out").getAbsolutePath());
+ ParquetWriter<StructWithAStructThatLooksLikeUnionV2> writer = new ThriftParquetWriter<StructWithAStructThatLooksLikeUnionV2>(
+ outputPath,
+ StructWithAStructThatLooksLikeUnionV2.class,
+ CompressionCodecName.UNCOMPRESSED
+ );
+
+ int numRecords = 0;
+
+ for (int i = 0; i < 100; i++) {
+ StructWithAStructThatLooksLikeUnionV2 valid = makeValid(numRecords);
+ StructWithUnionV2 expected = makeExpectedValid(numRecords);
+ numRecords++;
+ collectExpectedRecords.add(expected);
+ writer.write(valid);
+ }
+
+ for (int i = 0; i < numCorrupt; i++) {
+ writer.write(makeInvalid(numRecords++));
+ }
+
+ for (int i = 0; i < 100; i++) {
+ StructWithAStructThatLooksLikeUnionV2 valid = makeValid(numRecords);
+ StructWithUnionV2 expected = makeExpectedValid(numRecords);
+ numRecords++;
+ collectExpectedRecords.add(expected);
+ writer.write(valid);
+ }
+
+ writer.close();
+
+ return outputPath;
+ }
+
+ private void readFile(Path path, Configuration conf, String name) throws Exception {
+ Job job = new Job(conf, name);
+ setupJob(job, path);
+ waitForJob(job);
+ }
+
+ @Test
+ public void testDefaultsToNoTolerance() throws Exception {
+ ArrayList<StructWithUnionV2> expected = new ArrayList<StructWithUnionV2>();
+ try {
+ readFile(writeFileWithCorruptRecords(1, expected), new Configuration(), "testDefaultsToNoTolerance");
+ fail("This should throw");
+ } catch (RuntimeException e) {
+ // should still have read the 100 valid records written before the corrupt one
+ assertEquals(100, ReadMapper.records.size());
+ assertEqualsExcepted(expected.subList(0, 100), ReadMapper.records);
+ }
+ }
+
+ @Test
+ public void testCanTolerateBadRecords() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setFloat(UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY, 0.1f);
+
+ List<StructWithUnionV2> expected = new ArrayList<StructWithUnionV2>();
+
+ readFile(writeFileWithCorruptRecords(4, expected), conf, "testCanTolerateBadRecords");
+ assertEquals(200, ReadMapper.records.size());
+ assertEqualsExcepted(expected, ReadMapper.records);
+ }
+
+ @Test
+ public void testThrowsWhenTooManyBadRecords() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setFloat(UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY, 0.1f);
+
+ ArrayList<StructWithUnionV2> expected = new ArrayList<StructWithUnionV2>();
+
+ try {
+ readFile(writeFileWithCorruptRecords(300, expected), conf, "testThrowsWhenTooManyBadRecords");
+ fail("This should throw");
+ } catch (RuntimeException e) {
+ // should still have read the 100 valid records written before the corrupt ones
+ assertEquals(100, ReadMapper.records.size());
+ assertEqualsExcepted(expected.subList(0, 100), ReadMapper.records);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
index 69a2d31..0835cdb 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/hadoop/thrift/TestInputOutputFormat.java
@@ -242,7 +242,7 @@ public class TestInputOutputFormat {
waitForJob(job);
}
- private void waitForJob(Job job) throws Exception {
+ public static void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job " + job.getJobName());
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java
index a1b734f..e7be3ea 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestProtocolReadToWrite.java
@@ -161,6 +161,66 @@ public class TestProtocolReadToWrite {
assertEquals(0, countingHandler.fieldIgnoredCount);
}
+ @Test
+ public void testUnionWithExtraOrNoValues() throws Exception {
+ CountingErrorHandler countingHandler = new CountingErrorHandler();
+ BufferedProtocolReadToWrite p = new BufferedProtocolReadToWrite(new ThriftSchemaConverter().toStructType(StructWithUnionV2.class), countingHandler);
+ ByteArrayOutputStream in = new ByteArrayOutputStream();
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+ StructWithUnionV2 validUnion = new StructWithUnionV2("a valid struct", UnionV2.aLong(new ALong(17L)));
+
+ StructWithAStructThatLooksLikeUnionV2 allMissing = new StructWithAStructThatLooksLikeUnionV2("all missing",
+ new AStructThatLooksLikeUnionV2());
+
+ AStructThatLooksLikeUnionV2 extra = new AStructThatLooksLikeUnionV2();
+ extra.setALong(new ALong(18L));
+ extra.setANewBool(new ABool(false));
+
+ StructWithAStructThatLooksLikeUnionV2 hasExtra = new StructWithAStructThatLooksLikeUnionV2("has extra",
+ new AStructThatLooksLikeUnionV2(extra));
+
+ validUnion.write(protocol(in));
+ allMissing.write(protocol(in));
+
+ ByteArrayInputStream baos = new ByteArrayInputStream(in.toByteArray());
+
+ // first one should not throw
+ p.readOne(protocol(baos), protocol(out));
+
+ try {
+ p.readOne(protocol(baos), protocol(out));
+ fail("this should throw");
+ } catch (SkippableException e) {
+ Throwable cause = e.getCause();
+ assertEquals(DecodingSchemaMismatchException.class, cause.getClass());
+ assertTrue(cause.getMessage().startsWith("Cannot write a TUnion with no set value in"));
+ }
+ assertEquals(0, countingHandler.recordCountOfMissingFields);
+ assertEquals(0, countingHandler.fieldIgnoredCount);
+
+ in = new ByteArrayOutputStream();
+ validUnion.write(protocol(in));
+ hasExtra.write(protocol(in));
+
+ baos = new ByteArrayInputStream(in.toByteArray());
+
+ // first one should not throw
+ p.readOne(protocol(baos), protocol(out));
+
+ try {
+ p.readOne(protocol(baos), protocol(out));
+ fail("this should throw");
+ } catch (SkippableException e) {
+ Throwable cause = e.getCause();
+ assertEquals(DecodingSchemaMismatchException.class, cause.getClass());
+ assertTrue(cause.getMessage().startsWith("Cannot write a TUnion with more than 1 set value in"));
+ }
+ assertEquals(0, countingHandler.recordCountOfMissingFields);
+ assertEquals(0, countingHandler.fieldIgnoredCount);
+ }
+
+
/**
* When an enum value in the data has an undefined index, the record is considered corrupted and will be skipped.
*
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/9993450a/parquet-thrift/src/test/thrift/compat.thrift
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/thrift/compat.thrift b/parquet-thrift/src/test/thrift/compat.thrift
index b5d7331..2bd8a8c 100644
--- a/parquet-thrift/src/test/thrift/compat.thrift
+++ b/parquet-thrift/src/test/thrift/compat.thrift
@@ -148,3 +148,14 @@ struct StructWithUnionV2 {
1: required string name,
2: required UnionV2 aUnion
}
+
+struct AStructThatLooksLikeUnionV2 {
+ 1: optional AString aString,
+ 2: optional ALong aLong,
+ 3: optional ABool aNewBool
+}
+
+struct StructWithAStructThatLooksLikeUnionV2 {
+ 1: required string name,
+ 2: required AStructThatLooksLikeUnionV2 aNotQuiteUnion
+}