You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ga...@apache.org on 2019/11/13 09:14:31 UTC

[parquet-mr] branch master updated: PARQUET-1685: Truncate Min/Max for Statistics (#696)

This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 76f9010  PARQUET-1685: Truncate Min/Max for Statistics (#696)
76f9010 is described below

commit 76f90101376d2589fe6071c96ae9d0203b245c0d
Author: shangxinli <31...@users.noreply.github.com>
AuthorDate: Wed Nov 13 01:14:24 2019 -0800

    PARQUET-1685: Truncate Min/Max for Statistics (#696)
    
    * Remove unnecessary string converting in readFooter method
---
 .../apache/parquet/column/ParquetProperties.java   | 20 ++++++-
 .../column/statistics/BinaryStatistics.java        | 11 ++++
 .../column/columnindex/BinaryTruncator.java        | 16 +++---
 .../format/converter/ParquetMetadataConverter.java | 64 ++++++++++++++++++---
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 13 ++++-
 .../apache/parquet/hadoop/ParquetOutputFormat.java | 17 +++++-
 .../org/apache/parquet/hadoop/ParquetWriter.java   |  3 +-
 .../converter/TestParquetMetadataConverter.java    | 66 ++++++++++++++++++++++
 8 files changed, 187 insertions(+), 23 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 7492b54..c022b72 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -48,6 +48,7 @@ public class ParquetProperties {
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
+  public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
   public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
 
   public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
@@ -88,13 +89,14 @@ public class ParquetProperties {
   private final ByteBufferAllocator allocator;
   private final ValuesWriterFactory valuesWriterFactory;
   private final int columnIndexTruncateLength;
+  private final int statisticsTruncateLength;
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
                             ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
-                            boolean pageWriteChecksumEnabled) {
+                            boolean pageWriteChecksumEnabled, int statisticsTruncateLength) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -108,6 +110,7 @@ public class ParquetProperties {
 
     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
+    this.statisticsTruncateLength = statisticsTruncateLength;
     this.pageRowCountLimit = pageRowCountLimit;
     this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
   }
@@ -198,6 +201,10 @@ public class ParquetProperties {
     return columnIndexTruncateLength;
   }
 
+  public int getStatisticsTruncateLength() {
+    return statisticsTruncateLength;
+  }
+
   public boolean estimateNextSizeCheck() {
     return estimateNextSizeCheck;
   }
@@ -229,6 +236,7 @@ public class ParquetProperties {
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+    private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
     private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
 
@@ -330,11 +338,17 @@ public class ParquetProperties {
     }
 
     public Builder withColumnIndexTruncateLength(int length) {
-      Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative) : %s", length);
+      Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative or zero) : %s", length);
       this.columnIndexTruncateLength = length;
       return this;
     }
 
+    public Builder withStatisticsTruncateLength(int length) {
+      Preconditions.checkArgument(length > 0, "Invalid statistics min/max truncate length (negative or zero) : %s", length);
+      this.statisticsTruncateLength = length;
+      return this;
+    }
+
     public Builder withPageRowCountLimit(int rowCount) {
       Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
       pageRowCountLimit = rowCount;
@@ -351,7 +365,7 @@ public class ParquetProperties {
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
           estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
-          pageRowCountLimit, pageWriteChecksumEnabled);
+          pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
index 6746729..6cd5395 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
@@ -103,6 +103,17 @@ public class BinaryStatistics extends Statistics<Binary> {
     return !hasNonNullValue() || ((min.length() + max.length()) < size);
   }
 
+  public boolean isSmallerThanWithTruncation(long size, int truncationLength) {
+    if (!hasNonNullValue()) {
+      return true;
+    }
+
+    int minTruncateLength = Math.min(min.length(), truncationLength);
+    int maxTruncateLength = Math.min(max.length(), truncationLength);
+
+    return minTruncateLength + maxTruncateLength < size;
+  }
+
   /**
    * @param min_value a min binary
    * @param max_value a max binary
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
index a301f67..8a6f007 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
@@ -34,7 +34,7 @@ import org.apache.parquet.schema.PrimitiveType;
 /**
  * Class for truncating min/max values for binary types.
  */
-abstract class BinaryTruncator {
+public abstract class BinaryTruncator {
   enum Validity {
     VALID, MALFORMED, UNMAPPABLE;
   }
@@ -69,12 +69,12 @@ abstract class BinaryTruncator {
 
   private static final BinaryTruncator NO_OP_TRUNCATOR = new BinaryTruncator() {
     @Override
-    Binary truncateMin(Binary minValue, int length) {
+    public Binary truncateMin(Binary minValue, int length) {
       return minValue;
     }
 
     @Override
-    Binary truncateMax(Binary maxValue, int length) {
+    public Binary truncateMax(Binary maxValue, int length) {
       return maxValue;
     }
   };
@@ -83,7 +83,7 @@ abstract class BinaryTruncator {
     private final CharsetValidator validator = new CharsetValidator(StandardCharsets.UTF_8);
 
     @Override
-    Binary truncateMin(Binary minValue, int length) {
+    public Binary truncateMin(Binary minValue, int length) {
       if (minValue.length() <= length) {
         return minValue;
       }
@@ -98,7 +98,7 @@ abstract class BinaryTruncator {
     }
 
     @Override
-    Binary truncateMax(Binary maxValue, int length) {
+    public Binary truncateMax(Binary maxValue, int length) {
       if (maxValue.length() <= length) {
         return maxValue;
       }
@@ -176,7 +176,7 @@ abstract class BinaryTruncator {
     }
   };
 
-  static BinaryTruncator getTruncator(PrimitiveType type) {
+  public static BinaryTruncator getTruncator(PrimitiveType type) {
     if (type == null) {
       return NO_OP_TRUNCATOR;
     }
@@ -215,7 +215,7 @@ abstract class BinaryTruncator {
     }
   }
 
-  abstract Binary truncateMin(Binary minValue, int length);
+  public abstract Binary truncateMin(Binary minValue, int length);
 
-  abstract Binary truncateMax(Binary maxValue, int length);
+  public abstract Binary truncateMax(Binary maxValue, int length);
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index b01e73c..03476a1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -44,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.format.BsonType;
 import org.apache.parquet.format.CompressionCodec;
 import org.apache.parquet.format.DateType;
@@ -91,10 +93,12 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
@@ -120,13 +124,17 @@ public class ParquetMetadataConverter {
   private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
   private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR = new LogicalTypeConverterVisitor();
   private static final ConvertedTypeConverterVisitor CONVERTED_TYPE_CONVERTER_VISITOR = new ConvertedTypeConverterVisitor();
-
+  private final int statisticsTruncateLength;
   private final boolean useSignedStringMinMax;
 
   public ParquetMetadataConverter() {
     this(false);
   }
 
+  public ParquetMetadataConverter(int statisticsTruncateLength) {
+    this(false, statisticsTruncateLength);
+  }
+
   /**
    * @param conf a configuration
    * @deprecated will be removed in 2.0.0; use {@code ParquetMetadataConverter(ParquetReadOptions)}
@@ -141,7 +149,15 @@ public class ParquetMetadataConverter {
   }
 
   private ParquetMetadataConverter(boolean useSignedStringMinMax) {
+    this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+  }
+
+  private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+    if (statisticsTruncateLength <= 0) {
+      throw new IllegalArgumentException("Truncate length should be greater than 0");
+    }
     this.useSignedStringMinMax = useSignedStringMinMax;
+    this.statisticsTruncateLength = statisticsTruncateLength;
   }
 
   // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -458,7 +474,7 @@ public class ParquetMetadataConverter {
           columnMetaData.getFirstDataPageOffset());
       columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
       if (!columnMetaData.getStatistics().isEmpty()) {
-        columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics()));
+        columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
       }
       if (columnMetaData.getEncodingStats() != null) {
         columnChunk.meta_data.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
@@ -576,18 +592,31 @@ public class ParquetMetadataConverter {
   }
 
   public static Statistics toParquetStatistics(
-      org.apache.parquet.column.statistics.Statistics stats) {
+    org.apache.parquet.column.statistics.Statistics stats) {
+    return toParquetStatistics(stats, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+  }
+
+  public static Statistics toParquetStatistics(
+      org.apache.parquet.column.statistics.Statistics stats, int truncateLength) {
     Statistics formatStats = new Statistics();
     // Don't write stats larger than the max size rather than truncating. The
     // rationale is that some engines may use the minimum value in the page as
     // the true minimum for aggregations and there is no way to mark that a
     // value has been truncated and is a lower bound and not in the page.
-    if (!stats.isEmpty() && stats.isSmallerThan(MAX_STATS_SIZE)) {
+    if (!stats.isEmpty() && withinLimit(stats, truncateLength)) {
       formatStats.setNull_count(stats.getNumNulls());
       if (stats.hasNonNullValue()) {
-        byte[] min = stats.getMinBytes();
-        byte[] max = stats.getMaxBytes();
-
+        byte[] min;
+        byte[] max;
+
+        if (stats instanceof BinaryStatistics && truncateLength != Integer.MAX_VALUE) {
+          BinaryTruncator truncator = BinaryTruncator.getTruncator(stats.type());
+          min = tuncateMin(truncator, truncateLength, stats.getMinBytes());
+          max = tuncateMax(truncator, truncateLength, stats.getMaxBytes());
+        } else {
+          min = stats.getMinBytes();
+          max = stats.getMaxBytes();
+        }
         // Fill the former min-max statistics only if the comparison logic is
         // signed so the logic of V1 and V2 stats are the same (which is
         // trivially true for equal min-max values)
@@ -605,6 +634,27 @@ public class ParquetMetadataConverter {
     return formatStats;
   }
 
+  private static boolean withinLimit(org.apache.parquet.column.statistics.Statistics stats, int truncateLength) {
+    if (stats.isSmallerThan(MAX_STATS_SIZE)) {
+      return true;
+    }
+
+    if (!(stats instanceof BinaryStatistics)) {
+      return false;
+    }
+
+    BinaryStatistics binaryStatistics = (BinaryStatistics) stats;
+    return binaryStatistics.isSmallerThanWithTruncation(MAX_STATS_SIZE, truncateLength);
+  }
+
+  private static byte[] tuncateMin(BinaryTruncator truncator, int truncateLength, byte[] input) {
+    return truncator.truncateMin(Binary.fromConstantByteArray(input), truncateLength).getBytes();
+  }
+
+  private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength, byte[] input) {
+    return truncator.truncateMax(Binary.fromConstantByteArray(input), truncateLength).getBytes();
+  }
+
   private static boolean isMinMaxStatsSupported(PrimitiveType type) {
     return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER;
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index f352391..e402c7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -85,7 +85,7 @@ import org.slf4j.LoggerFactory;
 public class ParquetFileWriter {
   private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class);
 
-  private static ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+  private final ParquetMetadataConverter metadataConverter;
 
   public static final String PARQUET_METADATA_FILE = "_metadata";
   public static final String MAGIC_STR = "PAR1";
@@ -258,8 +258,10 @@ public class ParquetFileWriter {
       throws IOException {
     this(file, schema, mode, rowGroupSize, maxPaddingSize,
         ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+      ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
         ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
   }
+
   /**
    * @param file OutputFile to create or overwrite
    * @param schema the schema of the data
@@ -267,13 +269,14 @@ public class ParquetFileWriter {
    * @param rowGroupSize the row group size
    * @param maxPaddingSize the maximum padding
    * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
+   * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to
    * @param pageWriteChecksumEnabled whether to write out page level checksums
    * @throws IOException if the file can not be created
    */
   public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
                            long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
-                           boolean pageWriteChecksumEnabled)
-      throws IOException {
+                           int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
+    throws IOException {
     TypeUtil.checkValidWriteSchema(schema);
 
     this.schema = schema;
@@ -296,6 +299,8 @@ public class ParquetFileWriter {
     this.columnIndexTruncateLength = columnIndexTruncateLength;
     this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
     this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+
+    this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
   }
 
   /**
@@ -322,6 +327,7 @@ public class ParquetFileWriter {
     this.columnIndexTruncateLength = Integer.MAX_VALUE;
     this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
     this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+    this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
   }
   /**
    * start the file
@@ -904,6 +910,7 @@ public class ParquetFileWriter {
 
   private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
     long footerIndex = out.getPos();
+    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
     org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
     writeFileMetaData(parquetMetadata, out);
     LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index afcbbff..db73fb6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -144,6 +144,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+  public static final String STATISTICS_TRUNCATE_LENGTH = "parquet.statistics.truncate.length";
   public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
   public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
 
@@ -327,6 +328,18 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
   }
 
+  public static void setStatisticsTruncateLength(JobContext jobContext, int length) {
+    setStatisticsTruncateLength(getConfiguration(jobContext), length);
+  }
+
+  private static void setStatisticsTruncateLength(Configuration conf, int length) {
+    conf.setInt(STATISTICS_TRUNCATE_LENGTH, length);
+  }
+
+  private static int getStatisticsTruncateLength(Configuration conf) {
+    return conf.getInt(STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+  }
+
   public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
     setPageRowCountLimit(getConfiguration(jobContext), rowCount);
   }
@@ -421,6 +434,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
+        .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
         .withPageRowCountLimit(getPageRowCountLimit(conf))
         .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
         .build();
@@ -441,6 +455,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
+      LOG.info("Truncate length for statistics min/max  is: {}", props.getStatisticsTruncateLength());
       LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
       LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? "on" : "off");
     }
@@ -448,7 +463,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
         init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
-        props.getPageWriteChecksumEnabled());
+        props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled());
     w.start();
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 7fb7186..870a1a6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -279,7 +279,8 @@ public class ParquetWriter<T> implements Closeable {
 
     ParquetFileWriter fileWriter = new ParquetFileWriter(
       file, schema, mode, rowGroupSize, maxPaddingSize,
-      encodingProps.getColumnIndexTruncateLength(), encodingProps.getPageWriteChecksumEnabled());
+      encodingProps.getColumnIndexTruncateLength(), encodingProps.getStatisticsTruncateLength(),
+      encodingProps.getPageWriteChecksumEnabled());
     fileWriter.start();
 
     this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 5da5ddf..e510b60 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -53,6 +53,8 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -65,9 +67,11 @@ import java.util.TreeSet;
 
 import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.UTF8;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.statistics.BinaryStatistics;
 import org.apache.parquet.column.statistics.BooleanStatistics;
 import org.apache.parquet.column.statistics.DoubleStatistics;
@@ -113,6 +117,11 @@ import org.apache.parquet.schema.Types;
 import com.google.common.collect.Lists;
 
 public class TestParquetMetadataConverter {
+  private static SecureRandom random = new SecureRandom();
+  private static final String CHAR_LOWER = "abcdefghijklmnopqrstuvwxyz";
+  private static final String CHAR_UPPER = CHAR_LOWER.toUpperCase();
+  private static final String NUMBER = "0123456789";
+  private static final String DATA_FOR_RANDOM_STRING = CHAR_LOWER + CHAR_UPPER + NUMBER;
 
   @Test
   public void testPageHeader() throws IOException {
@@ -581,6 +590,63 @@ public class TestParquetMetadataConverter {
   }
 
   @Test
+  public void testBinaryStatsWithTruncation() {
+    int defaultTruncLen = ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+    int[] validLengths = {1, 2, 16, 64, defaultTruncLen - 1};
+    for (int len : validLengths) {
+      testBinaryStatsWithTruncation(len, 60, 70);
+      testBinaryStatsWithTruncation(len, (int) ParquetMetadataConverter.MAX_STATS_SIZE, 190);
+      testBinaryStatsWithTruncation(len, 280, (int) ParquetMetadataConverter.MAX_STATS_SIZE);
+      testBinaryStatsWithTruncation(len, (int) ParquetMetadataConverter.MAX_STATS_SIZE, (int) ParquetMetadataConverter.MAX_STATS_SIZE);
+    }
+
+    int[] invalidLengths = {-1, 0,  Integer.MAX_VALUE + 1};
+    for (int len : invalidLengths) {
+      try {
+        testBinaryStatsWithTruncation(len, 80, 20);
+        Assert.fail("Expected IllegalArgumentException but didn't happen");
+      } catch (IllegalArgumentException e) {
+        // expected, nothing to do
+      }
+    }
+  }
+
+  // The number of minLen and maxLen shouldn't matter because the comparision is controlled by prefix
+  private void testBinaryStatsWithTruncation(int truncateLen, int minLen, int maxLen) {
+    BinaryStatistics stats = new BinaryStatistics();
+    byte[] min = generateRandomString("a", minLen).getBytes();
+    byte[] max = generateRandomString("b", maxLen).getBytes();
+    stats.updateStats(Binary.fromConstantByteArray(min));
+    stats.updateStats(Binary.fromConstantByteArray(max));
+    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(truncateLen);
+    org.apache.parquet.format.Statistics formatStats = metadataConverter.toParquetStatistics(stats);
+
+    if (minLen + maxLen >= ParquetMetadataConverter.MAX_STATS_SIZE) {
+      assertNull(formatStats.getMin_value());
+      assertNull(formatStats.getMax_value());
+    } else {
+      String minString = new String(min, Charset.forName("UTF-8"));
+      String minStatString = new String(formatStats.getMin_value(), Charset.forName("UTF-8"));
+      assertTrue(minStatString.compareTo(minString) <= 0);
+      String maxString = new String(max, Charset.forName("UTF-8"));
+      String maxStatString = new String(formatStats.getMax_value(), Charset.forName("UTF-8"));
+      assertTrue(maxStatString.compareTo(maxString) >= 0);
+    }
+  }
+
+  private static String generateRandomString(String prefix, int length) {
+    assertTrue(prefix.length() <= length);
+    StringBuilder sb = new StringBuilder(length);
+    sb.append(prefix);
+    for (int i = 0; i < length - prefix.length(); i++) {
+      int rndCharAt = random.nextInt(DATA_FOR_RANDOM_STRING.length());
+      char rndChar = DATA_FOR_RANDOM_STRING.charAt(rndCharAt);
+      sb.append(rndChar);
+    }
+    return sb.toString();
+  }
+
+  @Test
   public void testIntegerStatsV1() {
     testIntegerStats(StatsHelper.V1);
   }