Posted to commits@parquet.apache.org by ga...@apache.org on 2019/11/13 09:14:31 UTC
[parquet-mr] branch master updated: PARQUET-1685: Truncate Min/Max for Statistics (#696)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 76f9010 PARQUET-1685: Truncate Min/Max for Statistics (#696)
76f9010 is described below
commit 76f90101376d2589fe6071c96ae9d0203b245c0d
Author: shangxinli <31...@users.noreply.github.com>
AuthorDate: Wed Nov 13 01:14:24 2019 -0800
PARQUET-1685: Truncate Min/Max for Statistics (#696)
* Remove unnecessary string conversion in the readFooter method
---
.../apache/parquet/column/ParquetProperties.java | 20 ++++++-
.../column/statistics/BinaryStatistics.java | 11 ++++
.../column/columnindex/BinaryTruncator.java | 16 +++---
.../format/converter/ParquetMetadataConverter.java | 64 ++++++++++++++++++---
.../apache/parquet/hadoop/ParquetFileWriter.java | 13 ++++-
.../apache/parquet/hadoop/ParquetOutputFormat.java | 17 +++++-
.../org/apache/parquet/hadoop/ParquetWriter.java | 3 +-
.../converter/TestParquetMetadataConverter.java | 66 ++++++++++++++++++++++
8 files changed, 187 insertions(+), 23 deletions(-)
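
This change adds a writer-side option for truncating binary min/max values stored in row-group (footer) statistics, mirroring the existing column-index truncation. A minimal sketch of how the option can be enabled, assuming the configuration key and builder method introduced below; the surrounding setup is illustrative only:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.column.ParquetProperties;

  Configuration conf = new Configuration();
  // Cap row-group statistics min/max at 64 bytes
  // (default is Integer.MAX_VALUE, i.e. no truncation).
  conf.setInt("parquet.statistics.truncate.length", 64);

  // Equivalent, when building ParquetProperties directly:
  ParquetProperties props = ParquetProperties.builder()
      .withStatisticsTruncateLength(64)   // must be > 0
      .build();
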
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 7492b54..c022b72 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -48,6 +48,7 @@ public class ParquetProperties {
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
+ public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
@@ -88,13 +89,14 @@ public class ParquetProperties {
private final ByteBufferAllocator allocator;
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
+ private final int statisticsTruncateLength;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
- boolean pageWriteChecksumEnabled) {
+ boolean pageWriteChecksumEnabled, int statisticsTruncateLength) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -108,6 +110,7 @@ public class ParquetProperties {
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
+ this.statisticsTruncateLength = statisticsTruncateLength;
this.pageRowCountLimit = pageRowCountLimit;
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
}
@@ -198,6 +201,10 @@ public class ParquetProperties {
return columnIndexTruncateLength;
}
+ public int getStatisticsTruncateLength() {
+ return statisticsTruncateLength;
+ }
+
public boolean estimateNextSizeCheck() {
return estimateNextSizeCheck;
}
@@ -229,6 +236,7 @@ public class ParquetProperties {
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+ private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
@@ -330,11 +338,17 @@ public class ParquetProperties {
}
public Builder withColumnIndexTruncateLength(int length) {
- Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative) : %s", length);
+ Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative or zero) : %s", length);
this.columnIndexTruncateLength = length;
return this;
}
+ public Builder withStatisticsTruncateLength(int length) {
+ Preconditions.checkArgument(length > 0, "Invalid statistics min/max truncate length (negative or zero) : %s", length);
+ this.statisticsTruncateLength = length;
+ return this;
+ }
+
public Builder withPageRowCountLimit(int rowCount) {
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
pageRowCountLimit = rowCount;
@@ -351,7 +365,7 @@ public class ParquetProperties {
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
- pageRowCountLimit, pageWriteChecksumEnabled);
+ pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
index 6746729..6cd5395 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/statistics/BinaryStatistics.java
@@ -103,6 +103,17 @@ public class BinaryStatistics extends Statistics<Binary> {
return !hasNonNullValue() || ((min.length() + max.length()) < size);
}
+ public boolean isSmallerThanWithTruncation(long size, int truncationLength) {
+ if (!hasNonNullValue()) {
+ return true;
+ }
+
+ int minTruncateLength = Math.min(min.length(), truncationLength);
+ int maxTruncateLength = Math.min(max.length(), truncationLength);
+
+ return minTruncateLength + maxTruncateLength < size;
+ }
+
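(For example, with truncationLength = 64, a 3000-byte min and a 3000-byte max are counted as min(3000, 64) + min(3000, 64) = 128 bytes by isSmallerThanWithTruncation, so the statistics stay within the size limit and are kept and later truncated, whereas isSmallerThan would have reported them as too large and they would have been dropped.)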
/**
* @param min_value a min binary
* @param max_value a max binary
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
index a301f67..8a6f007 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
@@ -34,7 +34,7 @@ import org.apache.parquet.schema.PrimitiveType;
/**
* Class for truncating min/max values for binary types.
*/
-abstract class BinaryTruncator {
+public abstract class BinaryTruncator {
enum Validity {
VALID, MALFORMED, UNMAPPABLE;
}
@@ -69,12 +69,12 @@ abstract class BinaryTruncator {
private static final BinaryTruncator NO_OP_TRUNCATOR = new BinaryTruncator() {
@Override
- Binary truncateMin(Binary minValue, int length) {
+ public Binary truncateMin(Binary minValue, int length) {
return minValue;
}
@Override
- Binary truncateMax(Binary maxValue, int length) {
+ public Binary truncateMax(Binary maxValue, int length) {
return maxValue;
}
};
@@ -83,7 +83,7 @@ abstract class BinaryTruncator {
private final CharsetValidator validator = new CharsetValidator(StandardCharsets.UTF_8);
@Override
- Binary truncateMin(Binary minValue, int length) {
+ public Binary truncateMin(Binary minValue, int length) {
if (minValue.length() <= length) {
return minValue;
}
@@ -98,7 +98,7 @@ abstract class BinaryTruncator {
}
@Override
- Binary truncateMax(Binary maxValue, int length) {
+ public Binary truncateMax(Binary maxValue, int length) {
if (maxValue.length() <= length) {
return maxValue;
}
@@ -176,7 +176,7 @@ abstract class BinaryTruncator {
}
};
- static BinaryTruncator getTruncator(PrimitiveType type) {
+ public static BinaryTruncator getTruncator(PrimitiveType type) {
if (type == null) {
return NO_OP_TRUNCATOR;
}
@@ -215,7 +215,7 @@ abstract class BinaryTruncator {
}
}
- abstract Binary truncateMin(Binary minValue, int length);
+ public abstract Binary truncateMin(Binary minValue, int length);
- abstract Binary truncateMax(Binary maxValue, int length);
+ public abstract Binary truncateMax(Binary maxValue, int length);
}
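
With BinaryTruncator and its truncate methods made public, the metadata converter can reuse the same UTF-8-aware truncation already used for column indexes. A minimal usage sketch (the column type construction is illustrative; the class still lives in an internal package):

  import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
  import org.apache.parquet.io.api.Binary;
  import org.apache.parquet.schema.LogicalTypeAnnotation;
  import org.apache.parquet.schema.PrimitiveType;
  import org.apache.parquet.schema.Types;
  import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

  PrimitiveType stringType = Types.required(BINARY)
      .as(LogicalTypeAnnotation.stringType())
      .named("name");

  BinaryTruncator truncator = BinaryTruncator.getTruncator(stringType);
  // truncateMin keeps a prefix of the value (still a valid lower bound);
  // truncateMax adjusts the truncated prefix so it still sorts at or above the original value.
  Binary shortMin = truncator.truncateMin(Binary.fromString("aardvark-antelope"), 8);
  Binary shortMax = truncator.truncateMax(Binary.fromString("zebra-zookeeper"), 8);
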
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index b01e73c..03476a1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -44,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.format.BsonType;
import org.apache.parquet.format.CompressionCodec;
import org.apache.parquet.format.DateType;
@@ -91,10 +93,12 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.internal.column.columnindex.BinaryTruncator;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
@@ -120,13 +124,17 @@ public class ParquetMetadataConverter {
private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR = new LogicalTypeConverterVisitor();
private static final ConvertedTypeConverterVisitor CONVERTED_TYPE_CONVERTER_VISITOR = new ConvertedTypeConverterVisitor();
-
+ private final int statisticsTruncateLength;
private final boolean useSignedStringMinMax;
public ParquetMetadataConverter() {
this(false);
}
+ public ParquetMetadataConverter(int statisticsTruncateLength) {
+ this(false, statisticsTruncateLength);
+ }
+
/**
* @param conf a configuration
* @deprecated will be removed in 2.0.0; use {@code ParquetMetadataConverter(ParquetReadOptions)}
@@ -141,7 +149,15 @@ public class ParquetMetadataConverter {
}
private ParquetMetadataConverter(boolean useSignedStringMinMax) {
+ this(useSignedStringMinMax, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+ }
+
+ private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+ if (statisticsTruncateLength <= 0) {
+ throw new IllegalArgumentException("Truncate length should be greater than 0");
+ }
this.useSignedStringMinMax = useSignedStringMinMax;
+ this.statisticsTruncateLength = statisticsTruncateLength;
}
// NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -458,7 +474,7 @@ public class ParquetMetadataConverter {
columnMetaData.getFirstDataPageOffset());
columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
if (!columnMetaData.getStatistics().isEmpty()) {
- columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics()));
+ columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
}
if (columnMetaData.getEncodingStats() != null) {
columnChunk.meta_data.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats()));
@@ -576,18 +592,31 @@ public class ParquetMetadataConverter {
}
public static Statistics toParquetStatistics(
- org.apache.parquet.column.statistics.Statistics stats) {
+ org.apache.parquet.column.statistics.Statistics stats) {
+ return toParquetStatistics(stats, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+ }
+
+ public static Statistics toParquetStatistics(
+ org.apache.parquet.column.statistics.Statistics stats, int truncateLength) {
Statistics formatStats = new Statistics();
// Don't write stats larger than the max size rather than truncating. The
// rationale is that some engines may use the minimum value in the page as
// the true minimum for aggregations and there is no way to mark that a
// value has been truncated and is a lower bound and not in the page.
- if (!stats.isEmpty() && stats.isSmallerThan(MAX_STATS_SIZE)) {
+ if (!stats.isEmpty() && withinLimit(stats, truncateLength)) {
formatStats.setNull_count(stats.getNumNulls());
if (stats.hasNonNullValue()) {
- byte[] min = stats.getMinBytes();
- byte[] max = stats.getMaxBytes();
-
+ byte[] min;
+ byte[] max;
+
+ if (stats instanceof BinaryStatistics && truncateLength != Integer.MAX_VALUE) {
+ BinaryTruncator truncator = BinaryTruncator.getTruncator(stats.type());
+ min = tuncateMin(truncator, truncateLength, stats.getMinBytes());
+ max = tuncateMax(truncator, truncateLength, stats.getMaxBytes());
+ } else {
+ min = stats.getMinBytes();
+ max = stats.getMaxBytes();
+ }
// Fill the former min-max statistics only if the comparison logic is
// signed so the logic of V1 and V2 stats are the same (which is
// trivially true for equal min-max values)
@@ -605,6 +634,27 @@ public class ParquetMetadataConverter {
return formatStats;
}
+ private static boolean withinLimit(org.apache.parquet.column.statistics.Statistics stats, int truncateLength) {
+ if (stats.isSmallerThan(MAX_STATS_SIZE)) {
+ return true;
+ }
+
+ if (!(stats instanceof BinaryStatistics)) {
+ return false;
+ }
+
+ BinaryStatistics binaryStatistics = (BinaryStatistics) stats;
+ return binaryStatistics.isSmallerThanWithTruncation(MAX_STATS_SIZE, truncateLength);
+ }
+
+ private static byte[] tuncateMin(BinaryTruncator truncator, int truncateLength, byte[] input) {
+ return truncator.truncateMin(Binary.fromConstantByteArray(input), truncateLength).getBytes();
+ }
+
+ private static byte[] tuncateMax(BinaryTruncator truncator, int truncateLength, byte[] input) {
+ return truncator.truncateMax(Binary.fromConstantByteArray(input), truncateLength).getBytes();
+ }
+
private static boolean isMinMaxStatsSupported(PrimitiveType type) {
return type.columnOrder().getColumnOrderName() == ColumnOrderName.TYPE_DEFINED_ORDER;
}
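
The new two-argument toParquetStatistics overload is where the truncation is actually applied when converting in-memory statistics to the Thrift representation. A small sketch of the call; the values are illustrative:

  import org.apache.parquet.column.statistics.BinaryStatistics;
  import org.apache.parquet.format.Statistics;
  import org.apache.parquet.format.converter.ParquetMetadataConverter;
  import org.apache.parquet.io.api.Binary;

  BinaryStatistics stats = new BinaryStatistics();
  stats.updateStats(Binary.fromString("aardvark"));
  stats.updateStats(Binary.fromString("zebra"));

  // Truncate min/max to at most 4 bytes before they go into the footer.
  Statistics truncated = ParquetMetadataConverter.toParquetStatistics(stats, 4);

  // The existing one-argument overload keeps the previous behavior (no truncation).
  Statistics full = ParquetMetadataConverter.toParquetStatistics(stats);
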
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index f352391..e402c7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -85,7 +85,7 @@ import org.slf4j.LoggerFactory;
public class ParquetFileWriter {
private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class);
- private static ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
+ private final ParquetMetadataConverter metadataConverter;
public static final String PARQUET_METADATA_FILE = "_metadata";
public static final String MAGIC_STR = "PAR1";
@@ -258,8 +258,10 @@ public class ParquetFileWriter {
throws IOException {
this(file, schema, mode, rowGroupSize, maxPaddingSize,
ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+ ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}
+
/**
* @param file OutputFile to create or overwrite
* @param schema the schema of the data
@@ -267,13 +269,14 @@ public class ParquetFileWriter {
* @param rowGroupSize the row group size
* @param maxPaddingSize the maximum padding
* @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
+ * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to
* @param pageWriteChecksumEnabled whether to write out page level checksums
* @throws IOException if the file can not be created
*/
public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
- boolean pageWriteChecksumEnabled)
- throws IOException {
+ int statisticsTruncateLength, boolean pageWriteChecksumEnabled)
+ throws IOException {
TypeUtil.checkValidWriteSchema(schema);
this.schema = schema;
@@ -296,6 +299,8 @@ public class ParquetFileWriter {
this.columnIndexTruncateLength = columnIndexTruncateLength;
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+
+ this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);
}
/**
@@ -322,6 +327,7 @@ public class ParquetFileWriter {
this.columnIndexTruncateLength = Integer.MAX_VALUE;
this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
+ this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
}
/**
* start the file
@@ -904,6 +910,7 @@ public class ParquetFileWriter {
private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
long footerIndex = out.getPos();
+ ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
writeFileMetaData(parquetMetadata, out);
LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index afcbbff..db73fb6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -144,6 +144,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+ public static final String STATISTICS_TRUNCATE_LENGTH = "parquet.statistics.truncate.length";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
@@ -327,6 +328,18 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
}
+ public static void setStatisticsTruncateLength(JobContext jobContext, int length) {
+ setStatisticsTruncateLength(getConfiguration(jobContext), length);
+ }
+
+ private static void setStatisticsTruncateLength(Configuration conf, int length) {
+ conf.setInt(STATISTICS_TRUNCATE_LENGTH, length);
+ }
+
+ private static int getStatisticsTruncateLength(Configuration conf) {
+ return conf.getInt(STATISTICS_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
+ }
+
public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
setPageRowCountLimit(getConfiguration(jobContext), rowCount);
}
@@ -421,6 +434,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
.withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
+ .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
.build();
@@ -441,6 +455,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
+ LOG.info("Truncate length for statistics min/max is: {}", props.getStatisticsTruncateLength());
LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? "on" : "off");
}
@@ -448,7 +463,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
WriteContext init = writeSupport.init(conf);
ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
- props.getPageWriteChecksumEnabled());
+ props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled());
w.start();
float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 7fb7186..870a1a6 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -279,7 +279,8 @@ public class ParquetWriter<T> implements Closeable {
ParquetFileWriter fileWriter = new ParquetFileWriter(
file, schema, mode, rowGroupSize, maxPaddingSize,
- encodingProps.getColumnIndexTruncateLength(), encodingProps.getPageWriteChecksumEnabled());
+ encodingProps.getColumnIndexTruncateLength(), encodingProps.getStatisticsTruncateLength(),
+ encodingProps.getPageWriteChecksumEnabled());
fileWriter.start();
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
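
For code that constructs ParquetFileWriter directly, the new statisticsTruncateLength argument sits between the column-index truncate length and the page-checksum flag. A hypothetical call, mirroring the one in ParquetOutputFormat above (conf and schema are assumed to exist; the path and sizes are placeholders):

  import org.apache.hadoop.fs.Path;
  import org.apache.parquet.hadoop.ParquetFileWriter;
  import org.apache.parquet.hadoop.util.HadoopOutputFile;

  ParquetFileWriter writer = new ParquetFileWriter(
      HadoopOutputFile.fromPath(new Path("/tmp/example.parquet"), conf),
      schema, ParquetFileWriter.Mode.CREATE,
      128 * 1024 * 1024,  // rowGroupSize
      8 * 1024 * 1024,    // maxPaddingSize
      64,                 // columnIndexTruncateLength
      64,                 // statisticsTruncateLength
      true);              // pageWriteChecksumEnabled
  writer.start();
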
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index 5da5ddf..e510b60 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -53,6 +53,8 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -65,9 +67,11 @@ import java.util.TreeSet;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.UTF8;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.DoubleStatistics;
@@ -113,6 +117,11 @@ import org.apache.parquet.schema.Types;
import com.google.common.collect.Lists;
public class TestParquetMetadataConverter {
+ private static SecureRandom random = new SecureRandom();
+ private static final String CHAR_LOWER = "abcdefghijklmnopqrstuvwxyz";
+ private static final String CHAR_UPPER = CHAR_LOWER.toUpperCase();
+ private static final String NUMBER = "0123456789";
+ private static final String DATA_FOR_RANDOM_STRING = CHAR_LOWER + CHAR_UPPER + NUMBER;
@Test
public void testPageHeader() throws IOException {
@@ -581,6 +590,63 @@ public class TestParquetMetadataConverter {
}
@Test
+ public void testBinaryStatsWithTruncation() {
+ int defaultTruncLen = ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+ int[] validLengths = {1, 2, 16, 64, defaultTruncLen - 1};
+ for (int len : validLengths) {
+ testBinaryStatsWithTruncation(len, 60, 70);
+ testBinaryStatsWithTruncation(len, (int) ParquetMetadataConverter.MAX_STATS_SIZE, 190);
+ testBinaryStatsWithTruncation(len, 280, (int) ParquetMetadataConverter.MAX_STATS_SIZE);
+ testBinaryStatsWithTruncation(len, (int) ParquetMetadataConverter.MAX_STATS_SIZE, (int) ParquetMetadataConverter.MAX_STATS_SIZE);
+ }
+
+ int[] invalidLengths = {-1, 0, Integer.MAX_VALUE + 1};
+ for (int len : invalidLengths) {
+ try {
+ testBinaryStatsWithTruncation(len, 80, 20);
+ Assert.fail("Expected IllegalArgumentException but didn't happen");
+ } catch (IllegalArgumentException e) {
+ // expected, nothing to do
+ }
+ }
+ }
+
+ // The values of minLen and maxLen shouldn't matter because the comparison is controlled by the prefix
+ private void testBinaryStatsWithTruncation(int truncateLen, int minLen, int maxLen) {
+ BinaryStatistics stats = new BinaryStatistics();
+ byte[] min = generateRandomString("a", minLen).getBytes();
+ byte[] max = generateRandomString("b", maxLen).getBytes();
+ stats.updateStats(Binary.fromConstantByteArray(min));
+ stats.updateStats(Binary.fromConstantByteArray(max));
+ ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(truncateLen);
+ org.apache.parquet.format.Statistics formatStats = metadataConverter.toParquetStatistics(stats);
+
+ if (minLen + maxLen >= ParquetMetadataConverter.MAX_STATS_SIZE) {
+ assertNull(formatStats.getMin_value());
+ assertNull(formatStats.getMax_value());
+ } else {
+ String minString = new String(min, Charset.forName("UTF-8"));
+ String minStatString = new String(formatStats.getMin_value(), Charset.forName("UTF-8"));
+ assertTrue(minStatString.compareTo(minString) <= 0);
+ String maxString = new String(max, Charset.forName("UTF-8"));
+ String maxStatString = new String(formatStats.getMax_value(), Charset.forName("UTF-8"));
+ assertTrue(maxStatString.compareTo(maxString) >= 0);
+ }
+ }
+
+ private static String generateRandomString(String prefix, int length) {
+ assertTrue(prefix.length() <= length);
+ StringBuilder sb = new StringBuilder(length);
+ sb.append(prefix);
+ for (int i = 0; i < length - prefix.length(); i++) {
+ int rndCharAt = random.nextInt(DATA_FOR_RANDOM_STRING.length());
+ char rndChar = DATA_FOR_RANDOM_STRING.charAt(rndCharAt);
+ sb.append(rndChar);
+ }
+ return sb.toString();
+ }
+
+ @Test
public void testIntegerStatsV1() {
testIntegerStats(StatsHelper.V1);
}