Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/09 13:34:01 UTC

[jira] [Commented] (PARQUET-1214) Column indexes: Truncate min/max values

    [ https://issues.apache.org/jira/browse/PARQUET-1214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536922#comment-16536922 ] 

ASF GitHub Bot commented on PARQUET-1214:
-----------------------------------------

gszadovszky closed pull request #481: PARQUET-1214: Column indexes: Truncate min/max values
URL: https://github.com/apache/parquet-mr/pull/481

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index f965511da..b17323933 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -47,6 +47,7 @@
   public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+  public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
 
   public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
 
@@ -83,10 +84,11 @@ public static WriterVersion fromString(String name) {
   private final boolean estimateNextSizeCheck;
   private final ByteBufferAllocator allocator;
   private final ValuesWriterFactory valuesWriterFactory;
+  private final int columnIndexTruncateLength;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory) {
+                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -99,6 +101,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
     this.allocator = allocator;
 
     this.valuesWriterFactory = writerFactory;
+    this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -183,6 +186,10 @@ public ValuesWriterFactory getValuesWriterFactory() {
     return valuesWriterFactory;
   }
 
+  public int getColumnIndexTruncateLength() {
+    return columnIndexTruncateLength;
+  }
+
   public boolean estimateNextSizeCheck() {
     return estimateNextSizeCheck;
   }
@@ -205,6 +212,7 @@ public static Builder copy(ParquetProperties toCopy) {
     private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
+    private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
 
     private Builder() {
     }
@@ -299,11 +307,17 @@ public Builder withValuesWriterFactory(ValuesWriterFactory factory) {
       return this;
     }
 
+    public Builder withColumnIndexTruncateLength(int length) {
+      Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative or zero) : %s", length);
+      this.columnIndexTruncateLength = length;
+      return this;
+    }
+
     public ParquetProperties build() {
       ParquetProperties properties =
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator, valuesWriterFactory);
+          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
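
For reference, a minimal sketch (not part of this PR) of how a caller could set the
new option on the writer properties; it assumes the existing ParquetProperties.builder()
entry point, and the length of 16 is an arbitrary illustration:

    import org.apache.parquet.column.ParquetProperties;

    public class ColumnIndexTruncateLengthSketch {
      public static void main(String[] args) {
        // Cap column-index min/max values at 16 bytes; without this call the
        // default DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH (64) added above applies.
        ParquetProperties props = ParquetProperties.builder()
            .withColumnIndexTruncateLength(16)
            .build();
        System.out.println(props.getColumnIndexTruncateLength()); // prints 16
      }
    }
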
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
index 12ed7b4f8..950b70f54 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -58,6 +58,8 @@ String getMaxValueAsString(int pageIndex) {
 
   private final List<Binary> minValues = new ArrayList<>();
   private final List<Binary> maxValues = new ArrayList<>();
+  private final BinaryTruncator truncator;
+  private final int truncateLength;
 
   private static Binary convert(ByteBuffer buffer) {
     return Binary.fromReusedByteBuffer(buffer);
@@ -67,6 +69,11 @@ private static ByteBuffer convert(Binary value) {
     return value.toByteBuffer();
   }
 
+  BinaryColumnIndexBuilder(PrimitiveType type, int truncateLength) {
+    truncator = BinaryTruncator.getTruncator(type);
+    this.truncateLength = truncateLength;
+  }
+
   @Override
   void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
     minValues.add(min == null ? null : convert(min));
@@ -75,8 +82,8 @@ void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
 
   @Override
   void addMinMax(Object min, Object max) {
-    minValues.add((Binary) min);
-    maxValues.add((Binary) max);
+    minValues.add(min == null ? null : truncator.truncateMin((Binary) min, truncateLength));
+    maxValues.add(max == null ? null : truncator.truncateMax((Binary) max, truncateLength));
   }
 
   @Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
new file mode 100644
index 000000000..bcc43fb86
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Class for truncating min/max values for binary types.
+ */
+abstract class BinaryTruncator {
+  enum Validity {
+    VALID, MALFORMED, UNMAPPABLE;
+  }
+
+  private static class CharsetValidator {
+    private final CharBuffer dummyBuffer = CharBuffer.allocate(1024);
+    private final CharsetDecoder decoder;
+
+    CharsetValidator(Charset charset) {
+      decoder = charset.newDecoder();
+      decoder.onMalformedInput(CodingErrorAction.REPORT);
+      decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+    }
+
+    Validity checkValidity(ByteBuffer buffer) {
+      int pos = buffer.position();
+      CoderResult result = CoderResult.OVERFLOW;
+      while (result.isOverflow()) {
+        dummyBuffer.clear();
+        result = decoder.decode(buffer, dummyBuffer, true);
+      }
+      buffer.position(pos);
+      if (result.isUnderflow()) {
+        return Validity.VALID;
+      } else if (result.isMalformed()) {
+        return Validity.MALFORMED;
+      } else {
+        return Validity.UNMAPPABLE;
+      }
+    }
+  }
+
+  private static final BinaryTruncator NO_OP_TRUNCATOR = new BinaryTruncator() {
+    @Override
+    Binary truncateMin(Binary minValue, int length) {
+      return minValue;
+    }
+
+    @Override
+    Binary truncateMax(Binary maxValue, int length) {
+      return maxValue;
+    }
+  };
+
+  private static final BinaryTruncator DEFAULT_UTF8_TRUNCATOR = new BinaryTruncator() {
+    private final CharsetValidator validator = new CharsetValidator(StandardCharsets.UTF_8);
+
+    @Override
+    Binary truncateMin(Binary minValue, int length) {
+      if (minValue.length() <= length) {
+        return minValue;
+      }
+      ByteBuffer buffer = minValue.toByteBuffer();
+      byte[] array;
+      if (validator.checkValidity(buffer) == Validity.VALID) {
+        array = truncateUtf8(buffer, length);
+      } else {
+        array = truncate(buffer, length);
+      }
+      return array == null ? minValue : Binary.fromConstantByteArray(array);
+    }
+
+    @Override
+    Binary truncateMax(Binary maxValue, int length) {
+      if (maxValue.length() <= length) {
+        return maxValue;
+      }
+      byte[] array;
+      ByteBuffer buffer = maxValue.toByteBuffer();
+      if (validator.checkValidity(buffer) == Validity.VALID) {
+        array = incrementUtf8(truncateUtf8(buffer, length));
+      } else {
+        array = increment(truncate(buffer, length));
+      }
+      return array == null ? maxValue : Binary.fromConstantByteArray(array);
+    }
+
+    // Simply truncate to length
+    private byte[] truncate(ByteBuffer buffer, int length) {
+      assert length < buffer.remaining();
+      byte[] array = new byte[length];
+      buffer.get(array);
+      return array;
+    }
+
+    // Trying to increment the bytes from the last one to the beginning
+    private byte[] increment(byte[] array) {
+      for (int i = array.length - 1; i >= 0; --i) {
+        byte elem = array[i];
+        ++elem;
+        array[i] = elem;
+        if (elem != 0) { // Did not overflow: 0xFF -> 0x00
+          return array;
+        }
+      }
+      return null;
+    }
+
+    // Truncates the buffer to length or less so the remaining bytes form a valid UTF-8 string
+    private byte[] truncateUtf8(ByteBuffer buffer, int length) {
+      assert length < buffer.remaining();
+      ByteBuffer newBuffer = buffer.slice();
+      newBuffer.limit(newBuffer.position() + length);
+      while (validator.checkValidity(newBuffer) != Validity.VALID) {
+        newBuffer.limit(newBuffer.limit() - 1);
+        if (newBuffer.remaining() == 0) {
+          return null;
+        }
+      }
+      byte[] array = new byte[newBuffer.remaining()];
+      newBuffer.get(array);
+      return array;
+    }
+
+    // Trying to increment the bytes from the last one to the beginning until the bytes form a valid UTF-8 string
+    private byte[] incrementUtf8(byte[] array) {
+      if (array == null) {
+        return null;
+      }
+      ByteBuffer buffer = ByteBuffer.wrap(array);
+      for (int i = array.length - 1; i >= 0; --i) {
+        byte prev = array[i];
+        byte inc = prev;
+        while (++inc != 0) { // Until overflow: 0xFF -> 0x00
+          array[i] = inc;
+          switch (validator.checkValidity(buffer)) {
+            case VALID:
+              return array;
+            case UNMAPPABLE:
+              continue; // Increment the i byte once more
+            case MALFORMED:
+              break; // Stop incrementing the i byte; go to the i-1
+          }
+          break; // MALFORMED
+        }
+        array[i] = prev;
+      }
+      return null; // All characters are the largest possible; unable to increment
+    }
+  };
+
+  static BinaryTruncator getTruncator(PrimitiveType type) {
+    if (type == null) {
+      return NO_OP_TRUNCATOR;
+    }
+    switch (type.getPrimitiveTypeName()) {
+      case INT96:
+        return NO_OP_TRUNCATOR;
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+        OriginalType originalType = type.getOriginalType();
+        if (originalType == null) {
+          return DEFAULT_UTF8_TRUNCATOR;
+        }
+        switch (originalType) {
+          case UTF8:
+          case ENUM:
+          case JSON:
+          case BSON:
+            return DEFAULT_UTF8_TRUNCATOR;
+          default:
+            return NO_OP_TRUNCATOR;
+        }
+      default:
+        throw new IllegalArgumentException("No truncator is available for the type: " + type);
+    }
+  }
+
+  abstract Binary truncateMin(Binary minValue, int length);
+
+  abstract Binary truncateMax(Binary maxValue, int length);
+}
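
To illustrate the behaviour implemented above: min values are simply cut back to a
prefix (kept on a valid UTF-8 boundary for string-like types), while max values are
cut and then incremented so that the shortened value still upper-bounds the original.
A minimal sketch (not part of this PR), placed in the same package because
BinaryTruncator is package-private; the expected results match the test cases below:

    package org.apache.parquet.internal.column.columnindex;

    import static org.apache.parquet.schema.OriginalType.UTF8;
    import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

    import org.apache.parquet.io.api.Binary;
    import org.apache.parquet.schema.PrimitiveType;
    import org.apache.parquet.schema.Types;

    class BinaryTruncatorSketch {
      public static void main(String[] args) {
        PrimitiveType type = Types.required(BINARY).as(UTF8).named("example_utf8");
        BinaryTruncator truncator = BinaryTruncator.getTruncator(type);

        // Min: plain prefix cut -> "abc"
        Binary min = truncator.truncateMin(Binary.fromString("abcdef"), 3);
        // Max: prefix cut plus increment of the last byte -> "abd",
        // which is still >= "abcdef" in unsigned lexicographic order
        Binary max = truncator.truncateMax(Binary.fromString("abcdef"), 3);

        System.out.println(min.toStringUsingUTF8()); // abc
        System.out.println(max.toStringUsingUTF8()); // abd
      }
    }
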
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index aa0502ba1..6d05558ec 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -220,20 +220,22 @@ public static ColumnIndexBuilder getNoOpBuilder() {
   /**
    * @param type
    *          the type this builder is to be created for
+   * @param truncateLength
+   *          the length to be used for truncating binary values if possible
    * @return a {@link ColumnIndexBuilder} instance to be used for creating {@link ColumnIndex} objects
    */
-  public static ColumnIndexBuilder getBuilder(PrimitiveType type) {
-    ColumnIndexBuilder builder = createNewBuilder(type.getPrimitiveTypeName());
+  public static ColumnIndexBuilder getBuilder(PrimitiveType type, int truncateLength) {
+    ColumnIndexBuilder builder = createNewBuilder(type, truncateLength);
     builder.type = type;
     return builder;
   }
 
-  private static ColumnIndexBuilder createNewBuilder(PrimitiveTypeName type) {
-    switch (type) {
+  private static ColumnIndexBuilder createNewBuilder(PrimitiveType type, int truncateLength) {
+    switch (type.getPrimitiveTypeName()) {
       case BINARY:
       case FIXED_LEN_BYTE_ARRAY:
       case INT96:
-        return new BinaryColumnIndexBuilder();
+        return new BinaryColumnIndexBuilder(type, truncateLength);
       case BOOLEAN:
         return new BooleanColumnIndexBuilder();
       case DOUBLE:
@@ -276,7 +278,7 @@ public static ColumnIndex build(
     PrimitiveTypeName typeName = type.getPrimitiveTypeName();
     ColumnIndexBuilder builder = BUILDERS.get(typeName);
     if (builder == null) {
-      builder = createNewBuilder(typeName);
+      builder = createNewBuilder(type, Integer.MAX_VALUE);
       BUILDERS.put(typeName, builder);
     }
 
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java
new file mode 100644
index 000000000..c3e3d8574
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static org.apache.parquet.schema.OriginalType.BSON;
+import static org.apache.parquet.schema.OriginalType.DECIMAL;
+import static org.apache.parquet.schema.OriginalType.ENUM;
+import static org.apache.parquet.schema.OriginalType.INTERVAL;
+import static org.apache.parquet.schema.OriginalType.JSON;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.Random;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveStringifier;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link BinaryTruncator}
+ */
+public class TestBinaryTruncator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestBinaryTruncator.class);
+  private static final PrimitiveStringifier HEXA_STRINGIFIER = Types.required(BINARY)
+      .named("dummy_type").stringifier();
+  private static final Random RANDOM = new Random(42);
+  private static final CharsetDecoder UTF8_DECODER = StandardCharsets.UTF_8.newDecoder();
+  static {
+    UTF8_DECODER.onMalformedInput(CodingErrorAction.REPORT);
+    UTF8_DECODER.onUnmappableCharacter(CodingErrorAction.REPORT);
+  }
+
+  // The maximum values in UTF-8 for the 1, 2, 3 and 4 byte representations
+  private static final String UTF8_1BYTE_MAX_CHAR = "\u007F";
+  private static final String UTF8_2BYTES_MAX_CHAR = "\u07FF";
+  private static final String UTF8_3BYTES_MAX_CHAR = "\uFFFF";
+  private static final String UTF8_4BYTES_MAX_CHAR = "\uDBFF\uDFFF";
+
+  @Test
+  public void testNonStringTruncate() {
+    BinaryTruncator truncator = BinaryTruncator
+        .getTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"));
+    assertEquals(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA),
+        truncator.truncateMin(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 2));
+    assertEquals(binary(0x01, 0x02, 0x03, 0x04, 0x05, 0x06),
+        truncator.truncateMax(binary(0x01, 0x02, 0x03, 0x04, 0x05, 0x06), 2));
+  }
+
+  @Test
+  public void testContractNonStringTypes() {
+    testTruncator(
+        Types.required(FIXED_LEN_BYTE_ARRAY).length(8).as(DECIMAL).precision(18).scale(4).named("test_fixed_decimal"),
+        false);
+    testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("test_fixed_interval"), false);
+    testTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"), false);
+    testTruncator(Types.required(INT96).named("test_int96"), false);
+  }
+
+  @Test
+  public void testStringTruncate() {
+    BinaryTruncator truncator = BinaryTruncator.getTruncator(Types.required(BINARY).as(UTF8).named("test_utf8"));
+
+    // Truncate 1 byte characters
+    assertEquals(Binary.fromString("abc"), truncator.truncateMin(Binary.fromString("abcdef"), 3));
+    assertEquals(Binary.fromString("abd"), truncator.truncateMax(Binary.fromString("abcdef"), 3));
+
+    // Truncate 1-2 byte characters; the target length is "inside" a UTF-8 character
+    assertEquals(Binary.fromString("árvízt"), truncator.truncateMin(Binary.fromString("árvíztűrő"), 9));
+    assertEquals(Binary.fromString("árvízu"), truncator.truncateMax(Binary.fromString("árvíztűrő"), 9));
+
+    // Truncate highest UTF-8 values -> unable to increment
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR),
+        truncator.truncateMin(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            5));
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+        truncator.truncateMax(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            5));
+
+    // Truncate highest UTF-8 values at the end -> increment the first possible character
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + "b"
+                + UTF8_3BYTES_MAX_CHAR),
+        truncator.truncateMax(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + "a"
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            10));
+
+    // Truncate invalid UTF-8 values -> truncate without validity check
+    assertEquals(binary(0xFF, 0xFE, 0xFD), truncator.truncateMin(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 3));
+    assertEquals(binary(0xFF, 0xFE, 0xFE), truncator.truncateMax(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 3));
+    assertEquals(binary(0xFF, 0xFE, 0xFE, 0x00, 0x00), truncator.truncateMax(binary(0xFF, 0xFE, 0xFD, 0xFF, 0xFF, 0xFF), 5));
+  }
+
+  @Test
+  public void testContractStringTypes() {
+    testTruncator(Types.required(BINARY).named("test_binary"), true);
+    testTruncator(Types.required(BINARY).as(UTF8).named("test_utf8"), true);
+    testTruncator(Types.required(BINARY).as(ENUM).named("test_enum"), true);
+    testTruncator(Types.required(BINARY).as(JSON).named("test_json"), true);
+    testTruncator(Types.required(BINARY).as(BSON).named("test_bson"), true);
+    testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(5).named("test_fixed"), true);
+  }
+
+  private void testTruncator(PrimitiveType type, boolean strict) {
+    BinaryTruncator truncator = BinaryTruncator.getTruncator(type);
+    Comparator<Binary> comparator = type.comparator();
+
+    checkContract(truncator, comparator, Binary.fromString("aaaaaaaaaa"), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("árvíztűrő tükörfúrógép"), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("aaaaaaaaaa" + UTF8_3BYTES_MAX_CHAR), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("a" + UTF8_3BYTES_MAX_CHAR + UTF8_1BYTE_MAX_CHAR), strict,
+        strict);
+
+    checkContract(truncator, comparator,
+        Binary.fromConstantByteArray(new byte[] { (byte) 0xFE, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, (byte) 0xFF }), strict,
+        strict);
+
+    // Edge case: zero length -> unable to truncate
+    checkContract(truncator, comparator, Binary.fromString(""), false, false);
+    // Edge case: containing only UTF-8 max characters -> unable to truncate for max
+    checkContract(truncator, comparator, Binary.fromString(
+        UTF8_1BYTE_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR +
+            UTF8_2BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_1BYTE_MAX_CHAR +
+            UTF8_2BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR),
+        strict, false);
+    // Edge case: non-UTF-8; max bytes -> unable to truncate for max
+    checkContract(
+        truncator, comparator,
+        binary(0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF),
+        strict, false);
+  }
+
+  // Checks the contract of truncator
+  // strict means actual truncation is required and the truncated value is a valid UTF-8 string
+  private void checkContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, boolean strictMin,
+      boolean strictMax) {
+    int length = value.length();
+
+    // Edge cases: returning the original value if no truncation is required
+    assertSame(value, truncator.truncateMin(value, length));
+    assertSame(value, truncator.truncateMax(value, length));
+    assertSame(value, truncator.truncateMin(value, random(length + 1, length * 2 + 1)));
+    assertSame(value, truncator.truncateMax(value, random(length + 1, length * 2 + 1)));
+
+    if (length > 1) {
+      checkMinContract(truncator, comparator, value, length - 1, strictMin);
+      checkMaxContract(truncator, comparator, value, length - 1, strictMax);
+      checkMinContract(truncator, comparator, value, random(1, length - 1), strictMin);
+      checkMaxContract(truncator, comparator, value, random(1, length - 1), strictMax);
+    }
+
+    // Edge case: possible to truncate min value to 0 length if original value is not empty
+    checkMinContract(truncator, comparator, value, 0, strictMin);
+    // Edge case: impossible to truncate max value to 0 length -> returning the original value
+    assertSame(value, truncator.truncateMax(value, 0));
+  }
+
+  private void checkMinContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, int length,
+      boolean strict) {
+    Binary truncated = truncator.truncateMin(value, length);
+    LOG.debug("\"{}\" --truncMin({})--> \"{}\" [{}]", value.toStringUsingUTF8(), length, truncated.toStringUsingUTF8(),
+        HEXA_STRINGIFIER.stringify(truncated));
+    assertTrue("truncatedMin(value) should be <= than value", comparator.compare(truncated, value) <= 0);
+    assertFalse("length of truncateMin(value) should not be > than the length of value",
+        truncated.length() > value.length());
+    if (isValidUtf8(value)) {
+      checkValidUtf8(truncated);
+    }
+    if (strict) {
+      assertTrue("length of truncateMin(value) ahould be < than the length of value",
+          truncated.length() < value.length());
+    }
+  }
+
+  private void checkMaxContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, int length,
+      boolean strict) {
+    Binary truncated = truncator.truncateMax(value, length);
+    LOG.debug("\"{}\" --truncMax({})--> \"{}\" [{}]", value.toStringUsingUTF8(), length, truncated.toStringUsingUTF8(),
+        HEXA_STRINGIFIER.stringify(truncated));
+    assertTrue("truncatedMax(value) should be >= than value", comparator.compare(truncated, value) >= 0);
+    assertFalse("length of truncateMax(value) should not be > than the length of value",
+        truncated.length() > value.length());
+    if (isValidUtf8(value)) {
+      checkValidUtf8(truncated);
+    }
+    if (strict) {
+      assertTrue("length of truncateMax(value) ahould be < than the length of value",
+          truncated.length() < value.length());
+    }
+  }
+
+  private static boolean isValidUtf8(Binary binary) {
+    try {
+      UTF8_DECODER.decode(binary.toByteBuffer());
+      return true;
+    } catch (CharacterCodingException e) {
+      return false;
+    }
+  }
+
+  private static void checkValidUtf8(Binary binary) {
+    try {
+      UTF8_DECODER.decode(binary.toByteBuffer());
+    } catch (CharacterCodingException e) {
+      throw new AssertionError("Truncated value should be a valid UTF-8 string", e);
+    }
+  }
+
+  private static int random(int min, int max) {
+    return RANDOM.nextInt(max - min + 1) + min;
+  }
+
+  private static Binary binary(int... unsignedBytes) {
+    byte[] byteArray = new byte[unsignedBytes.length];
+    for (int i = 0, n = byteArray.length; i < n; ++i) {
+      int b = unsignedBytes[i];
+      assert (0xFFFFFF00 & b) == 0;
+      byteArray[i] = (byte) b;
+    }
+    return Binary.fromConstantByteArray(byteArray);
+  }
+
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 5acae97c9..7a5745e7b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -65,7 +65,7 @@
   @Test
   public void testBuildBinaryDecimal() {
     PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -103,7 +103,7 @@ public void testBuildBinaryDecimal() {
         null,
         decimalBinary("87656273"));
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null));
     builder.add(sb.stats(type, decimalBinary("-9999293.23"), decimalBinary("-234.23")));
@@ -138,7 +138,7 @@ public void testBuildBinaryDecimal() {
         decimalBinary("1234567890.12"),
         null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null));
     builder.add(sb.stats(type, null, null));
@@ -177,7 +177,7 @@ public void testBuildBinaryDecimal() {
   @Test
   public void testBuildBinaryUtf8() {
     PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -215,7 +215,7 @@ public void testBuildBinaryUtf8() {
         stringBinary("Beeblebrox"),
         null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, stringBinary("Beeblebrox"), stringBinary("Dent"), null, null));
     builder.add(sb.stats(type, null, null));
@@ -250,7 +250,7 @@ public void testBuildBinaryUtf8() {
         stringBinary("Slartibartfast"),
         null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, stringBinary("Slartibartfast")));
@@ -337,11 +337,11 @@ public void testStaticBuildBinary() {
   @Test
   public void testBuildBoolean() {
     PrimitiveType type = Types.required(BOOLEAN).named("test_boolean");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BooleanColumnIndexBuilder.class));
     assertNull(builder.build());
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, false, true));
     builder.add(sb.stats(type, true, false, null));
@@ -357,7 +357,7 @@ public void testBuildBoolean() {
     assertCorrectValues(columnIndex.getMaxValues(), true, true, true, null, false);
     assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, false, false));
@@ -375,7 +375,7 @@ public void testBuildBoolean() {
     assertCorrectValues(columnIndex.getMaxValues(), null, false, null, null, true, true, null);
     assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, true, true));
@@ -413,7 +413,7 @@ public void testStaticBuildBoolean() {
   @Test
   public void testBuildDouble() {
     PrimitiveType type = Types.required(DOUBLE).named("test_double");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(DoubleColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -433,7 +433,7 @@ public void testBuildDouble() {
     assertCorrectValues(columnIndex.getMaxValues(), -4.1, 7.0, 2.2, null, 2.32, 8.1);
     assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, -532.3, -345.2, null, null));
@@ -453,7 +453,7 @@ public void testBuildDouble() {
     assertCorrectValues(columnIndex.getMaxValues(), null, -345.2, -234.6, null, null, 2.99999, null, 42.83, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 532.3, 345.2));
@@ -493,7 +493,7 @@ public void testStaticBuildDouble() {
   @Test
   public void testBuildFloat() {
     PrimitiveType type = Types.required(FLOAT).named("test_float");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(FloatColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -513,7 +513,7 @@ public void testBuildFloat() {
     assertCorrectValues(columnIndex.getMaxValues(), -4.1f, 7.0f, 2.2f, null, 2.32f, 8.1f);
     assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, -532.3f, -345.2f, null, null));
@@ -533,7 +533,7 @@ public void testBuildFloat() {
     assertCorrectValues(columnIndex.getMaxValues(), null, -345.2f, -234.7f, null, null, 2.99999f, null, 42.83f, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 532.3f, 345.2f));
@@ -573,7 +573,7 @@ public void testStaticBuildFloat() {
   @Test
   public void testBuildInt32() {
     PrimitiveType type = Types.required(INT32).named("test_int32");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -593,7 +593,7 @@ public void testBuildInt32() {
     assertCorrectValues(columnIndex.getMaxValues(), 10, 7, 2, null, 2, 8);
     assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, -532, -345, null, null));
@@ -613,7 +613,7 @@ public void testBuildInt32() {
     assertCorrectValues(columnIndex.getMaxValues(), null, -345, -42, null, null, 2, null, 42, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, null, -42, null, 3, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 532, 345));
@@ -653,7 +653,7 @@ public void testStaticBuildInt32() {
   @Test
   public void testBuildUInt8() {
     PrimitiveType type = Types.required(INT32).as(UINT_8).named("test_uint8");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -673,7 +673,7 @@ public void testBuildUInt8() {
     assertCorrectValues(columnIndex.getMaxValues(), 10, 17, 2, null, 0xFF, 0xFA);
     assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, 0, 0, null, null));
@@ -693,7 +693,7 @@ public void testBuildUInt8() {
     assertCorrectValues(columnIndex.getMaxValues(), null, 0, 42, null, null, 0xEE, null, 0xFF, null);
     assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 0xFF, 0xFF));
@@ -717,7 +717,7 @@ public void testBuildUInt8() {
   @Test
   public void testBuildInt64() {
     PrimitiveType type = Types.required(INT64).named("test_int64");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(LongColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -737,7 +737,7 @@ public void testBuildInt64() {
     assertCorrectValues(columnIndex.getMaxValues(), 10l, 7l, 2l, null, 2l, 8l);
     assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, -21l);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, -532l, -345l, null, null));
@@ -757,7 +757,7 @@ public void testBuildInt64() {
     assertCorrectValues(columnIndex.getMaxValues(), null, -345l, -42l, null, null, 2l, null, 42l, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, null, -42l, null, -3l, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 532l, 345l));
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 064649334..29a353fd2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -76,12 +76,13 @@
 
     private ColumnChunkPageWriter(ColumnDescriptor path,
                                   BytesCompressor compressor,
-                                  ByteBufferAllocator allocator) {
+                                  ByteBufferAllocator allocator,
+                                  int columnIndexTruncateLength) {
       this.path = path;
       this.compressor = compressor;
       this.allocator = allocator;
       this.buf = new ConcatenatingByteArrayCollector();
-      this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType());
+      this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
       this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
     }
 
@@ -273,10 +274,11 @@ public String memUsageString(String prefix) {
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
   private final MessageType schema;
 
-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator) {
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+      int columnIndexTruncateLength) {
     this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path,  new ColumnChunkPageWriter(path, compressor, allocator));
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength));
     }
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index d9e9b5e15..1f8a093b1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -20,7 +20,6 @@
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
-import static java.lang.String.format;
 import static org.apache.parquet.Preconditions.checkNotNull;
 
 import java.io.IOException;
@@ -102,7 +101,8 @@ public ParquetMetadata getFooter() {
   }
 
   private void initStore() {
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator());
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
+        props.getColumnIndexTruncateLength());
     columnStore = props.newColumnWriteStore(schema, pageStore);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     this.recordConsumer = columnIO.getRecordWriter(columnStore);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3c85b02ec..3a65624b0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -47,6 +47,7 @@
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
@@ -100,6 +101,7 @@
   private final MessageType schema;
   private final PositionOutputStream out;
   private final AlignmentStrategy alignment;
+  private final int columnIndexTruncateLength;
 
   // file data
   private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
@@ -244,10 +246,27 @@ public ParquetFileWriter(Configuration configuration, MessageType schema,
    * @param rowGroupSize the row group size
    * @param maxPaddingSize the maximum padding
    * @throws IOException if the file can not be created
+   * @deprecated will be removed in 2.0.0
    */
+  @Deprecated
   public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
                            long rowGroupSize, int maxPaddingSize)
       throws IOException {
+    this(file, schema, mode, rowGroupSize, maxPaddingSize,
+        ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+  }
+  /**
+   * @param file OutputFile to create or overwrite
+   * @param schema the schema of the data
+   * @param mode file creation mode
+   * @param rowGroupSize the row group size
+   * @param maxPaddingSize the maximum padding
+   * @param columnIndexTruncateLength the length to which the min/max values in column indexes are truncated, if possible
+   * @throws IOException if the file can not be created
+   */
+  public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+                           long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength)
+      throws IOException {
     TypeUtil.checkValidWriteSchema(schema);
 
     this.schema = schema;
@@ -267,6 +286,7 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
     }
 
     this.encodingStatsBuilder = new EncodingStats.Builder();
+    this.columnIndexTruncateLength = columnIndexTruncateLength;
   }
 
   /**
@@ -289,6 +309,8 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
     this.out = HadoopStreams.wrap(
         fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize));
     this.encodingStatsBuilder = new EncodingStats.Builder();
+    // no truncation is needed for testing
+    this.columnIndexTruncateLength = Integer.MAX_VALUE;
   }
   /**
    * start the file
@@ -342,7 +364,7 @@ public void startColumn(ColumnDescriptor descriptor,
     // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
     currentStatistics = null;
 
-    columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType);
+    columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
     offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
     firstPageOffset = -1;
   }
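
For context on the new six-argument constructor above, a hypothetical construction
sketch (not part of this PR; the output path, schema and size values are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
    import org.apache.parquet.hadoop.util.HadoopOutputFile;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class FileWriterTruncateLengthSketch {
      public static void main(String[] args) throws Exception {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message example { required binary name (UTF8); }");
        Configuration conf = new Configuration();
        ParquetFileWriter writer = new ParquetFileWriter(
            HadoopOutputFile.fromPath(new Path("/tmp/example.parquet"), conf),
            schema, Mode.CREATE,
            128L * 1024 * 1024,  // row group size
            8 * 1024 * 1024,     // max padding
            ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
        // Column indexes created via startColumn() will now truncate binary
        // min/max values to the configured length where possible.
        writer.start();
        writer.end(new java.util.HashMap<String, String>());
      }
    }
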
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index ff5bab397..0789bf50d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -143,6 +143,7 @@
   public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min";
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
+  public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
 
   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
     String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -312,6 +313,18 @@ private static int getMaxPaddingSize(Configuration conf) {
     return conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
   }
 
+  public static void setColumnIndexTruncateLength(JobContext jobContext, int length) {
+    setColumnIndexTruncateLength(getConfiguration(jobContext), length);
+  }
+
+  public static void setColumnIndexTruncateLength(Configuration conf, int length) {
+    conf.setInt(COLUMN_INDEX_TRUNCATE_LENGTH, length);
+  }
+
+  private static int getColumnIndexTruncateLength(Configuration conf) {
+    return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+  }
+
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -366,6 +379,7 @@ private static int getMaxPaddingSize(Configuration conf) {
         .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
+        .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
         .build();
 
     long blockSize = getLongBlockSize(conf);
@@ -383,11 +397,12 @@ private static int getMaxPaddingSize(Configuration conf) {
       LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant"));
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
+      LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
     }
 
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
-        init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize);
+        init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength());
     w.start();
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
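
The Hadoop-facing knob added above can also be set directly on the job configuration;
a short sketch (not part of this PR; the job setup around it is illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.parquet.hadoop.ParquetOutputFormat;

    public class ConfigureColumnIndexTruncateLength {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Equivalent to conf.setInt("parquet.columnindex.truncate.length", 32)
        ParquetOutputFormat.setColumnIndexTruncateLength(conf, 32);

        // The JobContext overload added above forwards to the Configuration variant.
        Job job = Job.getInstance(conf);
        ParquetOutputFormat.setColumnIndexTruncateLength(job, 32);
      }
    }
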
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index a32df39a5..5b0e4f82d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -278,7 +278,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport
     MessageType schema = writeContext.getSchema();
 
     ParquetFileWriter fileWriter = new ParquetFileWriter(
-        file, schema, mode, rowGroupSize, maxPaddingSize);
+        file, schema, mode, rowGroupSize, maxPaddingSize, encodingProps.getColumnIndexTruncateLength());
     fileWriter.start();
 
     this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index b9382fc0b..90f4a5bdd 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -918,7 +918,7 @@ public void testOffsetIndexConversion() {
   @Test
   public void testColumnIndexConversion() {
     PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     Statistics<?> stats = Statistics.createStats(type);
     stats.incrementNumNulls(16);
     stats.updateStats(-100l);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index b78726838..9a27defe1 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -160,7 +160,8 @@ public void test() throws Exception {
       writer.startBlock(rowCount);
       pageOffset = outputFile.out().getPos();
       {
-        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , new HeapByteBufferAllocator());
+        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema,
+            new HeapByteBufferAllocator(), Integer.MAX_VALUE);
         PageWriter pageWriter = store.getPageWriter(col);
         pageWriter.writePageV2(
             rowCount, nullCount, valueCount,
@@ -235,7 +236,7 @@ public void testColumnOrderV1() throws IOException {
     // TODO - look back at this, an allocator was being passed here in the ByteBuffer changes
     // see comment at this constructor
     ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
-        compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator());
+        compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator(), Integer.MAX_VALUE);
 
     for (ColumnDescriptor col : schema.getColumns()) {
       PageWriter pageWriter = store.getPageWriter(col);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Column indexes: Truncate min/max values
> ---------------------------------------
>
>                 Key: PARQUET-1214
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1214
>             Project: Parquet
>          Issue Type: Sub-task
>            Reporter: Gabor Szadovszky
>            Assignee: Gabor Szadovszky
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)