You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2019/10/18 06:37:19 UTC
[parquet-mr] branch master updated: PARQUET-1650: Implement unit test to validate column/offset indexes (#675)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 0c6a650 PARQUET-1650: Implement unit test to validate column/offset indexes (#675)
0c6a650 is described below
commit 0c6a650a01d4075775af8aecdca14af78c5e7157
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Fri Oct 18 08:37:12 2019 +0200
PARQUET-1650: Implement unit test to validate column/offset indexes (#675)
---
.../parquet/hadoop/ColumnIndexValidator.java | 613 +++++++++++++++++++++
.../apache/parquet/statistics/RandomValues.java | 46 +-
.../parquet/statistics/TestColumnIndexes.java | 300 ++++++++++
3 files changed, 956 insertions(+), 3 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java
new file mode 100644
index 0000000..b9cb4ab
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_ASCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_DESCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_GTEQ_VALUE;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_ASCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_DESCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_LTEQ_VALUE;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_COUNT_CORRECT;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_MAX;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_MIN;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_VALUES;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReadStore;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.DummyRecordConverter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveStringifier;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Validates the column indexes and offset indexes of a Parquet file against the page values they describe.
+ * <p>
+ * {@link #checkContractViolations(InputFile)} reads the file row group by row group. For every column chunk that has
+ * both a column index and an offset index, it compares each page's index entry with the values decoded from that
+ * page. The compared properties are the min/max values, the null count, the null-page flag, and the boundary order
+ * relative to the previous non-null page. Each violated {@link Contract} is reported at most once per page.
+ */
+public class ColumnIndexValidator {
+
+  /**
+   * The rules a column index entry must satisfy.
+   * <p>
+   * {@link #description} is used as a {@link String#format(String, Object...)} pattern with up to two {@code %s}
+   * placeholders. These are filled with the two values carried by the {@link ContractViolation}.
+   */
+  public enum Contract {
+    MIN_LTEQ_VALUE(
+        "The min value stored in the index for the page must be less than or equal to all values in the page.\n"
+            + "Actual value in the page: %s\n"
+            + "Min value in the index: %s\n"),
+    MAX_GTEQ_VALUE(
+        "The max value stored in the index for the page must be greater than or equal to all values in the page.\n"
+            + "Actual value in the page: %s\n"
+            + "Max value in the index: %s\n"),
+    NULL_COUNT_CORRECT(
+        "The null count stored in the index for the page must be equal to the number of nulls in the page.\n"
+            + "Actual null count: %s\n"
+            + "Null count in the index: %s\n"),
+    NULL_PAGE_HAS_NO_VALUES("Only pages consisting entirely of NULL-s can be marked as a null page in the index.\n"
+        + "Actual non-null value in the page: %s"),
+    NULL_PAGE_HAS_NO_MIN("A null page shall not have a min value in the index\n"
+        + "Min value in the index: %s\n"),
+    NULL_PAGE_HAS_NO_MAX("A null page shall not have a max value in the index\n"
+        + "Max value in the index: %s\n"),
+    MIN_ASCENDING(
+        "According to the ASCENDING boundary order, the min value for a page must be greater than or equal to the min value of the previous page.\n"
+            + "Min value for the page: %s\n"
+            + "Min value for the previous page: %s\n"),
+    MAX_ASCENDING(
+        "According to the ASCENDING boundary order, the max value for a page must be greater than or equal to the max value of the previous page.\n"
+            + "Max value for the page: %s\n"
+            + "Max value for the previous page: %s\n"),
+    MIN_DESCENDING(
+        "According to the DESCENDING boundary order, the min value for a page must be less than or equal to the min value of the previous page.\n"
+            + "Min value for the page: %s\n"
+            + "Min value for the previous page: %s\n"),
+    MAX_DESCENDING(
+        "According to the DESCENDING boundary order, the max value for a page must be less than or equal to the max value of the previous page.\n"
+            + "Max value for the page: %s\n"
+            + "Max value for the previous page: %s\n");
+
+    /** Human-readable rule text; also a format pattern with up to two {@code %s} placeholders. */
+    public final String description;
+
+    Contract(String description) {
+      this.description = description;
+    }
+  }
+
+  /**
+   * A single violation of a {@link Contract}, together with its location in the file.
+   * <p>
+   * {@code referenceValue} and {@code offendingValue} fill the first and second {@code %s} placeholder of the
+   * contract's description, in this order.
+   */
+  public static class ContractViolation {
+    private final Contract violatedContract;
+    private final String referenceValue;
+    private final String offendingValue;
+    private final int rowGroupNumber;
+    private final int columnNumber;
+    private final ColumnPath columnPath;
+    private final int pageNumber;
+
+    public ContractViolation(Contract violatedContract, String referenceValue, String offendingValue,
+        int rowGroupNumber, int columnNumber, ColumnPath columnPath, int pageNumber) {
+      this.violatedContract = violatedContract;
+      this.referenceValue = referenceValue;
+      this.offendingValue = offendingValue;
+      this.rowGroupNumber = rowGroupNumber;
+      this.columnNumber = columnNumber;
+      this.columnPath = columnPath;
+      this.pageNumber = pageNumber;
+    }
+
+    @Override
+    public String toString() {
+      // The contract description is appended to the location template, so its %s placeholders consume the last
+      // two arguments.
+      return String.format(
+          "Contract violation\nLocation: row group %d, column %d (\"%s\"), page %d\nViolated contract: "
+              + violatedContract.description,
+          rowGroupNumber, columnNumber, columnPath.toDotString(), pageNumber,
+          referenceValue,
+          offendingValue);
+    }
+  }
+
+  /**
+   * A min or max value taken from the column index.
+   * <p>
+   * It can be compared with other index values of the same column, using the column type's comparator. It can also
+   * be compared with the value the given {@link ColumnReader} is currently positioned at.
+   */
+  static interface StatValue extends Comparable<StatValue> {
+    /** Compares this index value with the current value of {@code reader}. */
+    int compareToValue(ColumnReader reader);
+
+    /** Creates type-specific {@link StatValue}s from the serialized index values of one primitive type. */
+    abstract class Builder {
+      final PrimitiveComparator<Binary> comparator;
+      final PrimitiveStringifier stringifier;
+
+      Builder(PrimitiveType type) {
+        comparator = type.comparator();
+        stringifier = type.stringifier();
+      }
+
+      /** Decodes a serialized min/max value, starting at the buffer's current position. */
+      abstract StatValue build(ByteBuffer value);
+
+      /** Renders the current value of {@code reader} for violation messages. */
+      abstract String stringifyValue(ColumnReader reader);
+    }
+  }
+
+  /**
+   * Returns the {@link StatValue.Builder} matching the physical type of {@code type}.
+   *
+   * @throws IllegalArgumentException if the physical type is not supported
+   */
+  static StatValue.Builder getBuilder(PrimitiveType type) {
+    switch (type.getPrimitiveTypeName()) {
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+      case INT96:
+        return new BinaryStatValueBuilder(type);
+      case BOOLEAN:
+        return new BooleanStatValueBuilder(type);
+      case DOUBLE:
+        return new DoubleStatValueBuilder(type);
+      case FLOAT:
+        return new FloatStatValueBuilder(type);
+      case INT32:
+        return new IntStatValueBuilder(type);
+      case INT64:
+        return new LongStatValueBuilder(type);
+      default:
+        throw new IllegalArgumentException("Unsupported type: " + type);
+    }
+  }
+
+  private static class BinaryStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final Binary value;
+
+      private Value(Binary value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getBinary());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private BinaryStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(Binary.fromConstantByteBuffer(value));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getBinary());
+    }
+  }
+
+  private static class BooleanStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final boolean value;
+
+      private Value(boolean value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getBoolean());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private BooleanStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      // Absolute index relative to position() so that buffers that are not at position 0 are decoded correctly
+      return new Value(value.get(value.position()) != 0);
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getBoolean());
+    }
+  }
+
+  private static class DoubleStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final double value;
+
+      private Value(double value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getDouble());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private DoubleStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      // Relies on the buffer's byte order; NOTE(review): assumes the index hands out little-endian buffers
+      return new Value(value.getDouble(value.position()));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getDouble());
+    }
+  }
+
+  private static class FloatStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final float value;
+
+      private Value(float value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getFloat());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private FloatStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(value.getFloat(value.position()));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getFloat());
+    }
+  }
+
+  private static class IntStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final int value;
+
+      private Value(int value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getInteger());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private IntStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(value.getInt(value.position()));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getInteger());
+    }
+  }
+
+  private static class LongStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final long value;
+
+      private Value(long value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getLong());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private LongStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(value.getLong(value.position()));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getLong());
+    }
+  }
+
+  /**
+   * Validates the values of a single page against its column index entry.
+   * <p>
+   * The min/max and boundary order checks happen in the constructor. Per-value checks happen while the caller walks
+   * the rows of the page with {@link #validateValuesBelongingToRow()}. The null count check happens in
+   * {@link #finishPage()}.
+   */
+  private static class PageValidator {
+    private final int rowGroupNumber;
+    private final int columnNumber;
+    private final ColumnPath columnPath;
+    private final int pageNumber;
+    private final int maxDefinitionLevel;
+    // null if the column index does not contain null counts
+    private final Long nullCountInIndex;
+    private long nullCountActual;
+    private final boolean isNullPage;
+    private final ColumnReader columnReader;
+    private final List<ContractViolation> violations;
+    // Contracts already reported for this page; every contract is reported only once per page
+    private final Set<Contract> pageViolations = EnumSet.noneOf(Contract.class);
+    // Both are null for null pages
+    private final StatValue minValue;
+    private final StatValue maxValue;
+    private final StatValue.Builder statValueBuilder;
+
+    PageValidator(
+        PrimitiveType type,
+        int rowGroupNumber,
+        int columnNumber,
+        ColumnPath columnPath,
+        int pageNumber,
+        List<ContractViolation> violations,
+        ColumnReader columnReader,
+        ByteBuffer minValueBytes,
+        ByteBuffer maxValueBytes,
+        ByteBuffer prevMinValueBytes,
+        ByteBuffer prevMaxValueBytes,
+        BoundaryOrder boundaryOrder,
+        Long nullCount,
+        boolean isNullPage) {
+      this.columnReader = columnReader;
+      this.rowGroupNumber = rowGroupNumber;
+      this.columnNumber = columnNumber;
+      this.columnPath = columnPath;
+      this.pageNumber = pageNumber;
+      this.nullCountInIndex = nullCount;
+      this.nullCountActual = 0;
+      this.isNullPage = isNullPage;
+      this.maxDefinitionLevel = columnReader.getDescriptor().getMaxDefinitionLevel();
+      this.violations = violations;
+      this.statValueBuilder = getBuilder(type);
+      this.minValue = isNullPage ? null : statValueBuilder.build(minValueBytes);
+      this.maxValue = isNullPage ? null : statValueBuilder.build(maxValueBytes);
+
+      if (isNullPage) {
+        // By specification null pages have empty byte arrays as min/max values
+        validateContract(!minValueBytes.hasRemaining(),
+            NULL_PAGE_HAS_NO_MIN,
+            () -> statValueBuilder.build(minValueBytes).toString());
+        validateContract(!maxValueBytes.hasRemaining(),
+            NULL_PAGE_HAS_NO_MAX,
+            () -> statValueBuilder.build(maxValueBytes).toString());
+      } else if (prevMinValueBytes != null) {
+        // Boundary order is only checked against the previous non-null page
+        validateBoundaryOrder(statValueBuilder.build(prevMinValueBytes), statValueBuilder.build(prevMaxValueBytes),
+            boundaryOrder);
+      }
+    }
+
+    /**
+     * Validates every value of the current row.
+     * <p>
+     * A row ends when the reader reaches a value with repetition level 0. Values below the max definition level are
+     * counted as nulls.
+     */
+    void validateValuesBelongingToRow() {
+      do {
+        if (columnReader.getCurrentDefinitionLevel() == maxDefinitionLevel) {
+          validateValue();
+        } else {
+          ++nullCountActual;
+        }
+        columnReader.consume();
+      } while (columnReader.getCurrentRepetitionLevel() != 0);
+    }
+
+    /** Checks the null count of the page once all of its rows have been consumed. */
+    void finishPage() {
+      if (nullCountInIndex == null) {
+        // Null counts are optional in the column index; there is nothing to compare against
+        return;
+      }
+      final long expectedNullCount = nullCountInIndex;
+      validateContract(expectedNullCount == nullCountActual,
+          NULL_COUNT_CORRECT,
+          () -> Long.toString(nullCountActual),
+          () -> Long.toString(expectedNullCount));
+    }
+
+    void validateContract(boolean contractCondition,
+        Contract type,
+        Supplier<String> value1) {
+      validateContract(contractCondition, type, value1, () -> "N/A");
+    }
+
+    /**
+     * Records a violation of {@code type} unless the condition holds or the contract was already reported for this
+     * page. The suppliers are only evaluated when a violation is recorded.
+     */
+    void validateContract(boolean contractCondition,
+        Contract type,
+        Supplier<String> value1,
+        Supplier<String> value2) {
+      if (!contractCondition && !pageViolations.contains(type)) {
+        violations.add(
+            new ContractViolation(type, value1.get(), value2.get(), rowGroupNumber,
+                columnNumber, columnPath, pageNumber));
+        pageViolations.add(type);
+      }
+    }
+
+    private void validateValue() {
+      if (isNullPage) {
+        // A null page must not contain values. It also has no min/max to compare with (minValue/maxValue are null),
+        // so report the violation and stop here instead of dereferencing them.
+        validateContract(false,
+            NULL_PAGE_HAS_NO_VALUES,
+            () -> statValueBuilder.stringifyValue(columnReader));
+        return;
+      }
+      validateContract(minValue.compareToValue(columnReader) <= 0,
+          MIN_LTEQ_VALUE,
+          () -> statValueBuilder.stringifyValue(columnReader),
+          minValue::toString);
+      validateContract(maxValue.compareToValue(columnReader) >= 0,
+          MAX_GTEQ_VALUE,
+          () -> statValueBuilder.stringifyValue(columnReader),
+          maxValue::toString);
+    }
+
+    private void validateBoundaryOrder(StatValue prevMinValue, StatValue prevMaxValue, BoundaryOrder boundaryOrder) {
+      switch (boundaryOrder) {
+        case ASCENDING:
+          validateContract(minValue.compareTo(prevMinValue) >= 0,
+              MIN_ASCENDING,
+              minValue::toString,
+              prevMinValue::toString);
+          validateContract(maxValue.compareTo(prevMaxValue) >= 0,
+              MAX_ASCENDING,
+              maxValue::toString,
+              prevMaxValue::toString);
+          break;
+        case DESCENDING:
+          validateContract(minValue.compareTo(prevMinValue) <= 0,
+              MIN_DESCENDING,
+              minValue::toString,
+              prevMinValue::toString);
+          validateContract(maxValue.compareTo(prevMaxValue) <= 0,
+              MAX_DESCENDING,
+              maxValue::toString,
+              prevMaxValue::toString);
+          break;
+        case UNORDERED:
+          // No checks necessary.
+          break;
+      }
+    }
+  }
+
+  /**
+   * Checks the column indexes and offset indexes of the given file against the actual page values.
+   * <p>
+   * Column chunks without a column index or without an offset index are skipped.
+   *
+   * @param file the Parquet file to validate
+   * @return the detected contract violations; empty if none were found
+   * @throws IOException if the file cannot be read
+   */
+  public static List<ContractViolation> checkContractViolations(InputFile file) throws IOException {
+    List<ContractViolation> violations = new ArrayList<>();
+    try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+      FileMetaData meta = reader.getFooter().getFileMetaData();
+      MessageType schema = meta.getSchema();
+      List<ColumnDescriptor> columns = schema.getColumns();
+
+      List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+      int rowGroupNumber = 0;
+      PageReadStore rowGroup = reader.readNextRowGroup();
+      while (rowGroup != null) {
+        ColumnReadStore columnReadStore = new ColumnReadStoreImpl(rowGroup,
+            new DummyRecordConverter(schema).getRootConverter(), schema, null);
+        List<ColumnChunkMetaData> columnChunks = blocks.get(rowGroupNumber).getColumns();
+        assert (columnChunks.size() == columns.size());
+        for (int columnNumber = 0; columnNumber < columns.size(); ++columnNumber) {
+          ColumnDescriptor column = columns.get(columnNumber);
+          ColumnChunkMetaData columnChunk = columnChunks.get(columnNumber);
+          ColumnIndex columnIndex = reader.readColumnIndex(columnChunk);
+          if (columnIndex == null) {
+            continue;
+          }
+          OffsetIndex offsetIndex = reader.readOffsetIndex(columnChunk);
+          if (offsetIndex == null) {
+            // Without an offset index the page boundaries are unknown, so the column index cannot be checked
+            continue;
+          }
+          ColumnPath columnPath = columnChunk.getPath();
+          List<ByteBuffer> minValues = columnIndex.getMinValues();
+          List<ByteBuffer> maxValues = columnIndex.getMaxValues();
+          BoundaryOrder boundaryOrder = columnIndex.getBoundaryOrder();
+          // Null counts are optional in the column index and may be absent (null)
+          List<Long> nullCounts = columnIndex.getNullCounts();
+          List<Boolean> nullPages = columnIndex.getNullPages();
+          long rowNumber = 0;
+          ColumnReader columnReader = columnReadStore.getColumnReader(column);
+          ByteBuffer prevMinValue = null;
+          ByteBuffer prevMaxValue = null;
+          for (int pageNumber = 0; pageNumber < offsetIndex.getPageCount(); ++pageNumber) {
+            boolean isNullPage = nullPages.get(pageNumber);
+            ByteBuffer minValue = minValues.get(pageNumber);
+            ByteBuffer maxValue = maxValues.get(pageNumber);
+            Long nullCount = nullCounts == null ? null : nullCounts.get(pageNumber);
+            PageValidator pageValidator = new PageValidator(
+                column.getPrimitiveType(),
+                rowGroupNumber, columnNumber, columnPath, pageNumber,
+                violations, columnReader,
+                minValue,
+                maxValue,
+                prevMinValue,
+                prevMaxValue,
+                boundaryOrder,
+                nullCount,
+                isNullPage);
+            if (!isNullPage) {
+              prevMinValue = minValue;
+              prevMaxValue = maxValue;
+            }
+            long lastRowNumberInPage = offsetIndex.getLastRowIndex(pageNumber, rowGroup.getRowCount());
+            while (rowNumber <= lastRowNumberInPage) {
+              pageValidator.validateValuesBelongingToRow();
+              ++rowNumber;
+            }
+            pageValidator.finishPage();
+          }
+        }
+        rowGroup = reader.readNextRowGroup();
+        rowGroupNumber++;
+      }
+    }
+    return violations;
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
index 16db5cb..152f6ec 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
@@ -19,14 +19,20 @@
package org.apache.parquet.statistics;
-import org.apache.parquet.io.api.Binary;
import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
import java.util.Random;
+import java.util.function.Supplier;
+
+import org.apache.parquet.io.api.Binary;
public class RandomValues {
private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890";
- static abstract class RandomValueGenerator<T extends Comparable<T>> {
+ static abstract class RandomValueGenerator<T extends Comparable<T>> implements Supplier<T> {
private final Random random;
protected RandomValueGenerator(long seed) {
@@ -80,6 +86,11 @@ public class RandomValues {
}
public abstract T nextValue();
+
+    /**
+     * {@link Supplier} view of this generator; delegates to {@link #nextValue()} on every call.
+     */
+    @Override
+    public T get() {
+      final T next = nextValue();
+      return next;
+    }
}
static abstract class RandomBinaryBase<T extends Comparable<T>> extends RandomValueGenerator<T> {
@@ -277,7 +288,6 @@ public class RandomValues {
return asReusedBinary(nextValue().getBytes());
}
}
-
public static class BinaryGenerator extends RandomBinaryBase<Binary> {
private static final int MAX_STRING_LENGTH = 16;
public BinaryGenerator(long seed) {
@@ -339,4 +349,34 @@ public class RandomValues {
public T minimum() { return this.minimum; }
public T maximum() { return this.maximum; }
}
+
+  /**
+   * Returns a supplier of random string values encoded as {@link Binary}.
+   * <p>
+   * The values come from a {@link StringGenerator} created with the given seed.
+   */
+  public static Supplier<Binary> binaryStringGenerator(long seed) {
+    final StringGenerator strings = new StringGenerator(seed);
+    return () -> strings.nextBinaryValue();
+  }
+
+  /**
+   * Returns a supplier of random INT96 values encoded as {@link Binary}.
+   * <p>
+   * The values come from an {@link Int96Generator} created with the given seed.
+   */
+  public static Supplier<Binary> int96Generator(long seed) {
+    final Int96Generator int96s = new Int96Generator(seed);
+    return () -> int96s.nextBinaryValue();
+  }
+
+  /**
+   * Same as {@link #wrapSorted(Supplier, int, boolean, Comparator)}, using the natural ordering of the values.
+   */
+  public static <T extends Comparable<T>> Supplier<T> wrapSorted(Supplier<T> supplier,
+      int recordCount, boolean ascending) {
+    final Comparator<T> natural = Comparator.naturalOrder();
+    return wrapSorted(supplier, recordCount, ascending, natural);
+  }
+
+  /**
+   * Eagerly draws {@code recordCount} values from {@code supplier}, sorts them with {@code cmp} (reversed when
+   * {@code ascending} is false) and returns a supplier that hands them out in that order.
+   * <p>
+   * Once all {@code recordCount} values have been returned, further calls fail with the exception thrown by
+   * {@link Iterator#next()}.
+   */
+  public static <T> Supplier<T> wrapSorted(Supplier<T> supplier, int recordCount, boolean ascending,
+      Comparator<T> cmp) {
+    final List<T> sorted = new ArrayList<>(recordCount);
+    int remaining = recordCount;
+    while (remaining-- > 0) {
+      sorted.add(supplier.get());
+    }
+    sorted.sort(ascending ? cmp : cmp.reversed());
+    return sorted.iterator()::next;
+  }
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java
new file mode 100644
index 0000000..aac8e43
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.statistics;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.bsonType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.jsonType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Supplier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ColumnIndexValidator;
+import org.apache.parquet.hadoop.ColumnIndexValidator.ContractViolation;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+/**
+ * Parameterized end-to-end check of column/offset index generation.
+ * <p>
+ * For every {@link WriteContext} returned by {@link #getContexts()}, the test writes a Parquet file whose columns
+ * cover the primitive types and logical type annotations listed in {@code SCHEMA}. The columns are filled with
+ * random (and sometimes sorted) values. The test then asserts that
+ * {@link ColumnIndexValidator#checkContractViolations} reports no violations for that file.
+ */
+@RunWith(Parameterized.class)
+public class TestColumnIndexes {
+ /** Inclusive upper bound on the number of records per file; the actual count is random and at least 1. */
+ private static final int MAX_TOTAL_ROWS = 100_000;
+ /**
+ * Schema of the generated files. The field order is significant:
+ * <ul>
+ * <li>{@code buildGenerators} returns one generator per field, in this exact order;</li>
+ * <li>{@code sortedOrRandom} and {@code WriteContext.createGroup} look fields up by index.</li>
+ * </ul>
+ */
+ private static final MessageType SCHEMA = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT32, "i32"),
+ new PrimitiveType(OPTIONAL, INT64, "i64"),
+ new PrimitiveType(OPTIONAL, INT96, "i96"),
+ new PrimitiveType(OPTIONAL, FLOAT, "sngl"),
+ new PrimitiveType(OPTIONAL, DOUBLE, "dbl"),
+ new PrimitiveType(OPTIONAL, BINARY, "strings"),
+ new PrimitiveType(OPTIONAL, BINARY, "binary"),
+ new PrimitiveType(OPTIONAL, FIXED_LEN_BYTE_ARRAY, 17, "fixed-binary"),
+ new PrimitiveType(REQUIRED, INT32, "unconstrained-i32"),
+ new PrimitiveType(REQUIRED, INT64, "unconstrained-i64"),
+ new PrimitiveType(REQUIRED, FLOAT, "unconstrained-sngl"),
+ new PrimitiveType(REQUIRED, DOUBLE, "unconstrained-dbl"),
+ Types.optional(INT32).as(intType(8, true)).named("int8"),
+ Types.optional(INT32).as(intType(8, false)).named("uint8"),
+ Types.optional(INT32).as(intType(16, true)).named("int16"),
+ Types.optional(INT32).as(intType(16, false)).named("uint16"),
+ Types.optional(INT32).as(intType(32, true)).named("int32"),
+ Types.optional(INT32).as(intType(32, false)).named("uint32"),
+ Types.optional(INT64).as(intType(64, true)).named("int64"),
+ Types.optional(INT64).as(intType(64, false)).named("uint64"),
+ Types.optional(INT32).as(decimalType(2, 9)).named("decimal-int32"),
+ Types.optional(INT64).as(decimalType(4, 18)).named("decimal-int64"),
+ Types.optional(FIXED_LEN_BYTE_ARRAY).length(19).as(decimalType(25, 45)).named("decimal-fixed"),
+ Types.optional(BINARY).as(decimalType(20, 38)).named("decimal-binary"),
+ Types.optional(BINARY).as(stringType()).named("utf8"),
+ Types.optional(BINARY).as(enumType()).named("enum"),
+ Types.optional(BINARY).as(jsonType()).named("json"),
+ Types.optional(BINARY).as(bsonType()).named("bson"),
+ Types.optional(INT32).as(dateType()).named("date"),
+ Types.optional(INT32).as(timeType(true, TimeUnit.MILLIS)).named("time-millis"),
+ Types.optional(INT64).as(timeType(false, TimeUnit.MICROS)).named("time-micros"),
+ Types.optional(INT64).as(timestampType(true, TimeUnit.MILLIS)).named("timestamp-millis"),
+ Types.optional(INT64).as(timestampType(false, TimeUnit.NANOS)).named("timestamp-nanos"),
+ Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(OriginalType.INTERVAL).named("interval"),
+ Types.optional(BINARY).as(stringType()).named("always-null"));
+
+ /**
+ * Builds one value supplier per {@code SCHEMA} field, in field order.
+ * <p>
+ * Every supplier is seeded from {@code random}, so the generated data is reproducible from the
+ * {@link WriteContext} seed. The entry order matters for two reasons:
+ * <ul>
+ * <li>each entry must line up with its {@code SCHEMA} field;</li>
+ * <li>the interleaved {@code random.nextLong()} / {@code random.nextInt(5)} draws (one pair per entry) determine
+ * the data for a given seed, so reordering entries changes what a logged seed reproduces.</li>
+ * </ul>
+ * The final {@code null} entry belongs to the {@code always-null} column, which therefore never receives a value.
+ * <p>
+ * NOTE(review): some annotated columns are fed unconstrained generators. For example, {@code decimal-int32}
+ * (precision 9) and {@code decimal-int64} (precision 18) presumably receive full-range ints/longs, so values
+ * need not respect the declared precision. This is presumably acceptable because only index contracts are
+ * validated; confirm.
+ *
+ * @param recordCount number of records that will be written; sorted suppliers pre-generate this many values
+ * @param random source of the per-column seeds and of the sorted/unsorted choice
+ * @return generators indexed like the {@code SCHEMA} fields
+ */
+ private static List<Supplier<?>> buildGenerators(int recordCount, Random random) {
+ int fieldIndex = 0;
+ return Arrays.<Supplier<?>>asList(
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.int96Generator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.FloatGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.DoubleGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 17), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedFloatGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedDoubleGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong(), Byte.MIN_VALUE, Byte.MAX_VALUE), random,
+ recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.UIntGenerator(random.nextLong(), Byte.MIN_VALUE, Byte.MAX_VALUE), random,
+ recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong(), Short.MIN_VALUE, Short.MAX_VALUE), random,
+ recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.UIntGenerator(random.nextLong(), Short.MIN_VALUE, Short.MAX_VALUE), random,
+ recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+ fieldIndex++),
+ sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 19), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+ sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 12), random, recordCount, fieldIndex++),
+ // "always-null": no generator, so createGroup never sets this column.
+ null);
+ }
+
+ /**
+ * Returns {@code generator} unchanged (3 in 5 chance), or wrapped so that it yields {@code recordCount} values in
+ * ascending (1 in 5) or descending (1 in 5) order. The ordering is the comparator of the {@code SCHEMA} field at
+ * {@code fieldIndex}. Exactly one {@code random.nextInt(5)} draw is consumed per call.
+ *
+ * @param generator the unsorted value source
+ * @param random decides whether and how to sort
+ * @param recordCount number of values a sorted wrapper pre-generates
+ * @param fieldIndex index of the {@code SCHEMA} field whose comparator is used
+ * @return either {@code generator} itself or a sorted wrapper around it
+ */
+ private static <T> Supplier<T> sortedOrRandom(Supplier<T> generator, Random random, int recordCount, int fieldIndex) {
+ Comparator<T> cmp = SCHEMA.getType(fieldIndex).asPrimitiveType().comparator();
+
+ // 20% chance for ascending, 20% for descending, 60% to remain random
+ switch (random.nextInt(5)) {
+ case 1:
+ return RandomValues.wrapSorted(generator, recordCount, true, cmp);
+ case 2:
+ return RandomValues.wrapSorted(generator, recordCount, false, cmp);
+ default:
+ return generator;
+ }
+ }
+
+ /**
+ * One parameterization of the test: the RNG seed, the writer's page row count limit and the column index
+ * truncate length. {@link #toString()} includes all three values; it is logged and embedded in the file name.
+ */
+ public static class WriteContext {
+ private static final GroupFactory FACTORY = new SimpleGroupFactory(SCHEMA);
+ private final long seed;
+ private final int pageRowCountLimit;
+ private final int columnIndexTruncateLength;
+
+ private WriteContext(long seed, int pageRowCountLimit, int columnIndexTruncateLength) {
+ this.seed = seed;
+ this.pageRowCountLimit = pageRowCountLimit;
+ this.columnIndexTruncateLength = columnIndexTruncateLength;
+ }
+
+ /**
+ * Writes a Parquet file with between 1 and {@code MAX_TOTAL_ROWS} random records into {@code directory}.
+ *
+ * @param directory the directory to create the file in
+ * @return the path of the written file (its name embeds {@link #toString()})
+ * @throws IOException if writing the file fails
+ */
+ public Path write(Path directory) throws IOException {
+ Path file = new Path(directory, "testColumnIndexes_" + this + ".parquet");
+ // A single RNG drives the record count, the generators and the null decisions, so the whole file is
+ // determined by the seed.
+ Random random = new Random(seed);
+ int recordCount = random.nextInt(MAX_TOTAL_ROWS) + 1;
+ List<Supplier<?>> generators = buildGenerators(recordCount, random);
+ // The truncate length is passed to the writer through the Hadoop configuration (see withConf below).
+ Configuration conf = new Configuration();
+ ParquetOutputFormat.setColumnIndexTruncateLength(conf, columnIndexTruncateLength);
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+ .withType(SCHEMA)
+ .withPageRowCountLimit(pageRowCountLimit)
+ .withConf(conf)
+ .build()) {
+ for (int i = 0; i < recordCount; i++) {
+ writer.write(createGroup(generators, random));
+ }
+ }
+ return file;
+ }
+
+ /**
+ * Creates one record. For every {@code SCHEMA} column, the next value of the matching generator is appended,
+ * cast to the Java type used for the column's primitive type. A column is left unset when its generator is
+ * {@code null}, or with a 1 in 50 chance when the column is OPTIONAL.
+ * <p>
+ * Each generator is called at most once per record. Sorted wrappers, which hold exactly the record count of
+ * values, therefore cannot run out during {@link #write}.
+ */
+ private Group createGroup(List<Supplier<?>> generators, Random random) {
+ Group group = FACTORY.newGroup();
+ for (int column = 0, columnCnt = SCHEMA.getFieldCount(); column < columnCnt; ++column) {
+ Type type = SCHEMA.getType(column);
+ Supplier<?> generator = generators.get(column);
+ // 2% chance of null value for an optional column
+ // (short-circuiting means the nextInt(50) draw is only made for OPTIONAL columns that have a generator)
+ if (generator == null || (type.isRepetition(OPTIONAL) && random.nextInt(50) == 0)) {
+ continue;
+ }
+ switch (type.asPrimitiveType().getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ case INT96:
+ group.append(type.getName(), (Binary) generator.get());
+ break;
+ case INT32:
+ group.append(type.getName(), (Integer) generator.get());
+ break;
+ case INT64:
+ group.append(type.getName(), (Long) generator.get());
+ break;
+ case FLOAT:
+ group.append(type.getName(), (Float) generator.get());
+ break;
+ case DOUBLE:
+ group.append(type.getName(), (Double) generator.get());
+ break;
+ // SCHEMA currently has no BOOLEAN column; this case is only reached if one is added.
+ case BOOLEAN:
+ group.append(type.getName(), (Boolean) generator.get());
+ break;
+ }
+ }
+ return group;
+ }
+
+ @Override
+ public String toString() {
+ return "seed=" + seed
+ + ",pageRowCountLimit=" + pageRowCountLimit
+ + ",columnIndexTruncateLength=" + columnIndexTruncateLength;
+ }
+ }
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexes.class);
+
+ // Scratch directory for the written files.
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Test parameters. Seeds come from {@link System#nanoTime()}, so every run uses different data. The context,
+ * including its seed, is logged at the start of {@link #testColumnIndexes()}, so a failing run can be reproduced
+ * by substituting the logged seed here.
+ */
+ @Parameters
+ public static Collection<WriteContext> getContexts() {
+ return Arrays.asList(
+ new WriteContext(System.nanoTime(), 1000, 8),
+ new WriteContext(System.nanoTime(), 20000, 64),
+ new WriteContext(System.nanoTime(), 50000, 10));
+ }
+
+ public TestColumnIndexes(WriteContext context) {
+ this.context = context;
+ }
+
+ private final WriteContext context;
+
+ /**
+ * Writes the file described by {@link #context} and asserts that {@link ColumnIndexValidator} reports no
+ * column/offset index contract violations for it. Once written, the file is deleted in the {@code finally}
+ * block, even if validation fails.
+ */
+ @Test
+ public void testColumnIndexes() throws IOException {
+ LOGGER.info("Starting test with context: {}", context);
+
+ Path file = null;
+ try {
+ file = context.write(new Path(tmp.getRoot().getAbsolutePath()));
+ LOGGER.info("Parquet file \"{}\" is successfully created for the context: {}", file, context);
+
+ List<ContractViolation> violations = ColumnIndexValidator
+ .checkContractViolations(HadoopInputFile.fromPath(file, new Configuration()));
+ assertTrue(violations.toString(), violations.isEmpty());
+ } finally {
+ // NOTE(review): an IOException thrown by delete() here would replace a pending assertion failure.
+ if (file != null) {
+ file.getFileSystem(new Configuration()).delete(file, false);
+ }
+ }
+ }
+}