Posted to commits@iceberg.apache.org by st...@apache.org on 2022/10/26 15:54:22 UTC
[iceberg] branch master updated: Flink: Add Sink options to override the compression properties of the Table (#6049)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new e35f0d2f9d Flink: Add Sink options to override the compression properties of the Table (#6049)
e35f0d2f9d is described below
commit e35f0d2f9d18bd34ce2375fb018bb8b8211d41aa
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed Oct 26 17:54:16 2022 +0200
Flink: Add Sink options to override the compression properties of the Table (#6049)
* Flink compression properties
* Review comments
Co-authored-by: Peter Vary <pe...@apple.com>
---
docs/flink-getting-started.md | 17 +-
.../org/apache/iceberg/flink/FlinkWriteConf.java | 60 +++++
.../apache/iceberg/flink/FlinkWriteOptions.java | 12 +
.../org/apache/iceberg/flink/sink/FlinkSink.java | 52 ++++-
.../flink/sink/RowDataTaskWriterFactory.java | 9 +-
.../iceberg/flink/source/RowDataRewriter.java | 8 +-
.../flink/sink/TestCompressionSettings.java | 254 +++++++++++++++++++++
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 1 +
.../apache/iceberg/flink/sink/TestTaskWriters.java | 1 +
.../flink/source/TestProjectMetaColumn.java | 1 +
10 files changed, 402 insertions(+), 13 deletions(-)
diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 1ca421e0fc..c6a2e0d490 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -700,13 +700,16 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```
-| Flink option | Default | Description |
-|------------------------| -------------------------- |------------------------------------------------------------------------------------------------------------|
-| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
-| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
-| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
-| overwrite-enabled | false | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
+| Flink option | Default | Description |
+|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------|
+| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
+| target-file-size-bytes | As per table property | Overrides this table's write.target-file-size-bytes |
+| upsert-enabled | Table write.upsert.enabled | Overrides this table's write.upsert.enabled |
+| overwrite-enabled      | false                                       | Overwrite the table's data; overwrite mode shouldn't be enabled when configured to use an UPSERT data stream. |
+| distribution-mode | Table write.distribution-mode | Overrides this table's write.distribution-mode |
+| compression-codec | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write |
+| compression-level | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write |
+| compression-strategy | Table write.orc.compression-strategy | Overrides this table's compression strategy for ORC tables for this write |
## Inspecting tables.
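For reference, the write options in the table above can also be supplied through the DataStream API. A minimal sketch, assuming the FlinkSink builder's set(String, String) method; `rowDataStream` and `tableLoader` are placeholder variables:

    // A minimal sketch, assuming FlinkSink.Builder#set(String, String);
    // `rowDataStream` and `tableLoader` are placeholders.
    FlinkSink.forRowData(rowDataStream)
        .tableLoader(tableLoader)
        .set(FlinkWriteOptions.WRITE_FORMAT.key(), "parquet")     // write-format
        .set(FlinkWriteOptions.COMPRESSION_CODEC.key(), "zstd")   // compression-codec
        .set(FlinkWriteOptions.COMPRESSION_LEVEL.key(), "3")      // compression-level
        .append();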
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 06528c9716..448b2aa2d8 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -93,6 +93,66 @@ public class FlinkWriteConf {
.parse();
}
+ public String parquetCompressionCodec() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+ .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+ .tableProperty(TableProperties.PARQUET_COMPRESSION)
+ .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
+ .parse();
+ }
+
+ public String parquetCompressionLevel() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
+ .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
+ .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
+ .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
+ .parseOptional();
+ }
+
+ public String avroCompressionCodec() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+ .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+ .tableProperty(TableProperties.AVRO_COMPRESSION)
+ .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
+ .parse();
+ }
+
+ public String avroCompressionLevel() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
+ .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
+ .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
+ .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
+ .parseOptional();
+ }
+
+ public String orcCompressionCodec() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+ .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+ .tableProperty(TableProperties.ORC_COMPRESSION)
+ .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
+ .parse();
+ }
+
+ public String orcCompressionStrategy() {
+ return confParser
+ .stringConf()
+ .option(FlinkWriteOptions.COMPRESSION_STRATEGY.key())
+ .flinkConfig(FlinkWriteOptions.COMPRESSION_STRATEGY)
+ .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
+ .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
+ .parse();
+ }
+
public DistributionMode distributionMode() {
String modeName =
confParser
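Each new getter resolves its value through the same precedence chain as the existing FlinkWriteConf accessors: per-sink write option first, then the Flink job configuration, then the table property, then the format's default. A hypothetical standalone sketch of that resolution order (not the actual ConfParser code):

    // Hypothetical sketch of the resolution order; not the actual ConfParser.
    static String resolve(
        Map<String, String> writeOptions,
        Configuration flinkConf,
        Map<String, String> tableProps,
        ConfigOption<String> option,
        String tablePropertyKey,
        String defaultValue) {
      if (writeOptions.containsKey(option.key())) {
        return writeOptions.get(option.key());             // 1. per-sink write option
      } else if (flinkConf.getOptional(option).isPresent()) {
        return flinkConf.get(option);                      // 2. Flink job configuration
      } else {
        return tableProps.getOrDefault(tablePropertyKey, defaultValue);  // 3. table property, 4. default
      }
    }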
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index a3091d5779..f3cc52972b 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -34,6 +34,18 @@ public class FlinkWriteOptions {
public static final ConfigOption<Long> TARGET_FILE_SIZE_BYTES =
ConfigOptions.key("target-file-size-bytes").longType().noDefaultValue();
+ // Overrides this table's write.<FILE_FORMAT>.compression-codec
+ public static final ConfigOption<String> COMPRESSION_CODEC =
+ ConfigOptions.key("compression-codec").stringType().noDefaultValue();
+
+ // Overrides this table's write.<FILE_FORMAT>.compression-level
+ public static final ConfigOption<String> COMPRESSION_LEVEL =
+ ConfigOptions.key("compression-level").stringType().noDefaultValue();
+
+ // Overrides this table's write.<FILE_FORMAT>.compression-strategy
+ public static final ConfigOption<String> COMPRESSION_STRATEGY =
+ ConfigOptions.key("compression-strategy").stringType().noDefaultValue();
+
// Overrides this table's write.upsert.enabled
public static final ConfigOption<Boolean> WRITE_UPSERT_ENABLED =
ConfigOptions.key("upsert-enabled").booleanType().noDefaultValue();
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 172484fc11..81706e5824 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -18,6 +18,12 @@
*/
package org.apache.iceberg.flink.sink;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import java.io.IOException;
@@ -43,6 +49,7 @@ import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -564,14 +571,57 @@ public class FlinkSink {
Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
Table serializableTable = SerializableTable.copyOf(table);
+ FileFormat format = flinkWriteConf.dataFileFormat();
TaskWriterFactory<RowData> taskWriterFactory =
new RowDataTaskWriterFactory(
serializableTable,
flinkRowType,
flinkWriteConf.targetDataFileSize(),
- flinkWriteConf.dataFileFormat(),
+ format,
+ writeProperties(table, format, flinkWriteConf),
equalityFieldIds,
flinkWriteConf.upsertMode());
return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
+
+ /**
+ * Based on the {@link FileFormat}, overrides the table-level compression properties for this
+ * write.
+ *
+ * @param table The table to get the table level settings
+ * @param format The FileFormat to use
+ * @param conf The write configuration
+ * @return The properties to use for writing
+ */
+ private static Map<String, String> writeProperties(
+ Table table, FileFormat format, FlinkWriteConf conf) {
+ Map<String, String> writeProperties = Maps.newHashMap(table.properties());
+
+ switch (format) {
+ case PARQUET:
+ writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
+ String parquetCompressionLevel = conf.parquetCompressionLevel();
+ if (parquetCompressionLevel != null) {
+ writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
+ }
+
+ break;
+ case AVRO:
+ writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
+ String avroCompressionLevel = conf.avroCompressionLevel();
+ if (avroCompressionLevel != null) {
+ writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
+ }
+
+ break;
+ case ORC:
+ writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
+ writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown file format %s", format));
+ }
+
+ return writeProperties;
+ }
}
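A hypothetical before/after for a Parquet table makes the merge concrete (values illustrative):

    // Hypothetical inputs:
    //   table.properties():  write.parquet.compression-codec=gzip
    //   sink write options:  compression-codec=zstd, compression-level=3
    Map<String, String> props = writeProperties(table, FileFormat.PARQUET, flinkWriteConf);
    // Resulting entries:
    //   write.parquet.compression-codec -> zstd
    //   write.parquet.compression-level -> 3
    // All other table properties are passed through unchanged.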
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 1c330434d0..634c2dfdda 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.flink.sink;
import java.util.List;
+import java.util.Map;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FileFormat;
@@ -57,6 +58,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
RowType flinkSchema,
long targetFileSizeBytes,
FileFormat format,
+ Map<String, String> writeProperties,
List<Integer> equalityFieldIds,
boolean upsert) {
this.table = table;
@@ -70,8 +72,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
this.upsert = upsert;
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
- this.appenderFactory =
- new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+ this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
} else if (upsert) {
// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
// the inserted row
@@ -82,7 +83,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
new FlinkAppenderFactory(
schema,
flinkSchema,
- table.properties(),
+ writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)),
@@ -92,7 +93,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
new FlinkAppenderFactory(
schema,
flinkSchema,
- table.properties(),
+ writeProperties,
spec,
ArrayUtil.toIntArray(equalityFieldIds),
schema,
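In context, the sink now constructs the factory with the merged map rather than the raw table properties; a sketch, where `writeProperties` is the map produced by FlinkSink#writeProperties above:

    // Sketch of the updated constructor call; `writeProperties` is the merged map.
    TaskWriterFactory<RowData> factory =
        new RowDataTaskWriterFactory(
            SerializableTable.copyOf(table),
            flinkRowType,
            targetFileSizeBytes,
            FileFormat.PARQUET,
            writeProperties,
            null,    // no equality field ids -> plain append writers
            false);  // not upsert mode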
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 1468879097..23665b7c9f 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -77,7 +77,13 @@ public class RowDataRewriter {
RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
this.taskWriterFactory =
new RowDataTaskWriterFactory(
- SerializableTable.copyOf(table), flinkSchema, Long.MAX_VALUE, format, null, false);
+ SerializableTable.copyOf(table),
+ flinkSchema,
+ Long.MAX_VALUE,
+ format,
+ table.properties(),
+ null,
+ false);
}
public List<DataFile> rewriteDataForTasks(
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
new file mode 100644
index 0000000000..49f472b732
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.common.DynFields;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestCompressionSettings {
+ @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private Table table;
+
+ private final Map<String, String> initProperties;
+
+ @Parameterized.Parameters(name = "tableProperties = {0}")
+ public static Object[] parameters() {
+ return new Object[] {
+ ImmutableMap.of(),
+ ImmutableMap.of(
+ TableProperties.AVRO_COMPRESSION,
+ "zstd",
+ TableProperties.AVRO_COMPRESSION_LEVEL,
+ "3",
+ TableProperties.PARQUET_COMPRESSION,
+ "zstd",
+ TableProperties.PARQUET_COMPRESSION_LEVEL,
+ "3",
+ TableProperties.ORC_COMPRESSION,
+ "zstd",
+ TableProperties.ORC_COMPRESSION_STRATEGY,
+ "compression")
+ };
+ }
+
+ public TestCompressionSettings(Map<String, String> initProperties) {
+ this.initProperties = initProperties;
+ }
+
+ @Before
+ public void before() throws IOException {
+ File folder = tempFolder.newFolder();
+ table = SimpleDataUtil.createTable(folder.getAbsolutePath(), initProperties, false);
+ }
+
+ @Test
+ public void testCompressionAvro() throws Exception {
+ // No override provided
+ Map<String, String> resultProperties =
+ appenderProperties(
+ table,
+ SimpleDataUtil.FLINK_SCHEMA,
+ ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "AVRO"));
+
+ if (initProperties.get(TableProperties.AVRO_COMPRESSION) == null) {
+ Assert.assertEquals(
+ TableProperties.AVRO_COMPRESSION_DEFAULT,
+ resultProperties.get(TableProperties.AVRO_COMPRESSION));
+ Assert.assertEquals(
+ TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT,
+ resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL));
+ } else {
+ Assert.assertEquals(
+ initProperties.get(TableProperties.AVRO_COMPRESSION),
+ resultProperties.get(TableProperties.AVRO_COMPRESSION));
+ Assert.assertEquals(
+ initProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL),
+ resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL));
+ }
+
+ // Override the compression codec to snappy and set an explicit level
+ resultProperties =
+ appenderProperties(
+ table,
+ SimpleDataUtil.FLINK_SCHEMA,
+ ImmutableMap.of(
+ FlinkWriteOptions.WRITE_FORMAT.key(),
+ "AVRO",
+ FlinkWriteOptions.COMPRESSION_CODEC.key(),
+ "snappy",
+ FlinkWriteOptions.COMPRESSION_LEVEL.key(),
+ "6"));
+
+ Assert.assertEquals("snappy", resultProperties.get(TableProperties.AVRO_COMPRESSION));
+ Assert.assertEquals("6", resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL));
+ }
+
+ @Test
+ public void testCompressionParquet() throws Exception {
+ // No override provided
+ Map<String, String> resultProperties =
+ appenderProperties(
+ table,
+ SimpleDataUtil.FLINK_SCHEMA,
+ ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "PARQUET"));
+
+ if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) {
+ Assert.assertEquals(
+ TableProperties.PARQUET_COMPRESSION_DEFAULT,
+ resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+ Assert.assertEquals(
+ TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
+ resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+ } else {
+ Assert.assertEquals(
+ initProperties.get(TableProperties.PARQUET_COMPRESSION),
+ resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+ Assert.assertEquals(
+ initProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL),
+ resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+ }
+
+ // Override the compression codec to snappy and set an explicit level
+ resultProperties =
+ appenderProperties(
+ table,
+ SimpleDataUtil.FLINK_SCHEMA,
+ ImmutableMap.of(
+ FlinkWriteOptions.WRITE_FORMAT.key(),
+ "PARQUET",
+ FlinkWriteOptions.COMPRESSION_CODEC.key(),
+ "snappy",
+ FlinkWriteOptions.COMPRESSION_LEVEL.key(),
+ "6"));
+
+ Assert.assertEquals("snappy", resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+ Assert.assertEquals("6", resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+ }
+
+ @Test
+ public void testCompressionOrc() throws Exception {
+ // No override provided
+ Map<String, String> resultProperties =
+ appenderProperties(
+ table,
+ SimpleDataUtil.FLINK_SCHEMA,
+ ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "ORC"));
+
+ if (initProperties.get(TableProperties.ORC_COMPRESSION) == null) {
+ Assert.assertEquals(
+ TableProperties.ORC_COMPRESSION_DEFAULT,
+ resultProperties.get(TableProperties.ORC_COMPRESSION));
+ Assert.assertEquals(
+ TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT,
+ resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+ } else {
+ Assert.assertEquals(
+ initProperties.get(TableProperties.ORC_COMPRESSION),
+ resultProperties.get(TableProperties.ORC_COMPRESSION));
+ Assert.assertEquals(
+ initProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY),
+ resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+ }
+
+ // Override compression to snappy and a different strategy
+ resultProperties =
+ appenderProperties(
+ table,
+ SimpleDataUtil.FLINK_SCHEMA,
+ ImmutableMap.of(
+ FlinkWriteOptions.WRITE_FORMAT.key(),
+ "ORC",
+ FlinkWriteOptions.COMPRESSION_CODEC.key(),
+ "snappy",
+ FlinkWriteOptions.COMPRESSION_STRATEGY.key(),
+ "speed"));
+
+ Assert.assertEquals("snappy", resultProperties.get(TableProperties.ORC_COMPRESSION));
+ Assert.assertEquals("speed", resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+ }
+
+ private static OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
+ Table icebergTable, TableSchema flinkSchema, Map<String, String> override) throws Exception {
+ RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
+ FlinkWriteConf flinkWriteConfig =
+ new FlinkWriteConf(
+ icebergTable, override, new org.apache.flink.configuration.Configuration());
+
+ IcebergStreamWriter<RowData> streamWriter =
+ FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null);
+ OneInputStreamOperatorTestHarness<RowData, WriteResult> harness =
+ new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0);
+
+ harness.setup();
+ harness.open();
+
+ return harness;
+ }
+
+ private static Map<String, String> appenderProperties(
+ Table table, TableSchema schema, Map<String, String> override) throws Exception {
+ try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness =
+ createIcebergStreamWriter(table, schema, override)) {
+ testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
+
+ testHarness.prepareSnapshotPreBarrier(1L);
+ DynFields.BoundField<IcebergStreamWriter> operatorField =
+ DynFields.builder()
+ .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator")
+ .build(testHarness.getOperatorFactory());
+ DynFields.BoundField<TaskWriter> writerField =
+ DynFields.builder()
+ .hiddenImpl(IcebergStreamWriter.class, "writer")
+ .build(operatorField.get());
+ DynFields.BoundField<FlinkAppenderFactory> appenderField =
+ DynFields.builder()
+ .hiddenImpl(BaseTaskWriter.class, "appenderFactory")
+ .build(writerField.get());
+ DynFields.BoundField<Map<String, String>> propsField =
+ DynFields.builder()
+ .hiddenImpl(FlinkAppenderFactory.class, "props")
+ .build(appenderField.get());
+ return propsField.get();
+ }
+ }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 2e5e7121bb..1f8cbfe191 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -345,6 +345,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
FlinkSchemaUtil.convert(table.schema()),
128 * 1024 * 1024,
format,
+ table.properties(),
equalityFieldIds,
false);
}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 04ec662fb8..112dbb5113 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -234,6 +234,7 @@ public class TestTaskWriters {
(RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(),
targetFileSize,
format,
+ table.properties(),
null,
false);
taskWriterFactory.initialize(1, 1);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
index bc63e4a0b2..25ecec23d2 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
@@ -176,6 +176,7 @@ public class TestProjectMetaColumn {
SimpleDataUtil.ROW_TYPE,
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
format,
+ table.properties(),
equalityFieldIds,
upsert);