Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/05/19 21:05:33 UTC
[iceberg] branch master updated: Parquet: Update parquet to 1.13.1 (#7301)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 14a30c015b Parquet: Update parquet to 1.13.1 (#7301)
14a30c015b is described below
commit 14a30c015be6e6a9d3f2f1c1afc1ac67a3d40382
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Fri May 19 14:05:27 2023 -0700
Parquet: Update parquet to 1.13.1 (#7301)
* Parquet: Update parquet to 1.13.0
* fix aliyun failures
* Disable dictionary encoding to make sure BF always gets created
* Update the doc
* point to 1.13.1 using staging repo until officially released
* revert staging remote repo
* remove apache commons dep
* address review feedback
---------
Co-authored-by: Prashant Singh <ps...@amazon.com>
---
.../java/org/apache/iceberg/TableProperties.java | 3 +
docs/configuration.md | 91 +++++++++++-----------
.../java/org/apache/iceberg/parquet/Parquet.java | 18 +++++
.../iceberg/parquet/TestBloomRowGroupFilter.java | 2 +
versions.props | 4 +-
5 files changed, 71 insertions(+), 47 deletions(-)
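Besides bumping parquet-mr to 1.13.1, the commit introduces a new table property, write.parquet.enable.dictionary (default true), for toggling Parquet dictionary encoding. A minimal sketch of turning it off on an existing table through the Table API; the catalog wiring and the "db"."events" identifier are assumptions, not part of the commit:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

public class DictionaryToggleExample {
  // Turn off dictionary encoding for one table; per the commit message,
  // this makes sure bloom filters always get created for hinted columns.
  // The "db.events" identifier is hypothetical.
  static void disableDictionaryEncoding(Catalog catalog) {
    Table table = catalog.loadTable(TableIdentifier.of("db", "events"));
    table
        .updateProperties()
        .set(TableProperties.PARQUET_DICT_ENABLED, "false") // "write.parquet.enable.dictionary"
        .commit();
  }
}
```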
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index b14354def6..770865a569 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -135,6 +135,9 @@ public class TableProperties {
public static final String DELETE_PARQUET_PAGE_ROW_LIMIT = "write.delete.parquet.page-row-limit";
public static final int PARQUET_PAGE_ROW_LIMIT_DEFAULT = 20_000;
+ public static final String PARQUET_DICT_ENABLED = "write.parquet.enable.dictionary";
+ public static final boolean PARQUET_DICT_ENABLED_DEFAULT = true;
+
public static final String PARQUET_DICT_SIZE_BYTES = "write.parquet.dict-size-bytes";
public static final String DELETE_PARQUET_DICT_SIZE_BYTES =
"write.delete.parquet.dict-size-bytes";
diff --git a/docs/configuration.md b/docs/configuration.md
index 15e36b34e4..99a25fec12 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -47,51 +47,52 @@ Iceberg tables support table properties to configure table behavior, like the de
### Write properties
-| Property | Default | Description |
-|-----------------------------------------------------|----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| write.format.default | parquet | Default file format for the table; parquet, avro, or orc |
-| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
-| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
-| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
-| write.parquet.page-row-limit | 20000 | Parquet page row limit |
-| write.parquet.dict-size-bytes | 2097152 (2 MB) | Parquet dictionary page size |
-| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
-| write.parquet.compression-level | null | Parquet compression level |
-| write.parquet.bloom-filter-enabled.column.col1 | (not set) | Enables writing a bloom filter for the column: col1 |
-| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
-| write.avro.compression-codec                        | gzip                       | Avro compression codec: gzip (deflate with level 9), zstd, snappy, uncompressed |
-| write.avro.compression-level | null | Avro compression level |
-| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
-| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files |
-| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none |
-| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression |
-| write.orc.bloom.filter.columns | (not set) | Comma separated list of column names for which a Bloom filter must be created |
-| write.orc.bloom.filter.fpp                          | 0.05                       | False positive probability for Bloom filter (must be > 0.0 and < 1.0) |
-| write.location-provider.impl | null | Optional custom implementation for LocationProvider |
-| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
-| write.metadata.metrics.max-inferred-column-defaults | 100 | Defines the maximum number of columns for which metrics are collected |
-| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
-| write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
-| write.target-file-size-bytes | 536870912 (512 MB) | Controls the size of files generated to target about this many bytes |
-| write.delete.target-file-size-bytes | 67108864 (64 MB) | Controls the size of delete files generated to target about this many bytes |
-| write.distribution-mode                             | none                       | Defines distribution of write data: __none__: don't shuffle rows; __hash__: hash distribute by partition key; __range__: range distribute by partition key or sort key if table has a SortOrder |
-| write.delete.distribution-mode | hash | Defines distribution of write delete data |
-| write.update.distribution-mode | hash | Defines distribution of write update data |
-| write.merge.distribution-mode | none | Defines distribution of write merge data |
-| write.wap.enabled | false | Enables write-audit-publish writes |
-| write.summary.partition-limit | 0 | Includes partition-level summary stats in snapshot summaries if the changed partition count is less than this limit |
-| write.metadata.delete-after-commit.enabled | false | Controls whether to delete the oldest **tracked** version metadata files after commit |
-| write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit |
-| write.spark.fanout.enabled | false | Enables the fanout writer in Spark that does not require data to be clustered; uses more memory |
-| write.object-storage.enabled | false | Enables the object storage location provider that adds a hash component to file paths |
-| write.data.path | table location + /data | Base location for data files |
-| write.metadata.path | table location + /metadata | Base location for metadata files |
-| write.delete.mode | copy-on-write | Mode used for delete commands: copy-on-write or merge-on-read (v2 only) |
-| write.delete.isolation-level | serializable | Isolation level for delete commands: serializable or snapshot |
-| write.update.mode | copy-on-write | Mode used for update commands: copy-on-write or merge-on-read (v2 only) |
-| write.update.isolation-level | serializable | Isolation level for update commands: serializable or snapshot |
-| write.merge.mode | copy-on-write | Mode used for merge commands: copy-on-write or merge-on-read (v2 only) |
-| write.merge.isolation-level | serializable | Isolation level for merge commands: serializable or snapshot |
+| Property | Default | Description |
+|------------------------------------------------------|-----------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| write.format.default | parquet | Default file format for the table; parquet, avro, or orc |
+| write.delete.format.default | data file format | Default delete file format for the table; parquet, avro, or orc |
+| write.parquet.row-group-size-bytes | 134217728 (128 MB) | Parquet row group size |
+| write.parquet.page-size-bytes | 1048576 (1 MB) | Parquet page size |
+| write.parquet.page-row-limit | 20000 | Parquet page row limit |
+| write.parquet.enable.dictionary                      | true                        | Enables dictionary encoding |
+| write.parquet.dict-size-bytes                        | 2097152 (2 MB)              | Parquet dictionary page size |
+| write.parquet.compression-codec | gzip | Parquet compression codec: zstd, brotli, lz4, gzip, snappy, uncompressed |
+| write.parquet.compression-level | null | Parquet compression level |
+| write.parquet.bloom-filter-enabled.column.col1       | (not set)                   | Hint to Parquet to write a bloom filter for the column: col1 |
+| write.parquet.bloom-filter-max-bytes | 1048576 (1 MB) | The maximum number of bytes for a bloom filter bitset |
+| write.avro.compression-codec                         | gzip                        | Avro compression codec: gzip (deflate with level 9), zstd, snappy, uncompressed |
+| write.avro.compression-level | null | Avro compression level |
+| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
+| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files |
+| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none |
+| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression |
+| write.orc.bloom.filter.columns | (not set) | Comma separated list of column names for which a Bloom filter must be created |
+| write.orc.bloom.filter.fpp                           | 0.05                        | False positive probability for Bloom filter (must be > 0.0 and < 1.0) |
+| write.location-provider.impl | null | Optional custom implementation for LocationProvider |
+| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
+| write.metadata.metrics.max-inferred-column-defaults | 100 | Defines the maximum number of columns for which metrics are collected |
+| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
+| write.metadata.metrics.column.col1 | (not set) | Metrics mode for column 'col1' to allow per-column tuning; none, counts, truncate(length), or full |
+| write.target-file-size-bytes | 536870912 (512 MB) | Controls the size of files generated to target about this many bytes |
+| write.delete.target-file-size-bytes | 67108864 (64 MB) | Controls the size of delete files generated to target about this many bytes |
+| write.distribution-mode                              | none                        | Defines distribution of write data: __none__: don't shuffle rows; __hash__: hash distribute by partition key; __range__: range distribute by partition key or sort key if table has a SortOrder |
+| write.delete.distribution-mode | hash | Defines distribution of write delete data |
+| write.update.distribution-mode | hash | Defines distribution of write update data |
+| write.merge.distribution-mode | none | Defines distribution of write merge data |
+| write.wap.enabled | false | Enables write-audit-publish writes |
+| write.summary.partition-limit | 0 | Includes partition-level summary stats in snapshot summaries if the changed partition count is less than this limit |
+| write.metadata.delete-after-commit.enabled | false | Controls whether to delete the oldest **tracked** version metadata files after commit |
+| write.metadata.previous-versions-max | 100 | The max number of previous version metadata files to keep before deleting after commit |
+| write.spark.fanout.enabled | false | Enables the fanout writer in Spark that does not require data to be clustered; uses more memory |
+| write.object-storage.enabled | false | Enables the object storage location provider that adds a hash component to file paths |
+| write.data.path | table location + /data | Base location for data files |
+| write.metadata.path | table location + /metadata | Base location for metadata files |
+| write.delete.mode | copy-on-write | Mode used for delete commands: copy-on-write or merge-on-read (v2 only) |
+| write.delete.isolation-level | serializable | Isolation level for delete commands: serializable or snapshot |
+| write.update.mode | copy-on-write | Mode used for update commands: copy-on-write or merge-on-read (v2 only) |
+| write.update.isolation-level | serializable | Isolation level for update commands: serializable or snapshot |
+| write.merge.mode | copy-on-write | Mode used for merge commands: copy-on-write or merge-on-read (v2 only) |
+| write.merge.isolation-level | serializable | Isolation level for merge commands: serializable or snapshot |
### Table behavior properties
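The two new Parquet rows above work together: per the commit message, the bloom filter hint only reliably takes effect when dictionary encoding is disabled, which is why the toggle exists. A sketch of setting both properties at table creation time; the schema, catalog, and identifier are assumptions:

```java
import java.util.Map;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

public class BloomFilterTableExample {
  // Create a table that disables dictionary encoding and hints Parquet to
  // write a bloom filter for "id"; schema and names are hypothetical.
  static void create(Catalog catalog) {
    Schema schema =
        new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    catalog.createTable(
        TableIdentifier.of("db", "events"),
        schema,
        PartitionSpec.unpartitioned(),
        Map.of(
            "write.parquet.enable.dictionary", "false",
            "write.parquet.bloom-filter-enabled.column.id", "true"));
  }
}
```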
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 8b1e6c0564..7caeb7c7cd 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -33,6 +33,8 @@ import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_ENABLED;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.PARQUET_DICT_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_PAGE_ROW_LIMIT;
@@ -244,6 +246,7 @@ public class Parquet {
int rowGroupSize = context.rowGroupSize();
int pageSize = context.pageSize();
int pageRowLimit = context.pageRowLimit();
+ boolean dictionaryEnabled = context.dictionaryEnabled();
int dictionaryPageSize = context.dictionaryPageSize();
String compressionLevel = context.compressionLevel();
CompressionCodecName codec = context.codec();
@@ -286,6 +289,7 @@ public class Parquet {
.withWriterVersion(writerVersion)
.withPageSize(pageSize)
.withPageRowCountLimit(pageRowLimit)
+ .withDictionaryEncoding(dictionaryEnabled)
.withDictionaryPageSize(dictionaryPageSize)
.withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
.withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
@@ -323,6 +327,7 @@ public class Parquet {
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withPageRowCountLimit(pageRowLimit)
+ .withDictionaryEncoding(dictionaryEnabled)
.withDictionaryPageSize(dictionaryPageSize);
for (Map.Entry<String, String> entry : columnBloomFilterEnabled.entrySet()) {
@@ -339,6 +344,7 @@ public class Parquet {
private final int rowGroupSize;
private final int pageSize;
private final int pageRowLimit;
+ private final boolean dictionaryEnabled;
private final int dictionaryPageSize;
private final CompressionCodecName codec;
private final String compressionLevel;
@@ -351,6 +357,7 @@ public class Parquet {
int rowGroupSize,
int pageSize,
int pageRowLimit,
+ boolean dictionaryEnabled,
int dictionaryPageSize,
CompressionCodecName codec,
String compressionLevel,
@@ -361,6 +368,7 @@ public class Parquet {
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
this.pageRowLimit = pageRowLimit;
+ this.dictionaryEnabled = dictionaryEnabled;
this.dictionaryPageSize = dictionaryPageSize;
this.codec = codec;
this.compressionLevel = compressionLevel;
@@ -386,6 +394,10 @@ public class Parquet {
config, PARQUET_PAGE_ROW_LIMIT, PARQUET_PAGE_ROW_LIMIT_DEFAULT);
Preconditions.checkArgument(pageRowLimit > 0, "Page row count limit must be > 0");
+ boolean dictionaryEnabled =
+ PropertyUtil.propertyAsBoolean(
+ config, PARQUET_DICT_ENABLED, PARQUET_DICT_ENABLED_DEFAULT);
+
int dictionaryPageSize =
PropertyUtil.propertyAsInt(
config, PARQUET_DICT_SIZE_BYTES, PARQUET_DICT_SIZE_BYTES_DEFAULT);
@@ -429,6 +441,7 @@ public class Parquet {
rowGroupSize,
pageSize,
pageRowLimit,
+ dictionaryEnabled,
dictionaryPageSize,
codec,
compressionLevel,
@@ -500,6 +513,7 @@ public class Parquet {
rowGroupSize,
pageSize,
pageRowLimit,
+ dataContext.dictionaryEnabled(),
dictionaryPageSize,
codec,
compressionLevel,
@@ -529,6 +543,10 @@ public class Parquet {
return pageRowLimit;
}
+ boolean dictionaryEnabled() {
+ return dictionaryEnabled;
+ }
+
int dictionaryPageSize() {
return dictionaryPageSize;
}
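Context feeds the new flag straight into parquet-mr's property and writer builders via withDictionaryEncoding(...). Disabling dictionary encoding matters for bloom filters because parquet-mr drops the filter for a column chunk whose pages are all dictionary-encoded, which is what the commit message's "make sure BF always gets created" refers to. A rough standalone sketch of the equivalent parquet-mr 1.13 configuration, with "id" as a placeholder column:

```java
import org.apache.parquet.column.ParquetProperties;

public class ParquetPropsSketch {
  // Rough parquet-mr analogue of the Iceberg Context above. Values mirror
  // the Iceberg defaults from docs/configuration.md; "id" is a placeholder.
  static ParquetProperties build() {
    return ParquetProperties.builder()
        .withDictionaryEncoding(false)           // write.parquet.enable.dictionary
        .withDictionaryPageSize(2 * 1024 * 1024) // write.parquet.dict-size-bytes
        .withPageSize(1024 * 1024)               // write.parquet.page-size-bytes
        .withPageRowCountLimit(20_000)           // write.parquet.page-row-limit
        .withBloomFilterEnabled("id", true)      // bloom-filter-enabled.column.id
        .withMaxBloomFilterBytes(1024 * 1024)    // write.parquet.bloom-filter-max-bytes
        .build();
  }
}
```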
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
index 34a92a9b44..acb318a232 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestBloomRowGroupFilter.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.parquet;
import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
+import static org.apache.iceberg.TableProperties.PARQUET_DICT_ENABLED;
import static org.apache.iceberg.avro.AvroSchemaUtil.convert;
import static org.apache.iceberg.expressions.Expressions.and;
import static org.apache.iceberg.expressions.Expressions.equal;
@@ -197,6 +198,7 @@ public class TestBloomRowGroupFilter {
try (FileAppender<Record> appender =
Parquet.write(outFile)
.schema(FILE_SCHEMA)
+ .set(PARQUET_DICT_ENABLED, "false")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_id", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_long", "true")
.set(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + "_double", "true")
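To confirm the filters actually land in the file footer, parquet-mr's reader can be inspected directly. A sketch assuming a local Hadoop path; none of this is part of the test:

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class BloomFilterInspector {
  // Prints whether a bloom filter was materialized for every column chunk.
  // With dictionary encoding on, fully dictionary-encoded chunks may have
  // none; with it off (as in the test above), the hinted columns should.
  static void inspect(String path) throws IOException {
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path(path), new Configuration()))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData chunk : block.getColumns()) {
          boolean present =
              reader.getBloomFilterDataReader(block).readBloomFilter(chunk) != null;
          System.out.println(chunk.getPath() + " -> bloom filter: " + present);
        }
      }
    }
  }
}
```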
diff --git a/versions.props b/versions.props
index 7af5f0eaa9..c9028be3cf 100644
--- a/versions.props
+++ b/versions.props
@@ -5,7 +5,7 @@ org.apache.hadoop:* = 2.7.3
org.apache.hive:* = 2.3.9
org.apache.httpcomponents.client5:* = 5.2.1
org.apache.orc:* = 1.8.3
-org.apache.parquet:* = 1.12.3
+org.apache.parquet:* = 1.13.1
org.apache.pig:pig = 0.14.0
com.fasterxml.jackson.*:* = 2.14.1
com.google.code.findbugs:jsr305 = 3.0.2
@@ -48,4 +48,4 @@ com.esotericsoftware:kryo = 4.0.2
org.eclipse.jetty:* = 9.4.43.v20210629
org.testcontainers:* = 1.17.6
io.delta:delta-core_* = 2.2.0
-org.awaitility:awaitility = 4.2.0
+org.awaitility:awaitility = 4.2.0
\ No newline at end of file