Posted to commits@parquet.apache.org by al...@apache.org on 2015/07/01 03:34:27 UTC
[1/2] parquet-mr git commit: PARQUET-251: Binary column statistics error when reuse byte[] among rows
Repository: parquet-mr
Updated Branches:
refs/heads/master e6ee42e9b -> e3b95020f
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 7b83770..beb7dd6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -188,9 +188,8 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
}
}
MessageType fileSchema = footer.getFileMetaData().getSchema();
- Map<String, String> fileMetaData = footer.getFileMetaData().getKeyValueMetaData();
internalReader.initialize(
- fileSchema, fileMetaData, path, filteredBlocks, configuration);
+ fileSchema, footer.getFileMetaData(), path, filteredBlocks, configuration);
}
/**
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index c8b3778..2c644b6 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -116,7 +116,8 @@ public class TestColumnChunkPageWriteStore {
{
ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
- ParquetFileReader reader = new ParquetFileReader(conf, file, footer.getBlocks(), schema.getColumns());
+ ParquetFileReader reader = new ParquetFileReader(
+ conf, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns());
PageReadStore rowGroup = reader.readNextRowGroup();
PageReader pageReader = rowGroup.getPageReader(col);
DataPageV2 page = (DataPageV2)pageReader.readPage();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 1f17209..6151f48 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -172,7 +172,8 @@ public class TestParquetFileWriter {
assertEquals(expectedEncoding,readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
{ // read first block of col #1
- ParquetFileReader r = new ParquetFileReader(configuration, path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
+ ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
PageReadStore pages = r.readNextRowGroup();
assertEquals(3, pages.getRowCount());
validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
@@ -182,7 +183,8 @@ public class TestParquetFileWriter {
{ // read all blocks of col #1 and #2
- ParquetFileReader r = new ParquetFileReader(configuration, path, readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+ ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
PageReadStore pages = r.readNextRowGroup();
assertEquals(3, pages.getRowCount());
@@ -275,7 +277,8 @@ public class TestParquetFileWriter {
120, readFooter.getBlocks().get(1).getStartingPos());
{ // read first block of col #1
- ParquetFileReader r = new ParquetFileReader(conf, path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
+ ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+ Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
PageReadStore pages = r.readNextRowGroup();
assertEquals(3, pages.getRowCount());
validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
@@ -285,7 +288,8 @@ public class TestParquetFileWriter {
{ // read all blocks of col #1 and #2
- ParquetFileReader r = new ParquetFileReader(conf, path, readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+ ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+ readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
PageReadStore pages = r.readNextRowGroup();
assertEquals(3, pages.getRowCount());
@@ -378,7 +382,8 @@ public class TestParquetFileWriter {
109, readFooter.getBlocks().get(1).getStartingPos());
{ // read first block of col #1
- ParquetFileReader r = new ParquetFileReader(conf, path, Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
+ ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+ Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
PageReadStore pages = r.readNextRowGroup();
assertEquals(3, pages.getRowCount());
validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
@@ -388,7 +393,8 @@ public class TestParquetFileWriter {
{ // read all blocks of col #1 and #2
- ParquetFileReader r = new ParquetFileReader(conf, path, readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+ ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+ readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
PageReadStore pages = r.readNextRowGroup();
assertEquals(3, pages.getRowCount());
@@ -417,8 +423,13 @@ public class TestParquetFileWriter {
for (long l: longArray) {
parquetMRstats.updateStats(l);
}
- Statistics thriftStats = org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(parquetMRstats);
- LongStatistics convertedBackStats = (LongStatistics) org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(thriftStats, PrimitiveTypeName.INT64);
+ final String createdBy =
+ "parquet-mr version 1.8.0 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c)";
+ Statistics thriftStats =
+ org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(parquetMRstats);
+ LongStatistics convertedBackStats =
+ (LongStatistics) org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(
+ createdBy, thriftStats, PrimitiveTypeName.INT64);
assertEquals(parquetMRstats.getMax(), convertedBackStats.getMax());
assertEquals(parquetMRstats.getMin(), convertedBackStats.getMin());
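For context, a minimal sketch of the round trip the hunk above exercises (the created_by string below is hypothetical; INT64 statistics survive conversion regardless of writer version, since PARQUET-251 only corrupted binary statistics):

    import org.apache.parquet.column.statistics.LongStatistics;
    import org.apache.parquet.format.Statistics;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

    public class StatsRoundTripSketch {
      public static void main(String[] args) {
        LongStatistics written = new LongStatistics();
        written.updateStats(1L);
        written.updateStats(100L);
        Statistics thrift = ParquetMetadataConverter.toParquetStatistics(written);
        // created_by below is hypothetical; for a BINARY column, a pre-1.8.0
        // parquet-mr version here would cause the stats to be ignored instead.
        LongStatistics read = (LongStatistics) ParquetMetadataConverter.fromParquetStatistics(
            "parquet-mr version 1.6.0 (build abc)", thrift, PrimitiveTypeName.INT64);
        System.out.println(read.getMin() + " .. " + read.getMax());
      }
    }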
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 2583278..3dcec30 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -37,7 +37,6 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
@@ -95,7 +94,7 @@ public class TestParquetWriter {
.append("float_field", 1.0f)
.append("double_field", 2.0d)
.append("flba_field", "foo")
- .append("int96_field", Binary.fromByteArray(new byte[12])));
+ .append("int96_field", Binary.fromConstantByteArray(new byte[12])));
}
writer.close();
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).withConf(conf).build();
@@ -108,7 +107,8 @@ public class TestParquetWriter {
assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
- assertEquals(Binary.fromByteArray(new byte[12]), group.getInt96("int96_field", 0));
+ assertEquals(Binary.fromConstantByteArray(new byte[12]),
+ group.getInt96("int96_field",0));
}
reader.close();
ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
index 9dd1323..b885a86 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterNewPage.java
@@ -99,7 +99,7 @@ public class TestParquetWriterNewPage {
.append("float_field", 1.0f)
.append("double_field", 2.0d)
.append("flba_field", "foo")
- .append("int96_field", Binary.fromByteArray(new byte[12])));
+ .append("int96_field", Binary.fromConstantByteArray(new byte[12])));
}
writer.close();
@@ -113,7 +113,8 @@ public class TestParquetWriterNewPage {
assertEquals(1.0f, group.getFloat("float_field", 0), 0.001);
assertEquals(2.0d, group.getDouble("double_field", 0), 0.001);
assertEquals("foo", group.getBinary("flba_field", 0).toStringUsingUTF8());
- assertEquals(Binary.fromByteArray(new byte[12]), group.getInt96("int96_field", 0));
+ assertEquals(Binary.fromConstantByteArray(new byte[12]), group.getInt96("int96_field",
+ 0));
assertEquals(0, group.getFieldRepetitionCount("null_field"));
}
reader.close();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/writable/BinaryWritable.java
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/writable/BinaryWritable.java b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/writable/BinaryWritable.java
index 0779822..09744a5 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/writable/BinaryWritable.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/writable/BinaryWritable.java
@@ -57,7 +57,7 @@ public class BinaryWritable implements Writable {
public void readFields(DataInput input) throws IOException {
byte[] bytes = new byte[input.readInt()];
input.readFully(bytes);
- binary = Binary.fromByteArray(bytes);
+ binary = Binary.fromConstantByteArray(bytes);
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/ParquetStringInspector.java
----------------------------------------------------------------------
diff --git a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/ParquetStringInspector.java b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/ParquetStringInspector.java
index b734f8a..1f056e2 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/ParquetStringInspector.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/serde2/objectinspector/primitive/ParquetStringInspector.java
@@ -77,7 +77,8 @@ public class ParquetStringInspector extends JavaStringObjectInspector implements
@Override
public Object set(final Object o, final Text text) {
- return new BinaryWritable(text == null ? null : Binary.fromByteArray(text.getBytes()));
+ return new BinaryWritable(text == null ? null : Binary.fromReusedByteArray(text.getBytes
+ ()));
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
index 2cf79ee..829fe70 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/TupleWriteSupport.java
@@ -172,7 +172,7 @@ public class TupleWriteSupport extends WriteSupport<Tuple> {
} else {
throw new UnsupportedOperationException("can not convert from " + DataType.findTypeName(pigType.type) + " to BINARY ");
}
- recordConsumer.addBinary(Binary.fromByteArray(bytes));
+ recordConsumer.addBinary(Binary.fromReusedByteArray(bytes));
break;
case BOOLEAN:
recordConsumer.addBoolean((Boolean)t.get(i));
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
index 180ce41..40e36d5 100644
--- a/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
+++ b/parquet-protobuf/src/main/java/org/apache/parquet/proto/ProtoWriteSupport.java
@@ -318,7 +318,7 @@ public class ProtoWriteSupport<T extends MessageOrBuilder> extends WriteSupport<
@Override
final void writeRawValue(Object value) {
ByteString byteString = (ByteString) value;
- Binary binary = Binary.fromByteArray(byteString.toByteArray());
+ Binary binary = Binary.fromConstantByteArray(byteString.toByteArray());
recordConsumer.addBinary(binary);
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java
----------------------------------------------------------------------
diff --git a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java
index c7430c5..73f7734 100644
--- a/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java
+++ b/parquet-protobuf/src/test/java/org/apache/parquet/proto/ProtoWriteSupportTest.java
@@ -50,7 +50,7 @@ public class ProtoWriteSupportTest {
inOrder.verify(readConsumerMock).startMessage();
inOrder.verify(readConsumerMock).startField("one", 0);
- inOrder.verify(readConsumerMock).addBinary(Binary.fromByteArray("oneValue".getBytes()));
+ inOrder.verify(readConsumerMock).addBinary(Binary.fromString("oneValue"));
inOrder.verify(readConsumerMock).endField("one", 0);
inOrder.verify(readConsumerMock).endMessage();
@@ -95,10 +95,10 @@ public class ProtoWriteSupportTest {
inOrder.verify(readConsumerMock).startField("inner", 0);
inOrder.verify(readConsumerMock).startGroup();
inOrder.verify(readConsumerMock).startField("one", 0);
- inOrder.verify(readConsumerMock).addBinary(Binary.fromByteArray("one".getBytes()));
+ inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
inOrder.verify(readConsumerMock).endField("one", 0);
inOrder.verify(readConsumerMock).startField("two", 1);
- inOrder.verify(readConsumerMock).addBinary(Binary.fromByteArray("two".getBytes()));
+ inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes()));
inOrder.verify(readConsumerMock).endField("two", 1);
inOrder.verify(readConsumerMock).endGroup();
inOrder.verify(readConsumerMock).endField("inner", 0);
@@ -124,14 +124,14 @@ public class ProtoWriteSupportTest {
//first inner message
inOrder.verify(readConsumerMock).startGroup();
inOrder.verify(readConsumerMock).startField("one", 0);
- inOrder.verify(readConsumerMock).addBinary(Binary.fromByteArray("one".getBytes()));
+ inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
inOrder.verify(readConsumerMock).endField("one", 0);
inOrder.verify(readConsumerMock).endGroup();
//second inner message
inOrder.verify(readConsumerMock).startGroup();
inOrder.verify(readConsumerMock).startField("two", 1);
- inOrder.verify(readConsumerMock).addBinary(Binary.fromByteArray("two".getBytes()));
+ inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("two".getBytes()));
inOrder.verify(readConsumerMock).endField("two", 1);
inOrder.verify(readConsumerMock).endGroup();
@@ -157,7 +157,7 @@ public class ProtoWriteSupportTest {
inOrder.verify(readConsumerMock).startGroup();
inOrder.verify(readConsumerMock).startField("one", 0);
- inOrder.verify(readConsumerMock).addBinary(Binary.fromByteArray("one".getBytes()));
+ inOrder.verify(readConsumerMock).addBinary(Binary.fromConstantByteArray("one".getBytes()));
inOrder.verify(readConsumerMock).endField("one", 0);
inOrder.verify(readConsumerMock).endGroup();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
index 6530823..40984cc 100644
--- a/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
+++ b/parquet-thrift/src/main/java/org/apache/parquet/thrift/ParquetWriteProtocol.java
@@ -645,7 +645,8 @@ public class ParquetWriteProtocol extends ParquetProtocol {
}
private void writeBinaryToRecordConsumer(ByteBuffer buf) {
- recordConsumer.addBinary(Binary.fromByteArray(buf.array(), buf.position(), buf.limit() - buf.position()));
+ recordConsumer.addBinary(Binary.fromReusedByteArray(buf.array(), buf.position(),
+ buf.limit() - buf.position()));
}
private void writeStringToRecordConsumer(String str) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
index a4660bd..837a46a 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
@@ -178,7 +178,8 @@ public class DumpCommand extends ArgsOnlyCommand {
MetadataUtils.showDetails(out, ccmds);
List<BlockMetaData> rblocks = Collections.singletonList(block);
- freader = new ParquetFileReader(conf, inpath, rblocks, columns);
+ freader = new ParquetFileReader(
+ conf, meta.getFileMetaData(), inpath, rblocks, columns);
PageReadStore store = freader.readNextRowGroup();
while (store != null) {
out.incrementTabLevel();
@@ -211,7 +212,8 @@ public class DumpCommand extends ArgsOnlyCommand {
long page = 1;
long total = blocks.size();
long offset = 1;
- freader = new ParquetFileReader(conf, inpath, blocks, Collections.singletonList(column));
+ freader = new ParquetFileReader(
+ conf, meta.getFileMetaData(), inpath, blocks, Collections.singletonList(column));
PageReadStore store = freader.readNextRowGroup();
while (store != null) {
ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DumpGroupConverter(), schema);
@@ -307,7 +309,7 @@ public class DumpCommand extends ArgsOnlyCommand {
}
public static String binaryToString(Binary value) {
- byte[] data = value.getBytes();
+ byte[] data = value.getBytesUnsafe();
if (data == null) return null;
try {
@@ -320,7 +322,7 @@ public class DumpCommand extends ArgsOnlyCommand {
}
public static BigInteger binaryToBigInteger(Binary value) {
- byte[] data = value.getBytes();
+ byte[] data = value.getBytesUnsafe();
if (data == null) return null;
return new BigInteger(data);
[2/2] parquet-mr git commit: PARQUET-251: Binary column statistics error when reuse byte[] among rows
Posted by al...@apache.org.
PARQUET-251: Binary column statistics error when reuse byte[] among rows
Author: asingh <as...@cloudera.com>
Author: Alex Levenson <al...@twitter.com>
Author: Ashish Singh <as...@cloudera.com>
Closes #197 from SinghAsDev/PARQUET-251 and squashes the following commits:
68e0eae [asingh] Remove deprecated constructors from private classes
67e4e5f [asingh] Add removed public methods in Binary and deprecate them
0e71728 [asingh] Add comment for BinaryStatistics.setMinMaxFromBytes
fbe873f [Ashish Singh] Merge pull request #4 from isnotinvain/PR-197-3
9826ee6 [Alex Levenson] Some minor cleanup
7570035 [asingh] Remove test for stats getting ignored for version 160 when type is int64
af43d28 [Alex Levenson] Address PR feedback
89ab4ee [Alex Levenson] put the headers in the right location
2838cc9 [Alex Levenson] Split out version checks to separate files, add some tests
5af9142 [Alex Levenson] Generalize tests, make Binary.fromString reused=false
e00d9b7 [asingh] Rename isReused => isBackingBytesReused
d2ad939 [asingh] Rebase over latest trunk
857141a [asingh] Remove redundant junit dependency
32b88ed [asingh] Remove semver from hadoop-common
7a0e99e [asingh] Revert to fromConstantByteArray for ByteString
c820ec9 [asingh] Add unit tests for Binary and to check if stats are ignored for version 160
9bbd1e5 [asingh] Improve version parsing
84a1d8b [asingh] Remove ignoring stats on write side and ignore it on read side
903f8e3 [asingh] Address some review comments. * Ignore stats for writer's version < 1.8.0 * Refactor shouldIgnoreStatistics method a bit * Assume implementations other than parquet-mr were writing binary statistics correctly * Add toParquetStatistics method's original method signature to maintain backwards compatibility and mark it as deprecated
64c2617 [asingh] Revert changes for ignoring stats at RowGroupFilter level
e861b18 [asingh] Ignore max min stats while reading
3a8cb8d [asingh] Fix typo
8e12618 [asingh] Fix usage of fromConstant versions of Binary constructors
860adf7 [asingh] Rename unmodified to constant and isReused instead of isUnmodifiable
0d127a7 [asingh] Add unmodified and Reused versions for creating a Binary. Add copy() to Binary.
b4e2950 [asingh] Skip filtering based on stats when file was written with version older than 1.6.1
6fcee8c [asingh] Add getBytesUnsafe() to Binary that returns backing byte[] if possible, else returns result of getBytes()
30b07dd [asingh] PARQUET-251: Binary column statistics error when reuse byte[] among rows
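As a quick orientation for the diff below, a minimal sketch (not part of the patch) of the new Binary factory methods and copy() that this change introduces:

    import org.apache.parquet.io.api.Binary;

    public class BinaryReuseSketch {
      public static void main(String[] args) {
        byte[] buffer = "row1".getBytes();

        // The caller plans to overwrite 'buffer' for the next row, so mark it as reused.
        Binary reused = Binary.fromReusedByteArray(buffer);

        // copy() materializes a private byte[] only when the backing bytes are reused,
        // so consumers such as statistics can safely hold on to the result across rows.
        Binary kept = reused.copy();

        // Bytes that will never be mutated can be wrapped without any later copy.
        Binary constant = Binary.fromConstantByteArray("row1".getBytes());
        System.out.println(kept.compareTo(constant) == 0);
      }
    }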
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/e3b95020
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/e3b95020
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/e3b95020
Branch: refs/heads/master
Commit: e3b95020f777eb5e0651977f654c1662e3ea1f29
Parents: e6ee42e
Author: asingh <as...@cloudera.com>
Authored: Tue Jun 30 18:34:48 2015 -0700
Committer: Alex Levenson <al...@twitter.com>
Committed: Tue Jun 30 18:34:48 2015 -0700
----------------------------------------------------------------------
.../apache/parquet/avro/AvroWriteSupport.java | 8 +-
.../org/apache/parquet/avro/TestReadWrite.java | 5 +-
.../avro/TestReadWriteOldListBehavior.java | 4 +-
.../parquet/benchmarks/DataGenerator.java | 2 +-
.../org/apache/parquet/CorruptStatistics.java | 104 +++++++++
.../column/statistics/BinaryStatistics.java | 18 +-
.../DeltaLengthByteArrayValuesReader.java | 2 +-
.../DeltaLengthByteArrayValuesWriter.java | 2 +-
.../deltastrings/DeltaByteArrayReader.java | 8 +-
.../deltastrings/DeltaByteArrayWriter.java | 2 +-
.../dictionary/DictionaryValuesWriter.java | 9 +-
.../dictionary/PlainValuesDictionary.java | 4 +-
.../values/plain/BinaryPlainValuesReader.java | 2 +-
.../FixedLenByteArrayPlainValuesReader.java | 2 +-
.../parquet/example/data/simple/NanoTime.java | 2 +-
.../io/RecordConsumerLoggingWrapper.java | 2 +-
.../java/org/apache/parquet/io/api/Binary.java | 169 +++++++++++++--
.../apache/parquet/schema/PrimitiveType.java | 2 +-
.../apache/parquet/CorruptStatisticsTest.java | 78 +++++++
.../column/statistics/TestStatistics.java | 18 ++
.../values/dictionary/TestDictionary.java | 8 +-
.../org/apache/parquet/io/api/TestBinary.java | 215 +++++++++++++++++++
.../org/apache/parquet/SemanticVersion.java | 133 ++++++++++++
.../java/org/apache/parquet/VersionParser.java | 105 +++++++++
.../org/apache/parquet/SemanticVersionTest.java | 53 +++++
.../java/org/apache/parquet/VersionTest.java | 44 +++-
.../converter/ParquetMetadataConverter.java | 41 +++-
.../hadoop/InternalParquetRecordReader.java | 6 +-
.../parquet/hadoop/ParquetFileReader.java | 37 +++-
.../apache/parquet/hadoop/ParquetReader.java | 2 +-
.../parquet/hadoop/ParquetRecordReader.java | 3 +-
.../hadoop/TestColumnChunkPageWriteStore.java | 3 +-
.../parquet/hadoop/TestParquetFileWriter.java | 27 ++-
.../parquet/hadoop/TestParquetWriter.java | 6 +-
.../hadoop/TestParquetWriterNewPage.java | 5 +-
.../ql/io/parquet/writable/BinaryWritable.java | 2 +-
.../primitive/ParquetStringInspector.java | 3 +-
.../apache/parquet/pig/TupleWriteSupport.java | 2 +-
.../apache/parquet/proto/ProtoWriteSupport.java | 2 +-
.../parquet/proto/ProtoWriteSupportTest.java | 12 +-
.../parquet/thrift/ParquetWriteProtocol.java | 3 +-
.../parquet/tools/command/DumpCommand.java | 10 +-
42 files changed, 1044 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index e86c579..35e3924 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -252,9 +252,9 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
recordConsumer.addDouble(((Number) value).doubleValue());
} else if (avroType.equals(Schema.Type.BYTES)) {
if (value instanceof byte[]) {
- recordConsumer.addBinary(Binary.fromByteArray((byte[]) value));
+ recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
} else {
- recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) value));
+ recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
}
} else if (avroType.equals(Schema.Type.STRING)) {
recordConsumer.addBinary(fromAvroString(value));
@@ -269,14 +269,14 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
} else if (avroType.equals(Schema.Type.UNION)) {
writeUnion(type.asGroupType(), nonNullAvroSchema, value);
} else if (avroType.equals(Schema.Type.FIXED)) {
- recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes()));
+ recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
}
}
private Binary fromAvroString(Object value) {
if (value instanceof Utf8) {
Utf8 utf8 = (Utf8) value;
- return Binary.fromByteArray(utf8.getBytes(), 0, utf8.getByteLength());
+ return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
}
return Binary.fromString(value.toString());
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 4d37f40..855a5b1 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -362,7 +362,8 @@ public class TestReadWrite {
recordConsumer.endField("mydouble", index++);
recordConsumer.startField("mybytes", index);
- recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
+ recordConsumer.addBinary(
+ Binary.fromReusedByteBuffer((ByteBuffer) record.get("mybytes")));
recordConsumer.endField("mybytes", index++);
recordConsumer.startField("mystring", index);
@@ -457,7 +458,7 @@ public class TestReadWrite {
recordConsumer.endField("mymap", index++);
recordConsumer.startField("myfixed", index);
- recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
+ recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) record.get("myfixed")));
recordConsumer.endField("myfixed", index++);
recordConsumer.endMessage();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
index 34a160a..7c2bc27 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
@@ -404,7 +404,7 @@ public class TestReadWriteOldListBehavior {
recordConsumer.endField("mydouble", index++);
recordConsumer.startField("mybytes", index);
- recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) record.get("mybytes")));
+ recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) record.get("mybytes")));
recordConsumer.endField("mybytes", index++);
recordConsumer.startField("mystring", index);
@@ -499,7 +499,7 @@ public class TestReadWriteOldListBehavior {
recordConsumer.endField("mymap", index++);
recordConsumer.startField("myfixed", index);
- recordConsumer.addBinary(Binary.fromByteArray((byte[]) record.get("myfixed")));
+ recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) record.get("myfixed")));
recordConsumer.endField("myfixed", index++);
recordConsumer.endMessage();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
----------------------------------------------------------------------
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
index 05c35bd..42d9953 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java
@@ -107,7 +107,7 @@ public class DataGenerator {
.append("float_field", 1.0f)
.append("double_field", 2.0d)
.append("flba_field", new String(chars))
- .append("int96_field", Binary.fromByteArray(new byte[12]))
+ .append("int96_field", Binary.fromConstantByteArray(new byte[12]))
);
}
writer.close();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java b/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
new file mode 100644
index 0000000..8e15d01
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import org.apache.parquet.SemanticVersion.SemanticVersionParseException;
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.VersionParser.VersionParseException;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * There was a bug (PARQUET-251) that caused the statistics metadata
+ * for binary columns to be corrupted in the write path.
+ *
+ * This class is used to detect whether a file was written with this bug,
+ * and thus its statistics should be ignored / not trusted.
+ */
+public class CorruptStatistics {
+ private static final Log LOG = Log.getLog(CorruptStatistics.class);
+
+ // the version in which the bug described by jira: PARQUET-251 was fixed
+ // the bug involved writing invalid binary statistics, so stats written prior to this
+ // fix must be ignored / assumed invalid
+ private static final SemanticVersion PARQUET_251_FIXED_VERSION = new SemanticVersion(1, 8, 0);
+
+ /**
+ * Decides if the statistics from a file created by createdBy (the created_by field from parquet format)
+ * should be ignored because they are potentially corrupt.
+ */
+ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName columnType) {
+
+ if (columnType != PrimitiveTypeName.BINARY && columnType != PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+ // the bug only applies to binary columns
+ return false;
+ }
+
+ if (Strings.isNullOrEmpty(createdBy)) {
+ // created_by is not populated, which could have been caused by
+ // parquet-mr during the same time as PARQUET-251, see PARQUET-297
+ LOG.info("Ignoring statistics because created_by is null or empty! See PARQUET-251 and PARQUET-297");
+ return true;
+ }
+
+ try {
+ ParsedVersion version = VersionParser.parse(createdBy);
+
+ if (!"parquet-mr".equals(version.application)) {
+ // assume other applications don't have this bug
+ return false;
+ }
+
+ if (Strings.isNullOrEmpty(version.semver)) {
+ LOG.warn("Ignoring statistics because created_by did not contain a semver (see PARQUET-251): " + createdBy);
+ return true;
+ }
+
+ SemanticVersion semver = SemanticVersion.parse(version.semver);
+
+ if (semver.compareTo(PARQUET_251_FIXED_VERSION) < 0) {
+ LOG.info("Ignoring statistics because this file was created prior to "
+ + PARQUET_251_FIXED_VERSION
+ + ", see PARQUET-251" );
+ return true;
+ }
+
+ // this file was created after the fix
+ return false;
+ } catch (RuntimeException e) {
+ // couldn't parse the created_by field, log what went wrong, don't trust the stats,
+ // but don't make this fatal.
+ warnParseError(createdBy, e);
+ return true;
+ } catch (SemanticVersionParseException e) {
+ // couldn't parse the created_by field, log what went wrong, don't trust the stats,
+ // but don't make this fatal.
+ warnParseError(createdBy, e);
+ return true;
+ } catch (VersionParseException e) {
+ // couldn't parse the created_by field, log what went wrong, don't trust the stats,
+ // but don't make this fatal.
+ warnParseError(createdBy, e);
+ return true;
+ }
+ }
+
+ private static void warnParseError(String createdBy, Throwable e) {
+ LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e);
+ }
+}
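A minimal usage sketch for the class above (the getCreatedBy() accessor on FileMetaData is an assumption, not shown in this patch):

    import org.apache.parquet.CorruptStatistics;
    import org.apache.parquet.hadoop.metadata.FileMetaData;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

    class StatsTrustCheck {
      // Returns true when footer min/max for a binary column can be trusted.
      static boolean trustBinaryStats(FileMetaData fileMetaData) {
        return !CorruptStatistics.shouldIgnoreStatistics(
            fileMetaData.getCreatedBy(), PrimitiveTypeName.BINARY);
      }
    }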
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
index 6341f96..e8439f0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
@@ -44,10 +44,16 @@ public class BinaryStatistics extends Statistics<Binary> {
}
}
+ /**
+ * Sets min and max values, re-uses the byte[] passed in.
+ * Any changes made to byte[] will be reflected in min and max values as well.
+ * @param minBytes byte array to set the min value to
+ * @param maxBytes byte array to set the max value to
+ */
@Override
public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes) {
- max = Binary.fromByteArray(maxBytes);
- min = Binary.fromByteArray(minBytes);
+ max = Binary.fromReusedByteArray(maxBytes);
+ min = Binary.fromReusedByteArray(minBytes);
this.markAsNotEmpty();
}
@@ -72,13 +78,13 @@ public class BinaryStatistics extends Statistics<Binary> {
}
public void updateStats(Binary min_value, Binary max_value) {
- if (min.compareTo(min_value) > 0) { min = min_value; }
- if (max.compareTo(max_value) < 0) { max = max_value; }
+ if (min.compareTo(min_value) > 0) { min = min_value.copy(); }
+ if (max.compareTo(max_value) < 0) { max = max_value.copy(); }
}
public void initializeStats(Binary min_value, Binary max_value) {
- min = min_value;
- max = max_value;
+ min = min_value.copy();
+ max = max_value.copy();
this.markAsNotEmpty();
}
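For context, a sketch of the failure mode this hunk addresses (illustrative only, not part of the patch):

    import org.apache.parquet.column.statistics.BinaryStatistics;
    import org.apache.parquet.io.api.Binary;

    public class ReusedBufferStatsSketch {
      public static void main(String[] args) {
        byte[] shared = "zzz".getBytes();
        BinaryStatistics stats = new BinaryStatistics();
        stats.initializeStats(Binary.fromReusedByteArray(shared), Binary.fromReusedByteArray(shared));

        // The writer overwrites the same buffer in place for the next row.
        System.arraycopy("aaa".getBytes(), 0, shared, 0, shared.length);
        stats.updateStats(Binary.fromReusedByteArray(shared), Binary.fromReusedByteArray(shared));

        // Before this commit, max aliased 'shared' and silently became "aaa";
        // with copy() in initializeStats/updateStats, max stays "zzz".
        System.out.println(stats.getMax().toStringUsingUTF8());
      }
    }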
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index 2db1336..fb9bdc5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -59,7 +59,7 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
int length = lengthReader.readInteger();
int start = offset;
offset = start + length;
- return Binary.fromByteArray(in, start, length);
+ return Binary.fromConstantByteArray(in, start, length);
}
@Override
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
index 0a498b1..3f686cc 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
@@ -61,7 +61,7 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
public void writeBytes(Binary v) {
try {
lengthWriter.writeInteger(v.length());
- out.write(v.getBytes());
+ v.writeTo(out);
} catch (IOException e) {
throw new ParquetEncodingException("could not write bytes", e);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
index de3df02..fd55035 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -40,7 +40,7 @@ public class DeltaByteArrayReader extends ValuesReader {
public DeltaByteArrayReader() {
this.prefixLengthReader = new DeltaBinaryPackingValuesReader();
this.suffixReader = new DeltaLengthByteArrayValuesReader();
- this.previous = Binary.fromByteArray(new byte[0]);
+ this.previous = Binary.fromConstantByteArray(new byte[0]);
}
@Override
@@ -67,9 +67,9 @@ public class DeltaByteArrayReader extends ValuesReader {
// We have to do this to materialize the output
if(prefixLength != 0) {
byte[] out = new byte[length];
- System.arraycopy(previous.getBytes(), 0, out, 0, prefixLength);
- System.arraycopy(suffix.getBytes(), 0, out, prefixLength, suffix.length());
- previous = Binary.fromByteArray(out);
+ System.arraycopy(previous.getBytesUnsafe(), 0, out, 0, prefixLength);
+ System.arraycopy(suffix.getBytesUnsafe(), 0, out, prefixLength, suffix.length());
+ previous = Binary.fromConstantByteArray(out);
} else {
previous = suffix;
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
index eb33d30..54234db 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
@@ -87,7 +87,7 @@ public class DeltaByteArrayWriter extends ValuesWriter{
int length = previous.length < vb.length ? previous.length : vb.length;
for(i = 0; (i < length) && (previous[i] == vb[i]); i++);
prefixLengthWriter.writeInteger(i);
- suffixWriter.writeBytes(Binary.fromByteArray(vb, i, vb.length - i));
+ suffixWriter.writeBytes(v.slice(i, vb.length - i));
previous = vb;
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
index 928c125..eb9fdd9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -235,7 +235,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
int id = binaryDictionaryContent.getInt(v);
if (id == -1) {
id = binaryDictionaryContent.size();
- binaryDictionaryContent.put(copy(v), id);
+ binaryDictionaryContent.put(v.copy(), id);
// length as int (4 bytes) + actual bytes
dictionaryByteSize += 4 + v.length();
}
@@ -283,11 +283,6 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
writer.writeBytes(reverseDictionary[id]);
}
}
-
- protected static Binary copy(Binary binary) {
- return Binary.fromByteArray(
- Arrays.copyOf(binary.getBytes(), binary.length()));
- }
}
/**
@@ -311,7 +306,7 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
int id = binaryDictionaryContent.getInt(value);
if (id == -1) {
id = binaryDictionaryContent.size();
- binaryDictionaryContent.put(copy(value), id);
+ binaryDictionaryContent.put(value.copy(), id);
dictionaryByteSize += length;
}
encodedValues.add(id);
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
index 055dd73..e671310 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -96,7 +96,7 @@ public abstract class PlainValuesDictionary extends Dictionary {
// read the length
offset += 4;
// wrap the content in a binary
- binaryDictionaryContent[i] = Binary.fromByteArray(dictionaryBytes, offset, len);
+ binaryDictionaryContent[i] = Binary.fromConstantByteArray(dictionaryBytes, offset, len);
// increment to the next value
offset += len;
}
@@ -106,7 +106,7 @@ public abstract class PlainValuesDictionary extends Dictionary {
"Invalid byte array length: " + length);
for (int i = 0; i < binaryDictionaryContent.length; i++) {
// wrap the content in a Binary
- binaryDictionaryContent[i] = Binary.fromByteArray(
+ binaryDictionaryContent[i] = Binary.fromConstantByteArray(
dictionaryBytes, offset, length);
// increment to the next value
offset += length;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
index f567803..4346e02 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -39,7 +39,7 @@ public class BinaryPlainValuesReader extends ValuesReader {
int length = BytesUtils.readIntLittleEndian(in, offset);
int start = offset + 4;
offset = start + length;
- return Binary.fromByteArray(in, start, length);
+ return Binary.fromConstantByteArray(in, start, length);
} catch (IOException e) {
throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
} catch (RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 3a7d245..098a486 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -46,7 +46,7 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
try {
int start = offset;
offset = start + length;
- return Binary.fromByteArray(in, start, length);
+ return Binary.fromConstantByteArray(in, start, length);
} catch (RuntimeException e) {
throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java b/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
index 61eff42..f8f3979 100644
--- a/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
+++ b/parquet-column/src/main/java/org/apache/parquet/example/data/simple/NanoTime.java
@@ -61,7 +61,7 @@ public class NanoTime extends Primitive {
buf.putLong(timeOfDayNanos);
buf.putInt(julianDay);
buf.flip();
- return Binary.fromByteBuffer(buf);
+ return Binary.fromConstantByteBuffer(buf);
}
public Int96Value toInt96() {
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
index f753b2a..642c1f4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/RecordConsumerLoggingWrapper.java
@@ -112,7 +112,7 @@ public class RecordConsumerLoggingWrapper extends RecordConsumer {
*/
@Override
public void addBinary(Binary value) {
- if (DEBUG) log(Arrays.toString(value.getBytes()));
+ if (DEBUG) log(Arrays.toString(value.getBytesUnsafe()));
delegate.addBinary(value);
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index d3abec5..f88d740 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -33,10 +33,12 @@ import static org.apache.parquet.bytes.BytesUtils.UTF8;
abstract public class Binary implements Comparable<Binary>, Serializable {
+ protected boolean isBackingBytesReused;
+
// this isn't really something others should extend
private Binary() { }
- public static final Binary EMPTY = fromByteArray(new byte[0]);
+ public static final Binary EMPTY = fromConstantByteArray(new byte[0]);
abstract public String toStringUsingUTF8();
@@ -48,6 +50,16 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
abstract public byte[] getBytes();
+ /**
+ * Variant of getBytes() that avoids copying backing data structure by returning
+ * backing byte[] of the Binary. Do not modify backing byte[] unless you know what
+ * you are doing.
+ * @return backing byte[] of correct size, with an offset of 0, if possible, else returns result of getBytes()
+ */
+ abstract public byte[] getBytesUnsafe();
+
+ abstract public Binary slice(int start, int length);
+
abstract boolean equals(byte[] bytes, int offset, int length);
abstract boolean equals(Binary other);
@@ -71,7 +83,28 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
@Override
public String toString() {
- return "Binary{" + length() + " bytes, " + Arrays.toString(getBytes()) + "}";
+ return "Binary{" +
+ length() +
+ (isBackingBytesReused ? " reused": " constant") +
+ " bytes, " +
+ Arrays.toString(getBytesUnsafe())
+ + "}";
+ }
+
+ public Binary copy() {
+ if (isBackingBytesReused) {
+ return Binary.fromConstantByteArray(getBytes());
+ } else {
+ return this;
+ }
+ }
+
+ /**
+ * Signals if backing bytes are owned, and can be modified, by producer of the Binary
+ * @return if backing bytes are held on by producer of the Binary
+ */
+ public boolean isBackingBytesReused() {
+ return isBackingBytesReused;
}
private static class ByteArraySliceBackedBinary extends Binary {
@@ -79,10 +112,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
private final int offset;
private final int length;
- public ByteArraySliceBackedBinary(byte[] value, int offset, int length) {
+ public ByteArraySliceBackedBinary(byte[] value, int offset, int length, boolean isBackingBytesReused) {
this.value = value;
this.offset = offset;
this.length = length;
+ this.isBackingBytesReused = isBackingBytesReused;
}
@Override
@@ -110,6 +144,21 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
@Override
+ public byte[] getBytesUnsafe() {
+ // Backing array is larger than the slice used for this Binary.
+ return getBytes();
+ }
+
+ @Override
+ public Binary slice(int start, int length) {
+ if (isBackingBytesReused) {
+ return Binary.fromReusedByteArray(value, offset + start, length);
+ } else {
+ return Binary.fromConstantByteArray(value, offset + start, length);
+ }
+ }
+
+ @Override
public int hashCode() {
return Binary.hashCode(value, offset, length);
}
@@ -147,8 +196,19 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
private static class FromStringBinary extends ByteArrayBackedBinary {
- public FromStringBinary(byte[] value) {
- super(value);
+ public FromStringBinary(String value) {
+ // reused is false, because we do not
+ // hold on to the underlying bytes,
+ // and nobody else has a handle to them
+ super(encodeUTF8(value), false);
+ }
+
+ private static byte[] encodeUTF8(String value) {
+ try {
+ return value.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new ParquetEncodingException("UTF-8 not supported.", e);
+ }
}
@Override
@@ -157,15 +217,29 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
}
+ public static Binary fromReusedByteArray(final byte[] value, final int offset, final int length) {
+ return new ByteArraySliceBackedBinary(value, offset, length, true);
+ }
+
+ public static Binary fromConstantByteArray(final byte[] value, final int offset,
+ final int length) {
+ return new ByteArraySliceBackedBinary(value, offset, length, false);
+ }
+
+ @Deprecated
+ /**
+ * @deprecated Use @link{fromReusedByteArray} or @link{fromConstantByteArray} instead
+ */
public static Binary fromByteArray(final byte[] value, final int offset, final int length) {
- return new ByteArraySliceBackedBinary(value, offset, length);
+ return fromReusedByteArray(value, offset, length); // Assume producer intends to reuse byte[]
}
private static class ByteArrayBackedBinary extends Binary {
private final byte[] value;
- public ByteArrayBackedBinary(byte[] value) {
+ public ByteArrayBackedBinary(byte[] value, boolean isBackingBytesReused) {
this.value = value;
+ this.isBackingBytesReused = isBackingBytesReused;
}
@Override
@@ -185,10 +259,24 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
@Override
public byte[] getBytes() {
+ return Arrays.copyOfRange(value, 0, value.length);
+ }
+
+ @Override
+ public byte[] getBytesUnsafe() {
return value;
}
@Override
+ public Binary slice(int start, int length) {
+ if (isBackingBytesReused) {
+ return Binary.fromReusedByteArray(value, start, length);
+ } else {
+ return Binary.fromConstantByteArray(value, start, length);
+ }
+ }
+
+ @Override
public int hashCode() {
return Binary.hashCode(value, 0, value.length);
}
@@ -225,15 +313,29 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
+ public static Binary fromReusedByteArray(final byte[] value) {
+ return new ByteArrayBackedBinary(value, true);
+ }
+
+ public static Binary fromConstantByteArray(final byte[] value) {
+ return new ByteArrayBackedBinary(value, false);
+ }
+
+ /**
+ * @deprecated Use {@link #fromReusedByteArray(byte[])} or {@link #fromConstantByteArray(byte[])} instead
+ */
+ @Deprecated
public static Binary fromByteArray(final byte[] value) {
- return new ByteArrayBackedBinary(value);
+ return fromReusedByteArray(value); // Assume producer intends to reuse byte[]
}
private static class ByteBufferBackedBinary extends Binary {
private transient ByteBuffer value;
+ private transient byte[] cachedBytes;
- public ByteBufferBackedBinary(ByteBuffer value) {
+ public ByteBufferBackedBinary(ByteBuffer value, boolean isBackingBytesReused) {
this.value = value;
+ this.isBackingBytesReused = isBackingBytesReused;
}
@Override
@@ -249,7 +351,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
@Override
public void writeTo(OutputStream out) throws IOException {
// TODO: should not have to materialize those bytes
- out.write(getBytes());
+ out.write(getBytesUnsafe());
}
@Override
@@ -258,16 +360,29 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
value.mark();
value.get(bytes).reset();
+ if (!isBackingBytesReused) { // only cache if the backing buffer cannot change
+ cachedBytes = bytes;
+ }
return bytes;
}
@Override
+ public byte[] getBytesUnsafe() {
+ return cachedBytes != null ? cachedBytes : getBytes();
+ }
+
+ @Override
+ public Binary slice(int start, int length) {
+ return Binary.fromConstantByteArray(getBytesUnsafe(), start, length);
+ }
+
+ @Override
public int hashCode() {
if (value.hasArray()) {
return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
value.arrayOffset() + value.remaining());
}
- byte[] bytes = getBytes();
+ byte[] bytes = getBytesUnsafe();
return Binary.hashCode(bytes, 0, bytes.length);
}
@@ -277,7 +392,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
return other.equals(value.array(), value.arrayOffset() + value.position(),
value.arrayOffset() + value.remaining());
}
- byte[] bytes = getBytes();
+ byte[] bytes = getBytesUnsafe();
return other.equals(bytes, 0, bytes.length);
}
@@ -287,7 +402,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
return Binary.equals(value.array(), value.arrayOffset() + value.position(),
value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
}
- byte[] bytes = getBytes();
+ byte[] bytes = getBytesUnsafe();
return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
}
@@ -297,7 +412,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
return other.compareTo(value.array(), value.arrayOffset() + value.position(),
value.arrayOffset() + value.remaining());
}
- byte[] bytes = getBytes();
+ byte[] bytes = getBytesUnsafe();
return other.compareTo(bytes, 0, bytes.length);
}
@@ -307,7 +422,7 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
}
- byte[] bytes = getBytes();
+ byte[] bytes = getBytesUnsafe();
return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
}
@@ -319,11 +434,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
@Override
public void writeTo(DataOutput out) throws IOException {
// TODO: should not have to materialize those bytes
- out.write(getBytes());
+ out.write(getBytesUnsafe());
}
private void writeObject(java.io.ObjectOutputStream out) throws IOException {
- byte[] bytes = getBytes();
+ byte[] bytes = getBytesUnsafe();
out.writeInt(bytes.length);
out.write(bytes);
}
@@ -341,16 +456,24 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
}
+ public static Binary fromReusedByteBuffer(final ByteBuffer value) {
+ return new ByteBufferBackedBinary(value, true);
+ }
+
+ public static Binary fromConstantByteBuffer(final ByteBuffer value) {
+ return new ByteBufferBackedBinary(value, false);
+ }
+
+ /**
+ * @deprecated Use {@link #fromReusedByteBuffer(ByteBuffer)} or {@link #fromConstantByteBuffer(ByteBuffer)} instead
+ */
+ @Deprecated
public static Binary fromByteBuffer(final ByteBuffer value) {
- return new ByteBufferBackedBinary(value);
+ return fromReusedByteBuffer(value); // Assume producer intends to reuse byte[]
}
public static Binary fromString(final String value) {
- try {
- return new FromStringBinary(value.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new ParquetEncodingException("UTF-8 not supported.", e);
- }
+ return new FromStringBinary(value);
}
/**
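For context, a minimal usage sketch (not part of this patch) of the new factory methods; the class name, buffer contents, and printed values are illustrative only. The reused/constant split lets a consumer that keeps a value (for example binary min/max statistics) know whether it must copy the bytes before the producer overwrites them.

    import java.nio.charset.StandardCharsets;

    import org.apache.parquet.io.api.Binary;

    public class BinaryReuseExample {
      public static void main(String[] args) {
        byte[] buffer = "abc".getBytes(StandardCharsets.UTF_8);

        // Reused: the producer may overwrite buffer before the value is consumed,
        // so anything that keeps the value must copy it first.
        Binary reused = Binary.fromReusedByteArray(buffer);
        Binary kept = reused.copy();                      // materializes its own byte[]
        System.out.println(new String(kept.getBytes(), StandardCharsets.UTF_8));    // "abc"

        // Constant: the caller promises never to mutate buffer again,
        // so copy() can simply return the same instance.
        Binary constant = Binary.fromConstantByteArray(buffer);
        System.out.println(constant.copy() == constant);  // expected: true

        // slice() preserves the reused/constant property of a byte[]-backed Binary.
        Binary suffix = constant.slice(1, 2);
        System.out.println(new String(suffix.getBytes(), StandardCharsets.UTF_8));  // "bc"
      }
    }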
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index e8d98c0..7988f4a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -204,7 +204,7 @@ public final class PrimitiveType extends Type {
INT96("getBinary", Binary.class) {
@Override
public String toString(ColumnReader columnReader) {
- return Arrays.toString(columnReader.getBinary().getBytes());
+ return Arrays.toString(columnReader.getBinary().getBytesUnsafe());
}
@Override
public void addValueToRecordConsumer(RecordConsumer recordConsumer,
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/test/java/org/apache/parquet/CorruptStatisticsTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/CorruptStatisticsTest.java b/parquet-column/src/test/java/org/apache/parquet/CorruptStatisticsTest.java
new file mode 100644
index 0000000..084d63a
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/CorruptStatisticsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class CorruptStatisticsTest {
+
+ @Test
+ public void testOnlyAppliesToBinary() {
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build abcd)", PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build abcd)", PrimitiveTypeName.DOUBLE));
+ }
+
+ @Test
+ public void testCorruptStatistics() {
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.4.2 (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.100 (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.7.999 (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.22rc99 (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.22rc99-SNAPSHOT (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.1-SNAPSHOT (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0t-01-abcdefg (build abcd)", PrimitiveTypeName.BINARY));
+
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("unparseable string", PrimitiveTypeName.BINARY));
+
+ // missing semver
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version (build abcd)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version (build abcd)", PrimitiveTypeName.BINARY));
+
+ // missing build hash
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build )", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.6.0 (build)", PrimitiveTypeName.BINARY));
+ assertTrue(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version (build)", PrimitiveTypeName.BINARY));
+
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("imapla version 1.6.0 (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("imapla version 1.10.0 (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.8.0 (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.8.1 (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.8.1rc3 (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.8.1rc3-SNAPSHOT (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.9.0 (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 2.0.0 (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("parquet-mr version 1.9.0t-01-abcdefg (build abcd)", PrimitiveTypeName.BINARY));
+
+ // missing semver
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version (build abcd)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version (build abcd)", PrimitiveTypeName.BINARY));
+
+ // missing build hash
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version 1.6.0 (build )", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version 1.6.0 (build)", PrimitiveTypeName.BINARY));
+ assertFalse(CorruptStatistics.shouldIgnoreStatistics("impala version (build)", PrimitiveTypeName.BINARY));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
index e0c6a10..128acb4 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/statistics/TestStatistics.java
@@ -384,6 +384,24 @@ public class TestStatistics {
}
@Test
+ public void testBinaryMinMaxForReusedBackingByteArray() {
+ BinaryStatistics stats = new BinaryStatistics();
+
+ byte[] bytes = new byte[] { 10 };
+ final Binary value = Binary.fromReusedByteArray(bytes);
+ stats.updateStats(value);
+
+ bytes[0] = 20;
+ stats.updateStats(value);
+
+ bytes[0] = 15;
+ stats.updateStats(value);
+
+ assertArrayEquals(new byte[] { 20 }, stats.getMaxBytes());
+ assertArrayEquals(new byte[] { 10 }, stats.getMinBytes());
+ }
+
+ @Test
public void testMergingStatistics() {
testMergingIntStats();
testMergingLongStats();
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
index e60b3ec..020868e 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -27,6 +27,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
import org.junit.Assert;
import org.junit.Test;
@@ -512,11 +513,12 @@ public class TestDictionary {
}
}
- private void writeRepeatedWithReuse(int COUNT, ValuesWriter cw, String prefix) {
- Binary reused = Binary.fromString(prefix + "0");
+ private void writeRepeatedWithReuse(int COUNT, ValuesWriter cw,
+ String prefix) throws UnsupportedEncodingException {
+ Binary reused = Binary.fromReusedByteArray((prefix + "0").getBytes("UTF-8"));
for (int i = 0; i < COUNT; i++) {
Binary content = Binary.fromString(prefix + i % 10);
- System.arraycopy(content.getBytes(), 0, reused.getBytes(), 0, reused.length());
+ System.arraycopy(content.getBytesUnsafe(), 0, reused.getBytesUnsafe(), 0, reused.length());
cw.writeBytes(reused);
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
new file mode 100644
index 0000000..bd8a69d
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io.api;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.parquet.io.api.TestBinary.BinaryFactory.BinaryAndOriginal;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+public class TestBinary {
+
+ private static final String testString = "test-123";
+ private static final String UTF8 = "UTF-8";
+
+ static interface BinaryFactory {
+ static class BinaryAndOriginal {
+ public Binary binary;
+ public byte[] original;
+
+ public BinaryAndOriginal(Binary binary, byte[] original) {
+ this.binary = binary;
+ this.original = original;
+ }
+ }
+
+ BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception;
+ }
+
+ private static void mutate(byte[] bytes) {
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] = (byte) (bytes[i] + 1);
+ }
+ }
+
+ private static final BinaryFactory BYTE_ARRAY_BACKED_BF = new BinaryFactory() {
+ @Override
+ public BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception {
+ byte[] orig = Arrays.copyOf(bytes, bytes.length);
+ if (reused) {
+ return new BinaryAndOriginal(Binary.fromReusedByteArray(orig), orig);
+ } else {
+ return new BinaryAndOriginal(Binary.fromConstantByteArray(orig), orig);
+ }
+ }
+ };
+
+ private static final BinaryFactory BYTE_ARRAY_SLICE_BACKED_BF = new BinaryFactory() {
+ @Override
+ public BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception {
+ byte [] orig = padded(bytes);
+ Binary b;
+ if (reused) {
+ b = Binary.fromReusedByteArray(orig, 5, bytes.length);
+ } else {
+ b = Binary.fromConstantByteArray(orig, 5, bytes.length);
+ }
+ assertArrayEquals(bytes, b.getBytes());
+ return new BinaryAndOriginal(b, orig);
+ }
+ };
+
+ private static final BinaryFactory BUFFER_BF = new BinaryFactory() {
+ @Override
+ public BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception {
+ byte [] orig = padded(bytes);
+ ByteBuffer buff = ByteBuffer.wrap(orig, 5, bytes.length);
+ Binary b;
+
+ if (reused) {
+ b = Binary.fromReusedByteBuffer(buff);
+ } else {
+ b = Binary.fromConstantByteBuffer(buff);
+ }
+
+ buff.mark();
+ assertArrayEquals(bytes, b.getBytes());
+ buff.reset();
+ return new BinaryAndOriginal(b, orig);
+ }
+ };
+
+ private static final BinaryFactory STRING_BF = new BinaryFactory() {
+ @Override
+ public BinaryAndOriginal get(byte[] bytes, boolean reused) throws Exception {
+ Binary b = Binary.fromString(new String(bytes, UTF8));
+ return new BinaryAndOriginal(b, b.getBytesUnsafe()); // only way to get underlying bytes for testing
+ }
+ };
+
+ private static byte[] padded(byte[] bytes) {
+ byte[] padded = new byte[bytes.length + 10];
+
+ for (int i = 0; i < 5; i++) {
+ padded[i] = (byte) i;
+ }
+
+ System.arraycopy(bytes, 0, padded, 5, bytes.length);
+
+ for (int i = 0; i < 5; i++) {
+ padded[i + 5 + bytes.length] = (byte) i;
+ }
+
+ return padded;
+ }
+
+ @Test
+ public void testByteArrayBackedBinary() throws Exception {
+ testBinary(BYTE_ARRAY_BACKED_BF, true);
+ testBinary(BYTE_ARRAY_BACKED_BF, false);
+ }
+
+ @Test
+ public void testByteArraySliceBackedBinary() throws Exception {
+ testBinary(BYTE_ARRAY_SLICE_BACKED_BF, true);
+ testBinary(BYTE_ARRAY_SLICE_BACKED_BF, false);
+ }
+
+ @Test
+ public void testByteBufferBackedBinary() throws Exception {
+ testBinary(BUFFER_BF, true);
+ testBinary(BUFFER_BF, false);
+ }
+
+ @Test
+ public void testFromStringBinary() throws Exception {
+ testBinary(STRING_BF, false);
+ }
+
+ private void testSlice(BinaryFactory bf, boolean reused) throws Exception {
+ BinaryAndOriginal bao = bf.get(testString.getBytes(UTF8), reused);
+
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.slice(0, testString.length()).getBytesUnsafe());
+ assertArrayEquals("123".getBytes(UTF8), bao.binary.slice(5, 3).getBytesUnsafe());
+ }
+
+ private void testConstantCopy(BinaryFactory bf) throws Exception {
+ BinaryAndOriginal bao = bf.get(testString.getBytes(UTF8), false);
+ assertEquals(false, bao.binary.isBackingBytesReused());
+
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.getBytes());
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.getBytesUnsafe());
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.copy().getBytesUnsafe());
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.copy().getBytes());
+
+ bao = bf.get(testString.getBytes(UTF8), false);
+ assertEquals(false, bao.binary.isBackingBytesReused());
+
+ Binary copy = bao.binary.copy();
+
+ assertSame(copy, bao.binary);
+
+ mutate(bao.original);
+
+ byte[] expected = testString.getBytes(UTF8);
+ mutate(expected);
+
+ assertArrayEquals(expected, copy.getBytes());
+ assertArrayEquals(expected, copy.getBytesUnsafe());
+ assertArrayEquals(expected, copy.copy().getBytesUnsafe());
+ assertArrayEquals(expected, copy.copy().getBytes());
+ }
+
+ private void testReusedCopy(BinaryFactory bf) throws Exception {
+ BinaryAndOriginal bao = bf.get(testString.getBytes(UTF8), true);
+ assertEquals(true, bao.binary.isBackingBytesReused());
+
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.getBytes());
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.getBytesUnsafe());
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.copy().getBytesUnsafe());
+ assertArrayEquals(testString.getBytes(UTF8), bao.binary.copy().getBytes());
+
+ bao = bf.get(testString.getBytes(UTF8), true);
+ assertEquals(true, bao.binary.isBackingBytesReused());
+
+ Binary copy = bao.binary.copy();
+ mutate(bao.original);
+
+ assertArrayEquals(testString.getBytes(UTF8), copy.getBytes());
+ assertArrayEquals(testString.getBytes(UTF8), copy.getBytesUnsafe());
+ assertArrayEquals(testString.getBytes(UTF8), copy.copy().getBytesUnsafe());
+ assertArrayEquals(testString.getBytes(UTF8), copy.copy().getBytes());
+ }
+
+ private void testBinary(BinaryFactory bf, boolean reused) throws Exception {
+ testSlice(bf, reused);
+
+ if (reused) {
+ testReusedCopy(bf);
+ } else {
+ testConstantCopy(bf);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java b/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
new file mode 100644
index 0000000..feee8de
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/SemanticVersion.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Very basic semver parser, only pays attention to major, minor, and patch numbers.
+ * Attempts to do a little bit of validation that the version string is valid, but
+ * is not a full implementation of the semver spec.
+ *
+ * NOTE: compareTo only respects major, minor, and patch (ignores rc numbers, SNAPSHOT, etc)
+ */
+public final class SemanticVersion implements Comparable<SemanticVersion> {
+ // (major).(minor).(patch)[(rc)(rcnum)]?(-(SNAPSHOT))?
+ private static final String FORMAT = "^(\\d+)\\.(\\d+)\\.(\\d+)((.*)(\\d+))?(\\-(.*))?$";
+ private static final Pattern PATTERN = Pattern.compile(FORMAT);
+
+ public final int major;
+ public final int minor;
+ public final int patch;
+
+ public SemanticVersion(int major, int minor, int patch) {
+ Preconditions.checkArgument(major >= 0, "major must be >= 0");
+ Preconditions.checkArgument(minor >= 0, "minor must be >= 0");
+ Preconditions.checkArgument(patch >= 0, "patch must be >= 0");
+
+ this.major = major;
+ this.minor = minor;
+ this.patch = patch;
+ }
+
+ public static SemanticVersion parse(String version) throws SemanticVersionParseException {
+ Matcher matcher = PATTERN.matcher(version);
+
+ if (!matcher.matches()) {
+ throw new SemanticVersionParseException(version + " does not match format " + FORMAT);
+ }
+
+ final int major;
+ final int minor;
+ final int patch;
+
+ try {
+ major = Integer.valueOf(matcher.group(1));
+ minor = Integer.valueOf(matcher.group(2));
+ patch = Integer.valueOf(matcher.group(3));
+ } catch (NumberFormatException e) {
+ throw new SemanticVersionParseException(e);
+ }
+
+ if (major < 0 || minor < 0 || patch < 0) {
+ throw new SemanticVersionParseException(
+ String.format("major(%d), minor(%d), and patch(%d) must all be >= 0", major, minor, patch));
+ }
+
+ return new SemanticVersion(major, minor, patch);
+ }
+
+ @Override
+ public int compareTo(SemanticVersion o) {
+ int cmp;
+
+ cmp = Integer.compare(major, o.major);
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ cmp = Integer.compare(minor, o.minor);
+ if (cmp != 0) {
+ return cmp;
+ }
+
+ return Integer.compare(patch, o.patch);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ SemanticVersion that = (SemanticVersion) o;
+ return compareTo(that) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = major;
+ result = 31 * result + minor;
+ result = 31 * result + patch;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return major + "." + minor + "." + patch;
+ }
+
+ public static class SemanticVersionParseException extends Exception {
+ public SemanticVersionParseException() {
+ super();
+ }
+
+ public SemanticVersionParseException(String message) {
+ super(message);
+ }
+
+ public SemanticVersionParseException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SemanticVersionParseException(Throwable cause) {
+ super(cause);
+ }
+ }
+}
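A minimal sketch (not part of this patch) of how the parser and comparison behave; the version strings used here are the same ones SemanticVersionTest below exercises.

    import org.apache.parquet.SemanticVersion;

    public class SemverExample {
      public static void main(String[] args) throws Exception {
        // rc and SNAPSHOT qualifiers are ignored: only major.minor.patch are compared
        SemanticVersion writer = SemanticVersion.parse("1.8.0rc3-SNAPSHOT");
        System.out.println(writer.equals(new SemanticVersion(1, 8, 0)));         // true
        System.out.println(new SemanticVersion(1, 6, 0).compareTo(writer) < 0);  // true
      }
    }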
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/VersionParser.java b/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
new file mode 100644
index 0000000..a0c6c3d
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/VersionParser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+
+/**
+ * Parses a parquet Version string
+ * Tolerates missing semver and buildhash
+ * (semver and build hash may be null)
+ */
+public class VersionParser {
+ // example: parquet-mr version 1.8.0rc2-SNAPSHOT (build ddb469afac70404ea63b72ed2f07a911a8592ff7)
+ public static final String FORMAT = "(.+) version ((.*) )?\\(build ?(.*)\\)";
+ public static final Pattern PATTERN = Pattern.compile(FORMAT);
+
+ public static class ParsedVersion {
+ public final String application;
+ public final String semver;
+ public final String appBuildHash;
+
+ public ParsedVersion(String application, String semver, String appBuildHash) {
+ checkArgument(!Strings.isNullOrEmpty(application), "application cannot be null or empty");
+ this.application = application;
+ this.semver = Strings.isNullOrEmpty(semver) ? null : semver;
+ this.appBuildHash = Strings.isNullOrEmpty(appBuildHash) ? null : appBuildHash;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ ParsedVersion version = (ParsedVersion) o;
+
+ if (appBuildHash != null ? !appBuildHash.equals(version.appBuildHash) : version.appBuildHash != null)
+ return false;
+ if (application != null ? !application.equals(version.application) : version.application != null) return false;
+ if (semver != null ? !semver.equals(version.semver) : version.semver != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = application != null ? application.hashCode() : 0;
+ result = 31 * result + (semver != null ? semver.hashCode() : 0);
+ result = 31 * result + (appBuildHash != null ? appBuildHash.hashCode() : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "ParsedVersion(" +
+ "application=" + application +
+ ", semver=" + semver +
+ ", appBuildHash=" + appBuildHash +
+ ')';
+ }
+ }
+
+ public static ParsedVersion parse(String createdBy) throws VersionParseException {
+ Matcher matcher = PATTERN.matcher(createdBy);
+
+ if (!matcher.matches()) {
+ throw new VersionParseException("Could not parse created_by: " + createdBy + " using format: " + FORMAT);
+ }
+
+ String application = matcher.group(1);
+ String semver = matcher.group(3);
+ String appBuildHash = matcher.group(4);
+
+ if (Strings.isNullOrEmpty(application)) {
+ throw new VersionParseException("application cannot be null or empty");
+ }
+
+ return new ParsedVersion(application, semver, appBuildHash);
+ }
+
+ public static class VersionParseException extends Exception {
+ public VersionParseException(String message) {
+ super(message);
+ }
+ }
+
+}
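A minimal sketch (not part of this patch) of what parse() yields for a typical created_by string; the expected values match the assertions added to VersionTest below.

    import org.apache.parquet.VersionParser;
    import org.apache.parquet.VersionParser.ParsedVersion;

    public class CreatedByExample {
      public static void main(String[] args) throws Exception {
        ParsedVersion v = VersionParser.parse("parquet-mr version 1.6.0 (build abcd)");
        System.out.println(v.application);    // parquet-mr
        System.out.println(v.semver);         // 1.6.0
        System.out.println(v.appBuildHash);   // abcd

        // A missing semver or build hash is tolerated and surfaces as null.
        ParsedVersion bare = VersionParser.parse("parquet-mr version (build)");
        System.out.println(bare.semver == null && bare.appBuildHash == null);    // true
      }
    }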
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java b/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
new file mode 100644
index 0000000..6a1c47f
--- /dev/null
+++ b/parquet-common/src/test/java/org/apache/parquet/SemanticVersionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SemanticVersionTest {
+ @Test
+ public void testCompare() {
+ assertTrue(new SemanticVersion(1, 8, 1).compareTo(new SemanticVersion(1, 8, 1)) == 0);
+ assertTrue(new SemanticVersion(1, 8, 0).compareTo(new SemanticVersion(1, 8, 1)) < 0);
+ assertTrue(new SemanticVersion(1, 8, 2).compareTo(new SemanticVersion(1, 8, 1)) > 0);
+
+ assertTrue(new SemanticVersion(1, 8, 1).compareTo(new SemanticVersion(1, 8, 1)) == 0);
+ assertTrue(new SemanticVersion(1, 8, 0).compareTo(new SemanticVersion(1, 8, 1)) < 0);
+ assertTrue(new SemanticVersion(1, 8, 2).compareTo(new SemanticVersion(1, 8, 1)) > 0);
+
+ assertTrue(new SemanticVersion(1, 7, 0).compareTo(new SemanticVersion(1, 8, 0)) < 0);
+ assertTrue(new SemanticVersion(1, 9, 0).compareTo(new SemanticVersion(1, 8, 0)) > 0);
+
+ assertTrue(new SemanticVersion(0, 0, 0).compareTo(new SemanticVersion(1, 0, 0)) < 0);
+ assertTrue(new SemanticVersion(2, 0, 0).compareTo(new SemanticVersion(1, 0, 0)) > 0);
+
+ assertTrue(new SemanticVersion(1, 8, 100).compareTo(new SemanticVersion(1, 9, 0)) < 0);
+ }
+
+ @Test
+ public void testParse() throws Exception {
+ assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0"));
+ assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0rc3"));
+ assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0rc3-SNAPSHOT"));
+ assertEquals(new SemanticVersion(1, 8, 0), SemanticVersion.parse("1.8.0-SNAPSHOT"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/VersionTest.java b/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
index e68b2f2..3720007 100644
--- a/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
+++ b/parquet-common/src/test/java/org/apache/parquet/VersionTest.java
@@ -21,11 +21,14 @@ package org.apache.parquet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.VersionParser.VersionParseException;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* This test doesn't do much, but it makes sure that the Version class
@@ -49,14 +52,37 @@ public class VersionTest {
}
@Test
- public void testFullVersion() {
- // example: parquet-mr version 1.8.0rc2-SNAPSHOT (build ddb469afac70404ea63b72ed2f07a911a8592ff7)
- String regex = "parquet-mr version (.*) \\(build (.*)\\)";
- Pattern pattern = Pattern.compile(regex);
- Matcher m = pattern.matcher(Version.FULL_VERSION);
- assertTrue(Version.FULL_VERSION + " did not match " + pattern, m.matches());
- assertVersionValid(m.group(1));
- assertEquals(Version.VERSION_NUMBER, m.group(1));
- assertFalse(m.group(2).isEmpty());
+ public void testFullVersion() throws Exception {
+ ParsedVersion version = VersionParser.parse(Version.FULL_VERSION);
+
+ assertVersionValid(version.semver);
+ assertEquals(Version.VERSION_NUMBER, version.semver);
+ assertEquals("parquet-mr", version.application);
+ }
+
+ @Test
+ public void testVersionParser() throws Exception {
+ assertEquals(new ParsedVersion("parquet-mr", "1.6.0", "abcd"),
+ VersionParser.parse("parquet-mr version 1.6.0 (build abcd)"));
+
+ assertEquals(new ParsedVersion("parquet-mr", "1.6.22rc99-SNAPSHOT", "abcd"),
+ VersionParser.parse("parquet-mr version 1.6.22rc99-SNAPSHOT (build abcd)"));
+
+ try {
+ VersionParser.parse("unparseable string");
+ fail("this should throw");
+ } catch (VersionParseException e) {
+ // expected
+ }
+
+ // missing semver
+ assertEquals(new ParsedVersion("parquet-mr", null, "abcd"), VersionParser.parse("parquet-mr version (build abcd)"));
+ assertEquals(new ParsedVersion("parquet-mr", null, "abcd"), VersionParser.parse("parquet-mr version (build abcd)"));
+
+ // missing build hash
+ assertEquals(new ParsedVersion("parquet-mr", "1.6.0", null), VersionParser.parse("parquet-mr version 1.6.0 (build )"));
+ assertEquals(new ParsedVersion("parquet-mr", "1.6.0", null), VersionParser.parse("parquet-mr version 1.6.0 (build)"));
+ assertEquals(new ParsedVersion("parquet-mr", null, null), VersionParser.parse("parquet-mr version (build)"));
+ assertEquals(new ParsedVersion("parquet-mr", null, null), VersionParser.parse("parquet-mr version (build )"));
}
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 1481783..420d97e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.Log;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.ColumnChunk;
@@ -232,23 +233,35 @@ public class ParquetMetadataConverter {
return Encoding.valueOf(encoding.name());
}
- public static Statistics toParquetStatistics(org.apache.parquet.column.statistics.Statistics statistics) {
+ public static Statistics toParquetStatistics(
+ org.apache.parquet.column.statistics.Statistics statistics) {
Statistics stats = new Statistics();
if (!statistics.isEmpty()) {
stats.setNull_count(statistics.getNumNulls());
- if(statistics.hasNonNullValue()) {
+ if (statistics.hasNonNullValue()) {
stats.setMax(statistics.getMaxBytes());
stats.setMin(statistics.getMinBytes());
- }
+ }
}
return stats;
}
-
+ /**
+ * @deprecated Replaced by
+ * {@link #fromParquetStatistics(String, Statistics, PrimitiveTypeName)}
+ */
+ @Deprecated
public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics(Statistics statistics, PrimitiveTypeName type) {
+ return fromParquetStatistics(null, statistics, type);
+ }
+
+ public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics(
+ String createdBy, Statistics statistics, PrimitiveTypeName type) {
// create stats object based on the column type
org.apache.parquet.column.statistics.Statistics stats = org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type);
// If there was no statistics written to the footer, create an empty Statistics object and return
- if (statistics != null) {
+
+ // NOTE: See docs in CorruptStatistics for explanation of why this check is needed
+ if (statistics != null && !CorruptStatistics.shouldIgnoreStatistics(createdBy, type)) {
if (statistics.isSetMax() && statistics.isSetMin()) {
stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array());
}
@@ -517,10 +530,12 @@ public class ParquetMetadataConverter {
public FileMetaData visit(NoFilter filter) throws IOException {
return readFileMetaData(from);
}
+
@Override
public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
return readFileMetaData(from, true);
}
+
@Override
public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
return filterFileMetaData(readFileMetaData(from), filter);
@@ -555,7 +570,10 @@ public class ParquetMetadataConverter {
messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
CompressionCodecName.fromParquet(metaData.codec),
fromFormatEncodings(metaData.encodings),
- fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
+ fromParquetStatistics(
+ parquetMetadata.getCreated_by(),
+ metaData.statistics,
+ messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.num_values,
@@ -672,7 +690,10 @@ public class ParquetMetadataConverter {
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding,
OutputStream to) throws IOException {
- writePageHeader(newDataPageHeader(uncompressedSize, compressedSize, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding), to);
+ writePageHeader(
+ newDataPageHeader(uncompressedSize, compressedSize, valueCount, statistics,
+ rlEncoding, dlEncoding, valuesEncoding),
+ to);
}
private static PageHeader newDataPageHeader(
@@ -690,7 +711,8 @@ public class ParquetMetadataConverter {
getEncoding(dlEncoding),
getEncoding(rlEncoding)));
if (!statistics.isEmpty()) {
- pageHeader.getData_page_header().setStatistics(toParquetStatistics(statistics));
+ pageHeader.getData_page_header().setStatistics(
+ toParquetStatistics(statistics));
}
return pageHeader;
}
@@ -723,7 +745,8 @@ public class ParquetMetadataConverter {
getEncoding(dataEncoding),
dlByteLength, rlByteLength);
if (!statistics.isEmpty()) {
- dataPageHeaderV2.setStatistics(toParquetStatistics(statistics));
+ dataPageHeaderV2.setStatistics(
+ toParquetStatistics(statistics));
}
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
pageHeader.setData_page_header_v2(dataPageHeaderV2);
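A minimal sketch (not part of this patch) of the created_by-aware conversion; the footer Statistics built here is hypothetical and only illustrates that min/max written by an affected writer version is dropped for BINARY columns.

    import org.apache.parquet.format.Statistics;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

    public class StatisticsConversionExample {
      public static void main(String[] args) {
        Statistics footerStats = new Statistics();
        footerStats.setMin(new byte[] { 1 });
        footerStats.setMax(new byte[] { 9 });
        footerStats.setNull_count(0);

        // created_by identifies a writer whose binary min/max may be corrupt,
        // so the converter ignores them and returns statistics without min/max.
        org.apache.parquet.column.statistics.Statistics converted =
            ParquetMetadataConverter.fromParquetStatistics(
                "parquet-mr version 1.6.0 (build abcd)", footerStats, PrimitiveTypeName.BINARY);
        System.out.println(converted.hasNonNullValue());   // expected: false
      }
    }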
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 6ff4eac..ce8c287 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
@@ -163,10 +164,11 @@ class InternalParquetRecordReader<T> {
}
public void initialize(MessageType fileSchema,
- Map<String, String> fileMetadata,
+ FileMetaData parquetFileMetadata,
Path file, List<BlockMetaData> blocks, Configuration configuration)
throws IOException {
// initialize a ReadContext for this file
+ Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
configuration, toSetMultiMap(fileMetadata), fileSchema));
this.requestedSchema = readContext.getRequestedSchema();
@@ -177,7 +179,7 @@ class InternalParquetRecordReader<T> {
configuration, fileMetadata, fileSchema, readContext);
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
List<ColumnDescriptor> columns = requestedSchema.getColumns();
- reader = new ParquetFileReader(configuration, file, blocks, columns);
+ reader = new ParquetFileReader(configuration, parquetFileMetadata, file, blocks, columns);
for (BlockMetaData block : blocks) {
total += block.getRowCount();
}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 7f97b19..19370cd 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -73,6 +73,7 @@ import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
import org.apache.parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
@@ -435,18 +436,33 @@ public class ParquetFileReader implements Closeable {
private final List<BlockMetaData> blocks;
private final FSDataInputStream f;
private final Path filePath;
- private int currentBlock = 0;
private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
+ private final FileMetaData fileMetaData;
+ private final String createdBy;
+
+ private int currentBlock = 0;
+
+ /**
+ * @deprecated use {@link #ParquetFileReader(Configuration, FileMetaData, Path, List, List)}
+ * instead
+ */
+ public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
+ this(configuration, null, filePath, blocks, columns);
+ }
/**
- * @param f the Parquet file (will be opened for read in this constructor)
+ * @param configuration the Hadoop conf
+ * @param fileMetaData the file metadata of the Parquet file
+ * @param filePath the Parquet file (will be opened for read in this constructor)
* @param blocks the blocks to read
- * @param colums the columns to read (their path)
- * @param codecClassName the codec used to compress the blocks
+ * @param columns the columns to read (their path)
* @throws IOException if the file can not be opened
*/
- public ParquetFileReader(Configuration configuration, Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
+ public ParquetFileReader(
+ Configuration configuration, FileMetaData fileMetaData,
+ Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
this.filePath = filePath;
+ this.fileMetaData = fileMetaData;
+ this.createdBy = fileMetaData == null ? null : fileMetaData.getCreatedBy();
FileSystem fs = filePath.getFileSystem(configuration);
this.f = fs.open(filePath);
this.blocks = blocks;
@@ -456,6 +472,7 @@ public class ParquetFileReader implements Closeable {
this.codecFactory = new CodecFactory(configuration);
}
+
/**
* Reads all the columns requested from the row group at the current file position.
* @throws IOException if an error occurs while reading
@@ -566,7 +583,10 @@ public class ParquetFileReader implements Closeable {
this.readAsBytesInput(compressedPageSize),
dataHeaderV1.getNum_values(),
uncompressedPageSize,
- fromParquetStatistics(dataHeaderV1.getStatistics(), descriptor.col.getType()),
+ fromParquetStatistics(
+ createdBy,
+ dataHeaderV1.getStatistics(),
+ descriptor.col.getType()),
ParquetMetadataConverter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
ParquetMetadataConverter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
ParquetMetadataConverter.getEncoding(dataHeaderV1.getEncoding())
@@ -586,7 +606,10 @@ public class ParquetFileReader implements Closeable {
ParquetMetadataConverter.getEncoding(dataHeaderV2.getEncoding()),
this.readAsBytesInput(dataSize),
uncompressedPageSize,
- fromParquetStatistics(dataHeaderV2.getStatistics(), descriptor.col.getType()),
+ fromParquetStatistics(
+ createdBy,
+ dataHeaderV2.getStatistics(),
+ descriptor.col.getType()),
dataHeaderV2.isIs_compressed()
));
valuesCountReadSoFar += dataHeaderV2.getNum_values();
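A minimal sketch (not part of this patch) of the new constructor; the Configuration, file path, and try/finally handling are assumptions for illustration only.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    public class FileReaderExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path file = new Path("/tmp/example.parquet");   // hypothetical file

        ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, ParquetMetadataConverter.NO_FILTER);

        // Passing the whole FileMetaData (rather than just the key/value metadata)
        // gives the reader access to created_by, which fromParquetStatistics uses
        // to decide whether page statistics can be trusted.
        ParquetFileReader reader = new ParquetFileReader(
            conf, footer.getFileMetaData(), file, footer.getBlocks(),
            footer.getFileMetaData().getSchema().getColumns());
        try {
          PageReadStore rowGroup = reader.readNextRowGroup();
          System.out.println(rowGroup == null ? 0 : rowGroup.getRowCount());
        } finally {
          reader.close();
        }
      }
    }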
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e3b95020/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index a45dde5..b23273a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -149,7 +149,7 @@ public class ParquetReader<T> implements Closeable {
reader = new InternalParquetRecordReader<T>(readSupport, filter);
reader.initialize(fileSchema,
- footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
+ footer.getParquetMetadata().getFileMetaData(),
footer.getFile(), filteredBlocks, conf);
}
}