Posted to commits@parquet.apache.org by ga...@apache.org on 2019/10/18 06:37:19 UTC

[parquet-mr] branch master updated: PARQUET-1650: Implement unit test to validate column/offset indexes (#675)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c6a650  PARQUET-1650: Implement unit test to validate column/offset indexes (#675)
0c6a650 is described below

commit 0c6a650a01d4075775af8aecdca14af78c5e7157
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Fri Oct 18 08:37:12 2019 +0200

    PARQUET-1650: Implement unit test to validate column/offset indexes (#675)
---
 .../parquet/hadoop/ColumnIndexValidator.java       | 613 +++++++++++++++++++++
 .../apache/parquet/statistics/RandomValues.java    |  46 +-
 .../parquet/statistics/TestColumnIndexes.java      | 300 ++++++++++
 3 files changed, 956 insertions(+), 3 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java
new file mode 100644
index 0000000..b9cb4ab
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java
@@ -0,0 +1,613 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_ASCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_DESCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MAX_GTEQ_VALUE;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_ASCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_DESCENDING;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.MIN_LTEQ_VALUE;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_COUNT_CORRECT;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_MAX;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_MIN;
+import static org.apache.parquet.hadoop.ColumnIndexValidator.Contract.NULL_PAGE_HAS_NO_VALUES;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReadStore;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.DummyRecordConverter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.internal.column.columnindex.BoundaryOrder;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveComparator;
+import org.apache.parquet.schema.PrimitiveStringifier;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class ColumnIndexValidator {
+
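+  // The contracts that column/offset indexes must obey; each description is a
+  // format string used when a violation is reported.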
+  public enum Contract {
+    MIN_LTEQ_VALUE(
+        "The min value stored in the index for the page must be less than or equal to all values in the page.\n"
+            + "Actual value in the page: %s\n"
+            + "Min value in the index: %s\n"),
+    MAX_GTEQ_VALUE(
+        "The max value stored in the index for the page must be greater than or equal to all values in the page.\n"
+            + "Actual value in the page: %s\n"
+            + "Max value in the index: %s\n"),
+    NULL_COUNT_CORRECT(
+        "The null count stored in the index for the page must be equal to the number of nulls in the page.\n"
+            + "Actual null count: %s\n"
+            + "Null count in the index: %s\n"),
+    NULL_PAGE_HAS_NO_VALUES("Only pages consisting entirely of NULLs can be marked as null pages in the index.\n"
+        + "Actual non-null value in the page: %s\n"),
+    NULL_PAGE_HAS_NO_MIN("A null page shall not have a min value in the index.\n"
+        + "Min value in the index: %s\n"),
+    NULL_PAGE_HAS_NO_MAX("A null page shall not have a max value in the index.\n"
+        + "Max value in the index: %s\n"),
+    MIN_ASCENDING(
+        "According to the ASCENDING boundary order, the min value for a page must be greater than or equal to the min value of the previous page.\n"
+            + "Min value for the page: %s\n"
+            + "Min value for the previous page: %s\n"),
+    MAX_ASCENDING(
+        "According to the ASCENDING boundary order, the max value for a page must be greater than or equal to the max value of the previous page.\n"
+            + "Max value for the page: %s\n"
+            + "Max value for the previous page: %s\n"),
+    MIN_DESCENDING(
+        "According to the DESCENDING boundary order, the min value for a page must be less than or equal to the min value of the previous page.\n"
+            + "Min value for the page: %s\n"
+            + "Min value for the previous page: %s\n"),
+    MAX_DESCENDING(
+        "According to the DESCENDING boundary order, the max value for a page must be less than or equal to the max value of the previous page.\n"
+            + "Max value for the page: %s\n"
+            + "Max value for the previous page: %s\n");
+
+    public final String description;
+
+    Contract(String description) {
+      this.description = description;
+    }
+  }
+
+  public static class ContractViolation {
+    public ContractViolation(Contract violatedContract, String referenceValue, String offendingValue,
+        int rowGroupNumber, int columnNumber, ColumnPath columnPath, int pageNumber) {
+      this.violatedContract = violatedContract;
+      this.referenceValue = referenceValue;
+      this.offendingValue = offendingValue;
+      this.rowGroupNumber = rowGroupNumber;
+      this.columnNumber = columnNumber;
+      this.columnPath = columnPath;
+      this.pageNumber = pageNumber;
+    }
+
+    private final Contract violatedContract;
+    private final String referenceValue;
+    private final String offendingValue;
+    private final int rowGroupNumber;
+    private final int columnNumber;
+    private final ColumnPath columnPath;
+    private final int pageNumber;
+
+    @Override
+    public String toString() {
+      return String.format(
+          "Contract violation\nLocation: row group %d, column %d (\"%s\"), page %d\nViolated contract: "
+              + violatedContract.description,
+          rowGroupNumber, columnNumber, columnPath.toDotString(), pageNumber,
+          referenceValue,
+          offendingValue);
+    }
+  }
+
+  interface StatValue extends Comparable<StatValue> {
+    int compareToValue(ColumnReader reader);
+
+    abstract class Builder {
+      final PrimitiveComparator<Binary> comparator;
+      final PrimitiveStringifier stringifier;
+
+      Builder(PrimitiveType type) {
+        comparator = type.comparator();
+        stringifier = type.stringifier();
+      }
+
+      abstract StatValue build(ByteBuffer value);
+
+      abstract String stringifyValue(ColumnReader reader);
+    }
+  }
+
+  static StatValue.Builder getBuilder(PrimitiveType type) {
+    switch (type.getPrimitiveTypeName()) {
+    case BINARY:
+    case FIXED_LEN_BYTE_ARRAY:
+    case INT96:
+      return new BinaryStatValueBuilder(type);
+    case BOOLEAN:
+      return new BooleanStatValueBuilder(type);
+    case DOUBLE:
+      return new DoubleStatValueBuilder(type);
+    case FLOAT:
+      return new FloatStatValueBuilder(type);
+    case INT32:
+      return new IntStatValueBuilder(type);
+    case INT64:
+      return new LongStatValueBuilder(type);
+    default:
+      throw new IllegalArgumentException("Unsupported type: " + type);
+    }
+  }
+
+  private static class BinaryStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final Binary value;
+
+      private Value(Binary value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getBinary());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private BinaryStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(Binary.fromConstantByteBuffer(value));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getBinary());
+    }
+  }
+
+  private static class BooleanStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final boolean value;
+
+      private Value(boolean value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getBoolean());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private BooleanStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(value.get(0) != 0);
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getBoolean());
+    }
+  }
+
+  private static class DoubleStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final double value;
+
+      private Value(double value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getDouble());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private DoubleStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(value.getDouble(0));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getDouble());
+    }
+  }
+
+  private static class FloatStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final float value;
+
+      private Value(float value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getFloat());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private FloatStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(value.getFloat(0));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getFloat());
+    }
+  }
+
+  private static class IntStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final int value;
+
+      private Value(int value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getInteger());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private IntStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(value.getInt(0));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getInteger());
+    }
+  }
+
+  private static class LongStatValueBuilder extends StatValue.Builder {
+    private class Value implements StatValue {
+      final long value;
+
+      private Value(long value) {
+        this.value = value;
+      }
+
+      @Override
+      public int compareTo(StatValue o) {
+        return comparator.compare(value, ((Value) o).value);
+      }
+
+      @Override
+      public int compareToValue(ColumnReader reader) {
+        return comparator.compare(value, reader.getLong());
+      }
+
+      @Override
+      public String toString() {
+        return stringifier.stringify(value);
+      }
+    }
+
+    private LongStatValueBuilder(PrimitiveType type) {
+      super(type);
+    }
+
+    @Override
+    StatValue build(ByteBuffer value) {
+      return new Value(value.getLong(0));
+    }
+
+    @Override
+    String stringifyValue(ColumnReader reader) {
+      return stringifier.stringify(reader.getLong());
+    }
+  }
+
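+  // Validates all values of a single page against the min/max/null-count entries
+  // stored for that page in the column index.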
+  private static class PageValidator {
+    private final int rowGroupNumber;
+    private final int columnNumber;
+    private final ColumnPath columnPath;
+    private final int pageNumber;
+    private final int maxDefinitionLevel;
+    private final long nullCountInIndex;
+    private long nullCountActual;
+    private final boolean isNullPage;
+    private final ColumnReader columnReader;
+    private final List<ContractViolation> violations;
+    private final Set<Contract> pageViolations = EnumSet.noneOf(Contract.class);
+    private final StatValue minValue;
+    private final StatValue maxValue;
+    private final StatValue.Builder statValueBuilder;
+
+    PageValidator(
+        PrimitiveType type,
+        int rowGroupNumber,
+        int columnNumber,
+        ColumnPath columnPath,
+        int pageNumber,
+        List<ContractViolation> violations,
+        ColumnReader columnReader,
+        ByteBuffer minValue,
+        ByteBuffer maxValue,
+        ByteBuffer prevMinValue,
+        ByteBuffer prevMaxValue,
+        BoundaryOrder boundaryOrder,
+        long nullCount,
+        boolean isNullPage) {
+      this.columnReader = columnReader;
+      this.rowGroupNumber = rowGroupNumber;
+      this.columnNumber = columnNumber;
+      this.columnPath = columnPath;
+      this.pageNumber = pageNumber;
+      this.nullCountInIndex = nullCount;
+      this.nullCountActual = 0;
+      this.isNullPage = isNullPage;
+      this.maxDefinitionLevel = columnReader.getDescriptor().getMaxDefinitionLevel();
+      this.violations = violations;
+      this.statValueBuilder = getBuilder(type);
+      this.minValue = isNullPage ? null : statValueBuilder.build(minValue);
+      this.maxValue = isNullPage ? null : statValueBuilder.build(maxValue);
+
+      if (isNullPage) {
+        // By specification, null pages have empty byte arrays as min/max values.
+        validateContract(!minValue.hasRemaining(),
+            NULL_PAGE_HAS_NO_MIN,
+            () -> statValueBuilder.build(minValue).toString());
+        validateContract(!maxValue.hasRemaining(),
+            NULL_PAGE_HAS_NO_MAX,
+            () -> statValueBuilder.build(maxValue).toString());
+      } else if (prevMinValue != null) {
+        validateBoundaryOrder(statValueBuilder.build(prevMinValue), statValueBuilder.build(prevMaxValue),
+            boundaryOrder);
+      }
+    }
+
+    void validateValuesBelongingToRow() {
+      do {
+        if (columnReader.getCurrentDefinitionLevel() == maxDefinitionLevel) {
+          validateValue();
+        } else {
+          ++nullCountActual;
+        }
+        columnReader.consume();
+      } while (columnReader.getCurrentRepetitionLevel() != 0);
+    }
+
+    void finishPage() {
+      validateContract(nullCountInIndex == nullCountActual,
+          NULL_COUNT_CORRECT,
+          () -> Long.toString(nullCountActual),
+          () -> Long.toString(nullCountInIndex));
+    }
+
+    void validateContract(boolean contractCondition,
+        Contract type,
+        Supplier<String> value1) {
+      validateContract(contractCondition, type, value1, () -> "N/A");
+    }
+
+    void validateContract(boolean contractCondition,
+        Contract type,
+        Supplier<String> value1,
+        Supplier<String> value2) {
+      if (!contractCondition && !pageViolations.contains(type)) {
+        violations.add(
+            new ContractViolation(type, value1.get(), value2.get(), rowGroupNumber,
+                columnNumber, columnPath, pageNumber));
+        pageViolations.add(type);
+      }
+    }
+
+    private void validateValue() {
+      validateContract(!isNullPage,
+          NULL_PAGE_HAS_NO_VALUES,
+          () -> statValueBuilder.stringifyValue(columnReader));
+      validateContract(minValue.compareToValue(columnReader) <= 0,
+          MIN_LTEQ_VALUE,
+          () -> statValueBuilder.stringifyValue(columnReader),
+          minValue::toString);
+      validateContract(maxValue.compareToValue(columnReader) >= 0,
+          MAX_GTEQ_VALUE,
+          () -> statValueBuilder.stringifyValue(columnReader),
+          maxValue::toString);
+    }
+
+    private void validateBoundaryOrder(StatValue prevMinValue, StatValue prevMaxValue, BoundaryOrder boundaryOrder) {
+      switch (boundaryOrder) {
+      case ASCENDING:
+        validateContract(minValue.compareTo(prevMinValue) >= 0,
+            MIN_ASCENDING,
+            minValue::toString,
+            prevMinValue::toString);
+        validateContract(maxValue.compareTo(prevMaxValue) >= 0,
+            MAX_ASCENDING,
+            maxValue::toString,
+            prevMaxValue::toString);
+        break;
+      case DESCENDING:
+        validateContract(minValue.compareTo(prevMinValue) <= 0,
+            MIN_DESCENDING,
+            minValue::toString,
+            prevMinValue::toString);
+        validateContract(maxValue.compareTo(prevMaxValue) <= 0,
+            MAX_DESCENDING,
+            maxValue::toString,
+            prevMaxValue::toString);
+        break;
+      case UNORDERED:
+        // No checks necessary.
+        break;
+      }
+    }
+  }
+
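+  /**
+   * Validates the column/offset indexes of the given Parquet file against the actual page values.
+   *
+   * @param file the Parquet file to validate
+   * @return the list of contract violations found; empty if the indexes are consistent with the data
+   * @throws IOException if an error occurs while reading the file
+   */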
+  public static List<ContractViolation> checkContractViolations(InputFile file) throws IOException {
+    List<ContractViolation> violations = new ArrayList<>();
+    try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+      FileMetaData meta = reader.getFooter().getFileMetaData();
+      MessageType schema = meta.getSchema();
+      List<ColumnDescriptor> columns = schema.getColumns();
+
+      List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+      int rowGroupNumber = 0;
+      PageReadStore rowGroup = reader.readNextRowGroup();
+      while (rowGroup != null) {
+        ColumnReadStore columnReadStore = new ColumnReadStoreImpl(rowGroup,
+            new DummyRecordConverter(schema).getRootConverter(), schema, null);
+        List<ColumnChunkMetaData> columnChunks = blocks.get(rowGroupNumber).getColumns();
+        assert (columnChunks.size() == columns.size());
+        for (int columnNumber = 0; columnNumber < columns.size(); ++columnNumber) {
+          ColumnDescriptor column = columns.get(columnNumber);
+          ColumnChunkMetaData columnChunk = columnChunks.get(columnNumber);
+          ColumnIndex columnIndex = reader.readColumnIndex(columnChunk);
+          if (columnIndex == null) {
+            continue;
+          }
+          ColumnPath columnPath = columnChunk.getPath();
+          OffsetIndex offsetIndex = reader.readOffsetIndex(columnChunk);
+          List<ByteBuffer> minValues = columnIndex.getMinValues();
+          List<ByteBuffer> maxValues = columnIndex.getMaxValues();
+          BoundaryOrder boundaryOrder = columnIndex.getBoundaryOrder();
+          List<Long> nullCounts = columnIndex.getNullCounts();
+          List<Boolean> nullPages = columnIndex.getNullPages();
+          long rowNumber = 0;
+          ColumnReader columnReader = columnReadStore.getColumnReader(column);
+          ByteBuffer prevMinValue = null;
+          ByteBuffer prevMaxValue = null;
+          for (int pageNumber = 0; pageNumber < offsetIndex.getPageCount(); ++pageNumber) {
+            boolean isNullPage = nullPages.get(pageNumber);
+            ByteBuffer minValue = minValues.get(pageNumber);
+            ByteBuffer maxValue = maxValues.get(pageNumber);
+            PageValidator pageValidator = new PageValidator(
+                column.getPrimitiveType(),
+                rowGroupNumber, columnNumber, columnPath, pageNumber,
+                violations, columnReader,
+                minValue,
+                maxValue,
+                prevMinValue,
+                prevMaxValue,
+                boundaryOrder,
+                nullCounts.get(pageNumber),
+                isNullPage);
+            if (!isNullPage) {
+              prevMinValue = minValue;
+              prevMaxValue = maxValue;
+            }
+            long lastRowNumberInPage = offsetIndex.getLastRowIndex(pageNumber, rowGroup.getRowCount());
+            while (rowNumber <= lastRowNumberInPage) {
+              pageValidator.validateValuesBelongingToRow();
+              ++rowNumber;
+            }
+            pageValidator.finishPage();
+          }
+        }
+        rowGroup = reader.readNextRowGroup();
+        rowGroupNumber++;
+      }
+    }
+    return violations;
+  }
+}
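
A minimal sketch of invoking the new validator outside of the unit test added below; the wrapper class
and the command-line handling here are illustrative only, while checkContractViolations and
HadoopInputFile.fromPath are used exactly as in TestColumnIndexes:

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ColumnIndexValidator;
    import org.apache.parquet.hadoop.ColumnIndexValidator.ContractViolation;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class ValidateColumnIndexes {
      public static void main(String[] args) throws Exception {
        // Path of the Parquet file to check, taken from the command line here.
        Path file = new Path(args[0]);
        List<ContractViolation> violations = ColumnIndexValidator
            .checkContractViolations(HadoopInputFile.fromPath(file, new Configuration()));
        // An empty list means every page agreed with its column/offset index entries.
        violations.forEach(System.out::println);
      }
    }
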
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
index 16db5cb..152f6ec 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java
@@ -19,14 +19,20 @@
 
 package org.apache.parquet.statistics;
 
-import org.apache.parquet.io.api.Binary;
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
+import java.util.function.Supplier;
+
+import org.apache.parquet.io.api.Binary;
 
 public class RandomValues {
   private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890";
 
-  static abstract class RandomValueGenerator<T extends Comparable<T>> {
+  static abstract class RandomValueGenerator<T extends Comparable<T>> implements Supplier<T> {
     private final Random random;
 
     protected RandomValueGenerator(long seed) {
@@ -80,6 +86,11 @@ public class RandomValues {
     }
 
     public abstract T nextValue();
+
+    @Override
+    public T get() {
+      return nextValue();
+    }
   }
 
   static abstract class RandomBinaryBase<T extends Comparable<T>> extends RandomValueGenerator<T> {
@@ -277,7 +288,6 @@ public class RandomValues {
       return asReusedBinary(nextValue().getBytes());
     }
   }
-
   public static class BinaryGenerator extends RandomBinaryBase<Binary> {
     private static final int MAX_STRING_LENGTH = 16;
     public BinaryGenerator(long seed) {
@@ -339,4 +349,34 @@ public class RandomValues {
     public T minimum() { return this.minimum; }
     public T maximum() { return this.maximum; }
   }
+
+  public static Supplier<Binary> binaryStringGenerator(long seed) {
+    final StringGenerator generator = new StringGenerator(seed);
+    return generator::nextBinaryValue;
+  }
+
+  public static Supplier<Binary> int96Generator(long seed) {
+    final Int96Generator generator = new Int96Generator(seed);
+    return generator::nextBinaryValue;
+  }
+
+  public static <T extends Comparable<T>> Supplier<T> wrapSorted(Supplier<T> supplier,
+      int recordCount, boolean ascending) {
+    return wrapSorted(supplier, recordCount, ascending, (a, b) -> a.compareTo(b));
+  }
+
+  public static <T> Supplier<T> wrapSorted(Supplier<T> supplier, int recordCount, boolean ascending,
+      Comparator<T> cmp) {
+    List<T> values = new ArrayList<>(recordCount);
+    for (int i = 0; i < recordCount; ++i) {
+      values.add(supplier.get());
+    }
+    if (ascending) {
+      values.sort(cmp);
+    } else {
+      values.sort((a, b) -> cmp.compare(b, a));
+    }
+    final Iterator<T> it = values.iterator();
+    return it::next;
+  }
 }
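
The new wrapSorted helper pre-draws a fixed number of values from any Supplier and replays them in
sorted order, which is what lets the test produce column chunks with ASCENDING or DESCENDING boundary
order. A short sketch of its behavior, assuming it is called from the same
org.apache.parquet.statistics package (the seed, the count, and the demo class are arbitrary):

    package org.apache.parquet.statistics;

    import java.util.function.Supplier;

    public class WrapSortedSketch {
      public static void main(String[] args) {
        // IntGenerator now implements Supplier<Integer> via RandomValueGenerator.
        Supplier<Integer> ints = new RandomValues.IntGenerator(42L);
        // wrapSorted draws 1000 values up front and replays them smallest-first;
        // requesting more than 1000 values exhausts the backing iterator.
        Supplier<Integer> ascending = RandomValues.wrapSorted(ints, 1000, true);
        System.out.println(ascending.get()); // the smallest of the 1000 drawn values
      }
    }
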
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java
new file mode 100644
index 0000000..aac8e43
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestColumnIndexes.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.statistics;
+
+import static org.apache.parquet.schema.LogicalTypeAnnotation.bsonType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.dateType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.enumType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.jsonType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Supplier;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ColumnIndexValidator;
+import org.apache.parquet.hadoop.ColumnIndexValidator.ContractViolation;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Types;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+@RunWith(Parameterized.class)
+public class TestColumnIndexes {
+  private static final int MAX_TOTAL_ROWS = 100_000;
+  private static final MessageType SCHEMA = new MessageType("schema",
+      new PrimitiveType(OPTIONAL, INT32, "i32"),
+      new PrimitiveType(OPTIONAL, INT64, "i64"),
+      new PrimitiveType(OPTIONAL, INT96, "i96"),
+      new PrimitiveType(OPTIONAL, FLOAT, "sngl"),
+      new PrimitiveType(OPTIONAL, DOUBLE, "dbl"),
+      new PrimitiveType(OPTIONAL, BINARY, "strings"),
+      new PrimitiveType(OPTIONAL, BINARY, "binary"),
+      new PrimitiveType(OPTIONAL, FIXED_LEN_BYTE_ARRAY, 17, "fixed-binary"),
+      new PrimitiveType(REQUIRED, INT32, "unconstrained-i32"),
+      new PrimitiveType(REQUIRED, INT64, "unconstrained-i64"),
+      new PrimitiveType(REQUIRED, FLOAT, "unconstrained-sngl"),
+      new PrimitiveType(REQUIRED, DOUBLE, "unconstrained-dbl"),
+      Types.optional(INT32).as(intType(8, true)).named("int8"),
+      Types.optional(INT32).as(intType(8, false)).named("uint8"),
+      Types.optional(INT32).as(intType(16, true)).named("int16"),
+      Types.optional(INT32).as(intType(16, false)).named("uint16"),
+      Types.optional(INT32).as(intType(32, true)).named("int32"),
+      Types.optional(INT32).as(intType(32, false)).named("uint32"),
+      Types.optional(INT64).as(intType(64, true)).named("int64"),
+      Types.optional(INT64).as(intType(64, false)).named("uint64"),
+      Types.optional(INT32).as(decimalType(2, 9)).named("decimal-int32"),
+      Types.optional(INT64).as(decimalType(4, 18)).named("decimal-int64"),
+      Types.optional(FIXED_LEN_BYTE_ARRAY).length(19).as(decimalType(25, 45)).named("decimal-fixed"),
+      Types.optional(BINARY).as(decimalType(20, 38)).named("decimal-binary"),
+      Types.optional(BINARY).as(stringType()).named("utf8"),
+      Types.optional(BINARY).as(enumType()).named("enum"),
+      Types.optional(BINARY).as(jsonType()).named("json"),
+      Types.optional(BINARY).as(bsonType()).named("bson"),
+      Types.optional(INT32).as(dateType()).named("date"),
+      Types.optional(INT32).as(timeType(true, TimeUnit.MILLIS)).named("time-millis"),
+      Types.optional(INT64).as(timeType(false, TimeUnit.MICROS)).named("time-micros"),
+      Types.optional(INT64).as(timestampType(true, TimeUnit.MILLIS)).named("timestamp-millis"),
+      Types.optional(INT64).as(timestampType(false, TimeUnit.NANOS)).named("timestamp-nanos"),
+      Types.optional(FIXED_LEN_BYTE_ARRAY).length(12).as(OriginalType.INTERVAL).named("interval"),
+      Types.optional(BINARY).as(stringType()).named("always-null"));
+
+  private static List<Supplier<?>> buildGenerators(int recordCount, Random random) {
+    int fieldIndex = 0;
+    return Arrays.<Supplier<?>>asList(
+        sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(RandomValues.int96Generator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.FloatGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.DoubleGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 17), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedFloatGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedDoubleGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.IntGenerator(random.nextLong(), Byte.MIN_VALUE, Byte.MAX_VALUE), random,
+            recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.UIntGenerator(random.nextLong(), Byte.MIN_VALUE, Byte.MAX_VALUE), random,
+            recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.IntGenerator(random.nextLong(), Short.MIN_VALUE, Short.MAX_VALUE), random,
+            recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.UIntGenerator(random.nextLong(), Short.MIN_VALUE, Short.MAX_VALUE), random,
+            recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedIntGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.UnconstrainedLongGenerator(random.nextLong()), random, recordCount,
+            fieldIndex++),
+        sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 19), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(RandomValues.binaryStringGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.BinaryGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.IntGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.LongGenerator(random.nextLong()), random, recordCount, fieldIndex++),
+        sortedOrRandom(new RandomValues.FixedGenerator(random.nextLong(), 12), random, recordCount, fieldIndex++),
+        null); // no generator: the "always-null" column is never written
+  }
+
+  private static <T> Supplier<T> sortedOrRandom(Supplier<T> generator, Random random, int recordCount, int fieldIndex) {
+    Comparator<T> cmp = SCHEMA.getType(fieldIndex).asPrimitiveType().comparator();
+
+    // 20% chance for ascending, 20% for descending, 60% to remain random
+    switch (random.nextInt(5)) {
+    case 1:
+      return RandomValues.wrapSorted(generator, recordCount, true, cmp);
+    case 2:
+      return RandomValues.wrapSorted(generator, recordCount, false, cmp);
+    default:
+      return generator;
+    }
+  }
+
+  public static class WriteContext {
+    private static final GroupFactory FACTORY = new SimpleGroupFactory(SCHEMA);
+    private final long seed;
+    private final int pageRowCountLimit;
+    private final int columnIndexTruncateLength;
+
+    private WriteContext(long seed, int pageRowCountLimit, int columnIndexTruncateLength) {
+      this.seed = seed;
+      this.pageRowCountLimit = pageRowCountLimit;
+      this.columnIndexTruncateLength = columnIndexTruncateLength;
+    }
+
+    public Path write(Path directory) throws IOException {
+      Path file = new Path(directory, "testColumnIndexes_" + this + ".parquet");
+      Random random = new Random(seed);
+      int recordCount = random.nextInt(MAX_TOTAL_ROWS) + 1;
+      List<Supplier<?>> generators = buildGenerators(recordCount, random);
+      Configuration conf = new Configuration();
+      ParquetOutputFormat.setColumnIndexTruncateLength(conf, columnIndexTruncateLength);
+      try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+          .withType(SCHEMA)
+          .withPageRowCountLimit(pageRowCountLimit)
+          .withConf(conf)
+          .build()) {
+        for (int i = 0; i < recordCount; i++) {
+          writer.write(createGroup(generators, random));
+        }
+      }
+      return file;
+    }
+
+    private Group createGroup(List<Supplier<?>> generators, Random random) {
+      Group group = FACTORY.newGroup();
+      for (int column = 0, columnCnt = SCHEMA.getFieldCount(); column < columnCnt; ++column) {
+        Type type = SCHEMA.getType(column);
+        Supplier<?> generator = generators.get(column);
+        // 2% chance of null value for an optional column
+        if (generator == null || (type.isRepetition(OPTIONAL) && random.nextInt(50) == 0)) {
+          continue;
+        }
+        switch (type.asPrimitiveType().getPrimitiveTypeName()) {
+        case BINARY:
+        case FIXED_LEN_BYTE_ARRAY:
+        case INT96:
+          group.append(type.getName(), (Binary) generator.get());
+          break;
+        case INT32:
+          group.append(type.getName(), (Integer) generator.get());
+          break;
+        case INT64:
+          group.append(type.getName(), (Long) generator.get());
+          break;
+        case FLOAT:
+          group.append(type.getName(), (Float) generator.get());
+          break;
+        case DOUBLE:
+          group.append(type.getName(), (Double) generator.get());
+          break;
+        case BOOLEAN:
+          group.append(type.getName(), (Boolean) generator.get());
+          break;
+        }
+      }
+      return group;
+    }
+
+    @Override
+    public String toString() {
+      return "seed=" + seed
+          + ",pageRowCountLimit=" + pageRowCountLimit
+          + ",columnIndexTruncateLength=" + columnIndexTruncateLength;
+    }
+  }
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexes.class);
+
+  @Rule
+  public TemporaryFolder tmp = new TemporaryFolder();
+
+  @Parameters
+  public static Collection<WriteContext> getContexts() {
+    return Arrays.asList(
+        new WriteContext(System.nanoTime(), 1000, 8),
+        new WriteContext(System.nanoTime(), 20000, 64),
+        new WriteContext(System.nanoTime(), 50000, 10));
+  }
+
+  public TestColumnIndexes(WriteContext context) {
+    this.context = context;
+  }
+
+  private final WriteContext context;
+
+  @Test
+  public void testColumnIndexes() throws IOException {
+    LOGGER.info("Starting test with context: {}", context);
+
+    Path file = null;
+    try {
+      file = context.write(new Path(tmp.getRoot().getAbsolutePath()));
+      LOGGER.info("Parquet file \"{}\" is successfully created for the context: {}", file, context);
+
+      List<ContractViolation> violations = ColumnIndexValidator
+          .checkContractViolations(HadoopInputFile.fromPath(file, new Configuration()));
+      assertTrue(violations.toString(), violations.isEmpty());
+    } finally {
+      if (file != null) {
+        file.getFileSystem(new Configuration()).delete(file, false);
+      }
+    }
+  }
+}