You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by op...@apache.org on 2021/09/13 11:49:41 UTC

[iceberg] branch master updated: Flink: Add streaming upsert write option. (#2863)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3763952  Flink:  Add streaming upsert write option. (#2863)
3763952 is described below

commit 3763952c7b5d101e2c84efb1651b88439f641099
Author: Reo <Re...@users.noreply.github.com>
AuthorDate: Mon Sep 13 19:49:30 2021 +0800

    Flink:  Add streaming upsert write option. (#2863)
---
 .../java/org/apache/iceberg/TableProperties.java   |   3 +
 .../iceberg/flink/sink/BaseDeltaTaskWriter.java    |  15 ++-
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  46 +++++++-
 .../iceberg/flink/sink/PartitionedDeltaWriter.java |   6 +-
 .../flink/sink/RowDataTaskWriterFactory.java       |   9 +-
 .../flink/sink/UnpartitionedDeltaWriter.java       |   6 +-
 .../iceberg/flink/source/RowDataRewriter.java      |   3 +-
 .../iceberg/flink/sink/TestDeltaTaskWriter.java    |   2 +-
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 126 ++++++++++++++++++++-
 .../flink/sink/TestIcebergStreamWriter.java        |   2 +-
 .../apache/iceberg/flink/sink/TestTaskWriters.java |   2 +-
 11 files changed, 201 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index dcbab13..e2433db 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -228,4 +228,7 @@ public class TableProperties {
 
   public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
   public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+  public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
+  public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;
 }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 10dab41..8415129 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -41,6 +41,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
   private final Schema schema;
   private final Schema deleteSchema;
   private final RowDataWrapper wrapper;
+  private final boolean upsert;
 
   BaseDeltaTaskWriter(PartitionSpec spec,
                       FileFormat format,
@@ -50,11 +51,13 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
                       long targetFileSize,
                       Schema schema,
                       RowType flinkSchema,
-                      List<Integer> equalityFieldIds) {
+                      List<Integer> equalityFieldIds,
+                      boolean upsert) {
     super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+    this.upsert = upsert;
   }
 
   abstract RowDataDeltaWriter route(RowData row);
@@ -70,11 +73,19 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
     switch (row.getRowKind()) {
       case INSERT:
       case UPDATE_AFTER:
+        if (upsert) {
+          writer.delete(row);
+        }
         writer.write(row);
         break;
 
-      case DELETE:
       case UPDATE_BEFORE:
+        if (upsert) {
+          break;  // Skip UPDATE_BEFORE in upsert mode: UPDATE_AFTER already issues the delete for the old row.
+        }
+        writer.delete(row);
+        break;
+      case DELETE:
         writer.delete(row);
         break;
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index d25f5d0..055c6b5 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -41,6 +41,7 @@ import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SerializableTable;
@@ -58,6 +59,8 @@ import org.slf4j.LoggerFactory;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE;
+import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
@@ -125,6 +128,7 @@ public class FlinkSink {
     private boolean overwrite = false;
     private DistributionMode distributionMode = null;
     private Integer writeParallelism = null;
+    private boolean upsert = false;
     private List<String> equalityFieldColumns = null;
     private String uidPrefix = null;
 
@@ -213,6 +217,20 @@ public class FlinkSink {
     }
 
     /**
+     * All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events, which means the
+     * writer will DELETE the old record and then INSERT the new record. In a partitioned table, the partition fields
+     * must be a subset of the equality fields; otherwise an old row located in partition-A could not be deleted by
+     * a new row located in partition-B.
+     *
+     * @param enable indicates whether all INSERT/UPDATE_AFTER events should be transformed to UPSERT events.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder upsert(boolean enable) {
+      this.upsert = enable;
+      return this;
+    }
+
+    /**
      * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
      *
      * @param columns defines the iceberg table's key.
@@ -343,7 +361,27 @@ public class FlinkSink {
           equalityFieldIds.add(field.fieldId());
         }
       }
-      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+      // Fall back to the upsert mode from the table properties if it is not enabled at the job level.
+      boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+          UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
+
+      // Validate the equality fields and partition fields if we enable the upsert mode.
+      if (upsertMode) {
+        Preconditions.checkState(!overwrite,
+            "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+        Preconditions.checkState(!equalityFieldIds.isEmpty(),
+            "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+        if (!table.spec().isUnpartitioned()) {
+          for (PartitionField partitionField : table.spec().fields()) {
+            Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+                "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
+                partitionField, equalityFieldColumns);
+          }
+        }
+      }
+
+      IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);
 
       int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
       SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -412,7 +450,9 @@ public class FlinkSink {
 
   static IcebergStreamWriter<RowData> createStreamWriter(Table table,
                                                          RowType flinkRowType,
-                                                         List<Integer> equalityFieldIds) {
+                                                         List<Integer> equalityFieldIds,
+                                                         boolean upsert) {
+    Preconditions.checkArgument(table != null, "Iceberg table should't be null");
     Map<String, String> props = table.properties();
     long targetFileSize = getTargetFileSizeBytes(props);
     FileFormat fileFormat = getFileFormat(props);
@@ -420,7 +460,7 @@ public class FlinkSink {
     Table serializableTable = SerializableTable.copyOf(table);
     TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
         serializableTable, flinkRowType, targetFileSize,
-        fileFormat, equalityFieldIds);
+        fileFormat, equalityFieldIds, upsert);
 
     return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
   }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index b2f8cee..1eee629 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -49,8 +49,10 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
                          long targetFileSize,
                          Schema schema,
                          RowType flinkSchema,
-                         List<Integer> equalityFieldIds) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
+                         List<Integer> equalityFieldIds,
+                         boolean upsert) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+        upsert);
     this.partitionKey = new PartitionKey(spec, schema);
   }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index e94f99f..2849100 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -46,6 +46,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
   private final long targetFileSizeBytes;
   private final FileFormat format;
   private final List<Integer> equalityFieldIds;
+  private final boolean upsert;
   private final FileAppenderFactory<RowData> appenderFactory;
 
   private transient OutputFileFactory outputFileFactory;
@@ -54,7 +55,8 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
                                   RowType flinkSchema,
                                   long targetFileSizeBytes,
                                   FileFormat format,
-                                  List<Integer> equalityFieldIds) {
+                                  List<Integer> equalityFieldIds,
+                                  boolean upsert) {
     this.table = table;
     this.schema = table.schema();
     this.flinkSchema = flinkSchema;
@@ -63,6 +65,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
     this.targetFileSizeBytes = targetFileSizeBytes;
     this.format = format;
     this.equalityFieldIds = equalityFieldIds;
+    this.upsert = upsert;
 
     if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
       this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
@@ -95,10 +98,10 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
       // Initialize a task writer to write both INSERT and equality DELETE.
       if (spec.isUnpartitioned()) {
         return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
-            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
+            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
       } else {
         return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
-            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
+            targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
       }
     }
   }
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index 341e634..331ed7c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -41,8 +41,10 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
                            long targetFileSize,
                            Schema schema,
                            RowType flinkSchema,
-                           List<Integer> equalityFieldIds) {
-    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
+                           List<Integer> equalityFieldIds,
+                           boolean upsert) {
+    super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+        upsert);
     this.writer = new RowDataDeltaWriter(null);
   }
 
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 752035e..5e8837c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -77,7 +77,8 @@ public class RowDataRewriter {
         flinkSchema,
         Long.MAX_VALUE,
         format,
-        null);
+        null,
+        false);
   }
 
   public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index bdda3fd..71978fd 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -334,6 +334,6 @@ public class TestDeltaTaskWriter extends TableTestBase {
   private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {
     return new RowDataTaskWriterFactory(
         SerializableTable.copyOf(table), FlinkSchemaUtil.convert(table.schema()),
-        128 * 1024 * 1024, format, equalityFieldIds);
+        128 * 1024 * 1024, format, equalityFieldIds, false);
   }
 }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index fd2a71a..90f662c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
@@ -144,6 +145,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
 
   private void testChangeLogs(List<String> equalityFieldColumns,
                               KeySelector<Row, Object> keySelector,
+                              boolean insertAsUpsert,
                               List<List<Row>> elementsPerCheckpoint,
                               List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
     DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
@@ -157,6 +159,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
         .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
         .writeParallelism(parallelism)
         .equalityFieldColumns(equalityFieldColumns)
+        .upsert(insertAsUpsert)
         .build();
 
     // Execute the program.
@@ -219,7 +222,8 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
         ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
     );
 
-    testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), elementsPerCheckpoint, expectedRecords);
+    testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
+        elementsPerCheckpoint, expectedRecords);
   }
 
   @Test
@@ -250,7 +254,8 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
         ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))
     );
 
-    testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), elementsPerCheckpoint, expectedRecords);
+    testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), false,
+        elementsPerCheckpoint, expectedRecords);
   }
 
   @Test
@@ -281,7 +286,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
     );
 
     testChangeLogs(ImmutableList.of("data", "id"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        elementsPerCheckpoint, expectedRecords);
+        false, elementsPerCheckpoint, expectedRecords);
   }
 
   @Test
@@ -319,9 +324,124 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
     );
 
     testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false, elementsPerCheckpoint, expectedRecords);
+  }
+
+  @Test
+  public void testUpsertModeCheck() throws Exception {
+    DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
+    FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .upsert(true);
+
+    AssertHelpers.assertThrows("Should be error because upsert mode and overwrite mode enable at the same time.",
+        IllegalStateException.class, "OVERWRITE mode shouldn't be enable",
+        () -> builder.equalityFieldColumns(ImmutableList.of("id")).overwrite(true).build()
+    );
+
+    AssertHelpers.assertThrows("Should be error because equality field columns are empty.",
+        IllegalStateException.class, "Equality field columns shouldn't be empty",
+        () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).build()
+    );
+  }
+
+  @Test
+  public void testUpsertOnIdKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("+U", 1, "bbb")
+        ),
+        ImmutableList.of(
+            row("+I", 1, "ccc")
+        ),
+        ImmutableList.of(
+            row("+U", 1, "ddd"),
+            row("+I", 1, "eee")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "bbb")),
+        ImmutableList.of(record(1, "ccc")),
+        ImmutableList.of(record(1, "eee"))
+    );
+
+    if (!partitioned) {
+      testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true,
+          elementsPerCheckpoint, expectedRecords);
+    } else {
+      AssertHelpers.assertThrows("Should be error because equality field columns don't include all partition keys",
+          IllegalStateException.class, "should be included in equality fields",
+          () -> {
+            testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true, elementsPerCheckpoint,
+                expectedRecords);
+            return null;
+          });
+    }
+  }
+
+  @Test
+  public void testUpsertOnDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("+I", 2, "aaa"),
+            row("+I", 3, "bbb")
+        ),
+        ImmutableList.of(
+            row("+U", 4, "aaa"),
+            row("-U", 3, "bbb"),
+            row("+U", 5, "bbb")
+        ),
+        ImmutableList.of(
+            row("+I", 6, "aaa"),
+            row("+U", 7, "bbb")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
+        ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
+        ImmutableList.of(record(6, "aaa"), record(7, "bbb"))
+    );
+
+    testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), true,
         elementsPerCheckpoint, expectedRecords);
   }
 
+  @Test
+  public void testUpsertOnIdDataKey() throws Exception {
+    List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("+U", 1, "aaa"),
+            row("+I", 2, "bbb")
+        ),
+        ImmutableList.of(
+            row("+I", 1, "aaa"),
+            row("-D", 2, "bbb"),
+            row("+I", 2, "ccc")
+        ),
+        ImmutableList.of(
+            row("+U", 1, "bbb"),
+            row("-U", 1, "ccc"),
+            row("-D", 1, "aaa")
+        )
+    );
+
+    List<List<Record>> expectedRecords = ImmutableList.of(
+        ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
+        ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
+        ImmutableList.of(record(1, "bbb"), record(2, "ccc"))
+    );
+
+    testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        true, elementsPerCheckpoint, expectedRecords);
+  }
+
   private StructLikeSet expectedRowSet(Record... records) {
     return SimpleDataUtil.expectedRowSet(table, records);
   }
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index e5ffd01..7419775 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -340,7 +340,7 @@ public class TestIcebergStreamWriter {
   private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
       Table icebergTable, TableSchema flinkSchema) throws Exception {
     RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
-    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkRowType, null);
+    IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkRowType, null, false);
     OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = new OneInputStreamOperatorTestHarness<>(
         streamWriter, 1, 1, 0);
 
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 9eee57f..562a75e 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -239,7 +239,7 @@ public class TestTaskWriters {
   private TaskWriter<RowData> createTaskWriter(long targetFileSize) {
     TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
         SerializableTable.copyOf(table), (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(),
-        targetFileSize, format, null);
+        targetFileSize, format, null, false);
     taskWriterFactory.initialize(1, 1);
     return taskWriterFactory.create();
   }