Posted to commits@iceberg.apache.org by st...@apache.org on 2022/10/26 15:54:22 UTC

[iceberg] branch master updated: Flink: Add Sink options to override the compression properties of the Table (#6049)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e35f0d2f9d Flink: Add Sink options to override the compression properties of the Table (#6049)
e35f0d2f9d is described below

commit e35f0d2f9d18bd34ce2375fb018bb8b8211d41aa
Author: pvary <pv...@cloudera.com>
AuthorDate: Wed Oct 26 17:54:16 2022 +0200

    Flink: Add Sink options to override the compression properties of the Table (#6049)
    
    * Flink compression properties
    
    * Review comments
    
    Co-authored-by: Peter Vary <pe...@apple.com>
---
 docs/flink-getting-started.md                      |  17 +-
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |  60 +++++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  12 +
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  52 ++++-
 .../flink/sink/RowDataTaskWriterFactory.java       |   9 +-
 .../iceberg/flink/source/RowDataRewriter.java      |   8 +-
 .../flink/sink/TestCompressionSettings.java        | 254 +++++++++++++++++++++
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    |   1 +
 .../apache/iceberg/flink/sink/TestTaskWriters.java |   1 +
 .../flink/source/TestProjectMetaColumn.java        |   1 +
 10 files changed, 402 insertions(+), 13 deletions(-)

diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 1ca421e0fc..c6a2e0d490 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -700,13 +700,16 @@ INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
 ...
 ```
 
-| Flink option           | Default                    | Description                                                                                                |
-|------------------------| -------------------------- |------------------------------------------------------------------------------------------------------------|
-| write-format           | Table write.format.default | File format to use for this write operation; parquet, avro, or orc                                         |
-| target-file-size-bytes | As per table property      | Overrides this table's write.target-file-size-bytes                                                        |
-| upsert-enabled         | Table write.upsert.enabled | Overrides this table's write.upsert.enabled                                                                |
-| overwrite-enabled      | false | Overwrite the table's data, overwrite mode shouldn't be enable when configuring to use UPSERT data stream. |
-| distribution-mode      | Table write.distribution-mode | Overrides this table's write.distribution-mode                                                             |
+| Flink option           | Default                                    | Description                                                                                                |
+|------------------------|--------------------------------------------|------------------------------------------------------------------------------------------------------------|
+| write-format           | Table write.format.default                 | File format to use for this write operation; parquet, avro, or orc                                         |
+| target-file-size-bytes | As per table property                      | Overrides this table's write.target-file-size-bytes                                                        |
+| upsert-enabled         | Table write.upsert.enabled                 | Overrides this table's write.upsert.enabled                                                                |
+| overwrite-enabled      | false                                      | Overwrite the table's data; overwrite mode shouldn't be enabled when the UPSERT data stream is configured.  |
+| distribution-mode      | Table write.distribution-mode              | Overrides this table's write.distribution-mode                                                             |
+| compression-codec      | Table write.(fileformat).compression-codec | Overrides this table's compression codec for this write                                                    |
+| compression-level      | Table write.(fileformat).compression-level | Overrides this table's compression level for Parquet and Avro tables for this write                        |
+| compression-strategy   | Table write.orc.compression-strategy       | Overrides this table's compression strategy for ORC tables for this write                                  |
 
 
 ## Inspecting tables.
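
The table above only lists the new option keys. As a rough, non-authoritative illustration of applying the same overrides from the DataStream API, the sketch below assumes the FlinkSink builder's setAll(...) passthrough for write options; the stream, table loader, and codec values are placeholders, not part of this commit.

    import java.util.Map;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    public class CompressionOverrideExample {
      // Appends rows to an Iceberg table, overriding compression only for this write.
      public static void append(DataStream<RowData> rows, TableLoader tableLoader) {
        Map<String, String> writeOptions =
            ImmutableMap.of(
                "write-format", "parquet",    // file format used for this write
                "compression-codec", "zstd",  // overrides write.parquet.compression-codec
                "compression-level", "3");    // overrides write.parquet.compression-level

        FlinkSink.forRowData(rows)
            .tableLoader(tableLoader)
            .setAll(writeOptions)             // per-write overrides; table properties stay untouched
            .append();
      }
    }
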
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 06528c9716..448b2aa2d8 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -93,6 +93,66 @@ public class FlinkWriteConf {
         .parse();
   }
 
+  public String parquetCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.PARQUET_COMPRESSION)
+        .defaultValue(TableProperties.PARQUET_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  public String parquetCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
+        .tableProperty(TableProperties.PARQUET_COMPRESSION_LEVEL)
+        .defaultValue(TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT)
+        .parseOptional();
+  }
+
+  public String avroCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.AVRO_COMPRESSION)
+        .defaultValue(TableProperties.AVRO_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  public String avroCompressionLevel() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_LEVEL.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_LEVEL)
+        .tableProperty(TableProperties.AVRO_COMPRESSION_LEVEL)
+        .defaultValue(TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT)
+        .parseOptional();
+  }
+
+  public String orcCompressionCodec() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_CODEC.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_CODEC)
+        .tableProperty(TableProperties.ORC_COMPRESSION)
+        .defaultValue(TableProperties.ORC_COMPRESSION_DEFAULT)
+        .parse();
+  }
+
+  public String orcCompressionStrategy() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.COMPRESSION_STRATEGY.key())
+        .flinkConfig(FlinkWriteOptions.COMPRESSION_STRATEGY)
+        .tableProperty(TableProperties.ORC_COMPRESSION_STRATEGY)
+        .defaultValue(TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT)
+        .parse();
+  }
+
   public DistributionMode distributionMode() {
     String modeName =
         confParser
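
Each of the new getters follows the same resolution chain. As a minimal sketch of that precedence (this is not the Iceberg ConfParser itself, just an illustration of the order it applies): per-write option first, then the Flink job configuration, then the table property, then the library default.

    import java.util.Map;

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ReadableConfig;

    final class PrecedenceSketch {
      // Mirrors the option() -> flinkConfig() -> tableProperty() -> defaultValue() chain above.
      static String resolve(
          Map<String, String> writeOptions,   // e.g. SQL hints or FlinkSink#setAll
          ReadableConfig flinkConf,           // job-level Flink configuration
          Map<String, String> tableProps,     // table properties
          ConfigOption<String> option,
          String tablePropertyKey,
          String defaultValue) {
        if (writeOptions.containsKey(option.key())) {
          return writeOptions.get(option.key());
        }
        return flinkConf
            .getOptional(option)
            .orElse(tableProps.getOrDefault(tablePropertyKey, defaultValue));
      }
    }
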
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index a3091d5779..f3cc52972b 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -34,6 +34,18 @@ public class FlinkWriteOptions {
   public static final ConfigOption<Long> TARGET_FILE_SIZE_BYTES =
       ConfigOptions.key("target-file-size-bytes").longType().noDefaultValue();
 
+  // Overrides this table's write.<FILE_FORMAT>.compression-codec
+  public static final ConfigOption<String> COMPRESSION_CODEC =
+      ConfigOptions.key("compression-codec").stringType().noDefaultValue();
+
+  // Overrides this table's write.<FILE_FORMAT>.compression-level
+  public static final ConfigOption<String> COMPRESSION_LEVEL =
+      ConfigOptions.key("compression-level").stringType().noDefaultValue();
+
+  // Overrides this table's write.<FILE_FORMAT>.compression-strategy
+  public static final ConfigOption<String> COMPRESSION_STRATEGY =
+      ConfigOptions.key("compression-strategy").stringType().noDefaultValue();
+
   // Overrides this table's write.upsert.enabled
   public static final ConfigOption<Boolean> WRITE_UPSERT_ENABLED =
       ConfigOptions.key("upsert-enabled").booleanType().noDefaultValue();
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 172484fc11..81706e5824 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -18,6 +18,12 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
+import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
+import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_LEVEL;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 
 import java.io.IOException;
@@ -43,6 +49,7 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -564,14 +571,57 @@ public class FlinkSink {
     Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
 
     Table serializableTable = SerializableTable.copyOf(table);
+    FileFormat format = flinkWriteConf.dataFileFormat();
     TaskWriterFactory<RowData> taskWriterFactory =
         new RowDataTaskWriterFactory(
             serializableTable,
             flinkRowType,
             flinkWriteConf.targetDataFileSize(),
-            flinkWriteConf.dataFileFormat(),
+            format,
+            writeProperties(table, format, flinkWriteConf),
             equalityFieldIds,
             flinkWriteConf.upsertMode());
     return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
   }
+
+  /**
+   * Based on the {@link FileFormat}, overwrites the table-level compression properties for this
+   * write.
+   *
+   * @param table The table to get the table level settings
+   * @param format The FileFormat to use
+   * @param conf The write configuration
+   * @return The properties to use for writing
+   */
+  private static Map<String, String> writeProperties(
+      Table table, FileFormat format, FlinkWriteConf conf) {
+    Map<String, String> writeProperties = Maps.newHashMap(table.properties());
+
+    switch (format) {
+      case PARQUET:
+        writeProperties.put(PARQUET_COMPRESSION, conf.parquetCompressionCodec());
+        String parquetCompressionLevel = conf.parquetCompressionLevel();
+        if (parquetCompressionLevel != null) {
+          writeProperties.put(PARQUET_COMPRESSION_LEVEL, parquetCompressionLevel);
+        }
+
+        break;
+      case AVRO:
+        writeProperties.put(AVRO_COMPRESSION, conf.avroCompressionCodec());
+        String avroCompressionLevel = conf.avroCompressionLevel();
+        if (avroCompressionLevel != null) {
+          writeProperties.put(AVRO_COMPRESSION_LEVEL, avroCompressionLevel);
+        }
+
+        break;
+      case ORC:
+        writeProperties.put(ORC_COMPRESSION, conf.orcCompressionCodec());
+        writeProperties.put(ORC_COMPRESSION_STRATEGY, conf.orcCompressionStrategy());
+        break;
+      default:
+        throw new IllegalArgumentException(String.format("Unknown file format %s", format));
+    }
+
+    return writeProperties;
+  }
 }
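
To make the merge above concrete, here is a plain-Java worked example (not part of this commit) of the map writeProperties(...) returns when a sink option overrides a table-level codec; the keys mirror the TableProperties constants and the values are examples only.

    import java.util.HashMap;
    import java.util.Map;

    class WritePropertiesIllustration {
      public static void main(String[] args) {
        // Table created with a gzip codec for Parquet.
        Map<String, String> tableProps = new HashMap<>();
        tableProps.put("write.parquet.compression-codec", "gzip");

        // writeProperties(...) starts from a copy of the table properties ...
        Map<String, String> merged = new HashMap<>(tableProps);
        // ... and, for a Parquet write, overwrites the codec with the resolved
        // FlinkWriteConf value (here the sink option compression-codec=zstd won).
        merged.put("write.parquet.compression-codec", "zstd");

        // Prints {write.parquet.compression-codec=zstd}; all other table
        // properties are copied through unchanged.
        System.out.println(merged);
      }
    }
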
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 1c330434d0..634c2dfdda 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.sink;
 
 import java.util.List;
+import java.util.Map;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
@@ -57,6 +58,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
       RowType flinkSchema,
       long targetFileSizeBytes,
       FileFormat format,
+      Map<String, String> writeProperties,
       List<Integer> equalityFieldIds,
       boolean upsert) {
     this.table = table;
@@ -70,8 +72,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
     this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
-      this.appenderFactory =
-          new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
+      this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, writeProperties, spec);
     } else if (upsert) {
       // In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of
       // the inserted row
@@ -82,7 +83,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
           new FlinkAppenderFactory(
               schema,
               flinkSchema,
-              table.properties(),
+              writeProperties,
               spec,
               ArrayUtil.toIntArray(equalityFieldIds),
               TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)),
@@ -92,7 +93,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
           new FlinkAppenderFactory(
               schema,
               flinkSchema,
-              table.properties(),
+              writeProperties,
               spec,
               ArrayUtil.toIntArray(equalityFieldIds),
               schema,
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 1468879097..23665b7c9f 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -77,7 +77,13 @@ public class RowDataRewriter {
     RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
     this.taskWriterFactory =
         new RowDataTaskWriterFactory(
-            SerializableTable.copyOf(table), flinkSchema, Long.MAX_VALUE, format, null, false);
+            SerializableTable.copyOf(table),
+            flinkSchema,
+            Long.MAX_VALUE,
+            format,
+            table.properties(),
+            null,
+            false);
   }
 
   public List<DataFile> rewriteDataForTasks(
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
new file mode 100644
index 0000000000..49f472b732
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestCompressionSettings.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.common.DynFields;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestCompressionSettings {
+  @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private Table table;
+
+  private final Map<String, String> initProperties;
+
+  @Parameterized.Parameters(name = "tableProperties = {0}")
+  public static Object[] parameters() {
+    return new Object[] {
+      ImmutableMap.of(),
+      ImmutableMap.of(
+          TableProperties.AVRO_COMPRESSION,
+          "zstd",
+          TableProperties.AVRO_COMPRESSION_LEVEL,
+          "3",
+          TableProperties.PARQUET_COMPRESSION,
+          "zstd",
+          TableProperties.PARQUET_COMPRESSION_LEVEL,
+          "3",
+          TableProperties.ORC_COMPRESSION,
+          "zstd",
+          TableProperties.ORC_COMPRESSION_STRATEGY,
+          "compression")
+    };
+  }
+
+  public TestCompressionSettings(Map<String, String> initProperties) {
+    this.initProperties = initProperties;
+  }
+
+  @Before
+  public void before() throws IOException {
+    File folder = tempFolder.newFolder();
+    table = SimpleDataUtil.createTable(folder.getAbsolutePath(), initProperties, false);
+  }
+
+  @Test
+  public void testCompressionAvro() throws Exception {
+    // No override provided
+    Map<String, String> resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "AVRO"));
+
+    if (initProperties.get(TableProperties.AVRO_COMPRESSION) == null) {
+      Assert.assertEquals(
+          TableProperties.AVRO_COMPRESSION_DEFAULT,
+          resultProperties.get(TableProperties.AVRO_COMPRESSION));
+      Assert.assertEquals(
+          TableProperties.AVRO_COMPRESSION_LEVEL_DEFAULT,
+          resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL));
+    } else {
+      Assert.assertEquals(
+          initProperties.get(TableProperties.AVRO_COMPRESSION),
+          resultProperties.get(TableProperties.AVRO_COMPRESSION));
+      Assert.assertEquals(
+          initProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL),
+          resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL));
+    }
+
+    // Override compression to snappy and some random level
+    resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(
+                FlinkWriteOptions.WRITE_FORMAT.key(),
+                "AVRO",
+                FlinkWriteOptions.COMPRESSION_CODEC.key(),
+                "snappy",
+                FlinkWriteOptions.COMPRESSION_LEVEL.key(),
+                "6"));
+
+    Assert.assertEquals("snappy", resultProperties.get(TableProperties.AVRO_COMPRESSION));
+    Assert.assertEquals("6", resultProperties.get(TableProperties.AVRO_COMPRESSION_LEVEL));
+  }
+
+  @Test
+  public void testCompressionParquet() throws Exception {
+    // No override provided
+    Map<String, String> resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "PARQUET"));
+
+    if (initProperties.get(TableProperties.PARQUET_COMPRESSION) == null) {
+      Assert.assertEquals(
+          TableProperties.PARQUET_COMPRESSION_DEFAULT,
+          resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+      Assert.assertEquals(
+          TableProperties.PARQUET_COMPRESSION_LEVEL_DEFAULT,
+          resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+    } else {
+      Assert.assertEquals(
+          initProperties.get(TableProperties.PARQUET_COMPRESSION),
+          resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+      Assert.assertEquals(
+          initProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL),
+          resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+    }
+
+    // Override compression to snappy and some random level
+    resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(
+                FlinkWriteOptions.WRITE_FORMAT.key(),
+                "PARQUET",
+                FlinkWriteOptions.COMPRESSION_CODEC.key(),
+                "snappy",
+                FlinkWriteOptions.COMPRESSION_LEVEL.key(),
+                "6"));
+
+    Assert.assertEquals("snappy", resultProperties.get(TableProperties.PARQUET_COMPRESSION));
+    Assert.assertEquals("6", resultProperties.get(TableProperties.PARQUET_COMPRESSION_LEVEL));
+  }
+
+  @Test
+  public void testCompressionOrc() throws Exception {
+    // No override provided
+    Map<String, String> resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(FlinkWriteOptions.WRITE_FORMAT.key(), "ORC"));
+
+    if (initProperties.get(TableProperties.ORC_COMPRESSION) == null) {
+      Assert.assertEquals(
+          TableProperties.ORC_COMPRESSION_DEFAULT,
+          resultProperties.get(TableProperties.ORC_COMPRESSION));
+      Assert.assertEquals(
+          TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT,
+          resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+    } else {
+      Assert.assertEquals(
+          initProperties.get(TableProperties.ORC_COMPRESSION),
+          resultProperties.get(TableProperties.ORC_COMPRESSION));
+      Assert.assertEquals(
+          initProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY),
+          resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+    }
+
+    // Override compression to snappy and a different strategy
+    resultProperties =
+        appenderProperties(
+            table,
+            SimpleDataUtil.FLINK_SCHEMA,
+            ImmutableMap.of(
+                FlinkWriteOptions.WRITE_FORMAT.key(),
+                "ORC",
+                FlinkWriteOptions.COMPRESSION_CODEC.key(),
+                "snappy",
+                FlinkWriteOptions.COMPRESSION_STRATEGY.key(),
+                "speed"));
+
+    Assert.assertEquals("snappy", resultProperties.get(TableProperties.ORC_COMPRESSION));
+    Assert.assertEquals("speed", resultProperties.get(TableProperties.ORC_COMPRESSION_STRATEGY));
+  }
+
+  private static OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
+      Table icebergTable, TableSchema flinkSchema, Map<String, String> override) throws Exception {
+    RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
+    FlinkWriteConf flinkWriteConfig =
+        new FlinkWriteConf(
+            icebergTable, override, new org.apache.flink.configuration.Configuration());
+
+    IcebergStreamWriter<RowData> streamWriter =
+        FlinkSink.createStreamWriter(icebergTable, flinkWriteConfig, flinkRowType, null);
+    OneInputStreamOperatorTestHarness<RowData, WriteResult> harness =
+        new OneInputStreamOperatorTestHarness<>(streamWriter, 1, 1, 0);
+
+    harness.setup();
+    harness.open();
+
+    return harness;
+  }
+
+  private static Map<String, String> appenderProperties(
+      Table table, TableSchema schema, Map<String, String> override) throws Exception {
+    try (OneInputStreamOperatorTestHarness<RowData, WriteResult> testHarness =
+        createIcebergStreamWriter(table, schema, override)) {
+      testHarness.processElement(SimpleDataUtil.createRowData(1, "hello"), 1);
+
+      testHarness.prepareSnapshotPreBarrier(1L);
+      DynFields.BoundField<IcebergStreamWriter> operatorField =
+          DynFields.builder()
+              .hiddenImpl(testHarness.getOperatorFactory().getClass(), "operator")
+              .build(testHarness.getOperatorFactory());
+      DynFields.BoundField<TaskWriter> writerField =
+          DynFields.builder()
+              .hiddenImpl(IcebergStreamWriter.class, "writer")
+              .build(operatorField.get());
+      DynFields.BoundField<FlinkAppenderFactory> appenderField =
+          DynFields.builder()
+              .hiddenImpl(BaseTaskWriter.class, "appenderFactory")
+              .build(writerField.get());
+      DynFields.BoundField<Map<String, String>> propsField =
+          DynFields.builder()
+              .hiddenImpl(FlinkAppenderFactory.class, "props")
+              .build(appenderField.get());
+      return propsField.get();
+    }
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index 2e5e7121bb..1f8cbfe191 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -345,6 +345,7 @@ public class TestDeltaTaskWriter extends TableTestBase {
         FlinkSchemaUtil.convert(table.schema()),
         128 * 1024 * 1024,
         format,
+        table.properties(),
         equalityFieldIds,
         false);
   }
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 04ec662fb8..112dbb5113 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -234,6 +234,7 @@ public class TestTaskWriters {
             (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(),
             targetFileSize,
             format,
+            table.properties(),
             null,
             false);
     taskWriterFactory.initialize(1, 1);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
index bc63e4a0b2..25ecec23d2 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestProjectMetaColumn.java
@@ -176,6 +176,7 @@ public class TestProjectMetaColumn {
             SimpleDataUtil.ROW_TYPE,
             TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
             format,
+            table.properties(),
             equalityFieldIds,
             upsert);