Posted to commits@iceberg.apache.org by op...@apache.org on 2021/09/13 11:49:41 UTC
[iceberg] branch master updated: Flink: Add streaming upsert write option. (#2863)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3763952 Flink: Add streaming upsert write option. (#2863)
3763952 is described below
commit 3763952c7b5d101e2c84efb1651b88439f641099
Author: Reo <Re...@users.noreply.github.com>
AuthorDate: Mon Sep 13 19:49:30 2021 +0800
Flink: Add streaming upsert write option. (#2863)
---
.../java/org/apache/iceberg/TableProperties.java | 3 +
.../iceberg/flink/sink/BaseDeltaTaskWriter.java | 15 ++-
.../org/apache/iceberg/flink/sink/FlinkSink.java | 46 +++++++-
.../iceberg/flink/sink/PartitionedDeltaWriter.java | 6 +-
.../flink/sink/RowDataTaskWriterFactory.java | 9 +-
.../flink/sink/UnpartitionedDeltaWriter.java | 6 +-
.../iceberg/flink/source/RowDataRewriter.java | 3 +-
.../iceberg/flink/sink/TestDeltaTaskWriter.java | 2 +-
.../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 126 ++++++++++++++++++++-
.../flink/sink/TestIcebergStreamWriter.java | 2 +-
.../apache/iceberg/flink/sink/TestTaskWriters.java | 2 +-
11 files changed, 201 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index dcbab13..e2433db 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -228,4 +228,7 @@ public class TableProperties {
public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
public static final boolean MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true;
+
+ public static final String UPSERT_MODE_ENABLE = "write.upsert.enable";
+ public static final boolean UPSERT_MODE_ENABLE_DEFAULT = false;
}
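For readers wiring this up outside the Flink job itself: the same switch can be flipped per table with Iceberg's standard UpdateProperties API, so every job writing to the table defaults to upsert mode. A minimal sketch, assuming a Hadoop catalog at a hypothetical warehouse path and a hypothetical db.events table:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.hadoop.HadoopCatalog;

    final class EnableUpsertSketch {
      public static void main(String[] args) {
        // Hypothetical warehouse path and table identifier, for illustration only.
        HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://nn:8020/warehouse");
        Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

        // Set the new table-level default; a job can also opt in via FlinkSink's upsert(true).
        table.updateProperties()
            .set("write.upsert.enable", "true")
            .commit();
      }
    }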
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
index 10dab41..8415129 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
@@ -41,6 +41,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
private final Schema schema;
private final Schema deleteSchema;
private final RowDataWrapper wrapper;
+ private final boolean upsert;
BaseDeltaTaskWriter(PartitionSpec spec,
FileFormat format,
@@ -50,11 +51,13 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
long targetFileSize,
Schema schema,
RowType flinkSchema,
- List<Integer> equalityFieldIds) {
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+ this.upsert = upsert;
}
abstract RowDataDeltaWriter route(RowData row);
@@ -70,11 +73,19 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<RowData> {
switch (row.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
+ if (upsert) {
+ writer.delete(row);
+ }
writer.write(row);
break;
- case DELETE:
case UPDATE_BEFORE:
+ if (upsert) {
+ break; // UPDATE_BEFORE is not necessary in upsert mode; skip it to avoid deleting the same row twice
+ }
+ writer.delete(row);
+ break;
+ case DELETE:
writer.delete(row);
break;
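The hunk above is the heart of the change: in upsert mode, every INSERT/UPDATE_AFTER is preceded by an equality delete on the key, and UPDATE_BEFORE is dropped because the matching UPDATE_AFTER already removes the old row. A hypothetical standalone helper mirroring that dispatch (the method name and string return encoding are illustrative, not part of the commit):

    import org.apache.flink.types.RowKind;

    final class UpsertDispatchSketch {
      // Which delta operations a changelog row triggers, mirroring BaseDeltaTaskWriter#write.
      static String[] operationsFor(RowKind kind, boolean upsert) {
        switch (kind) {
          case INSERT:
          case UPDATE_AFTER:
            // Upsert mode deletes by equality key first, so the incoming row
            // replaces any earlier row with the same key.
            return upsert ? new String[] {"delete", "write"} : new String[] {"write"};
          case UPDATE_BEFORE:
            // Skipped in upsert mode: the paired UPDATE_AFTER already deletes
            // the key, so acting here would delete the same row twice.
            return upsert ? new String[] {} : new String[] {"delete"};
          case DELETE:
            return new String[] {"delete"};
          default:
            throw new UnsupportedOperationException("Unknown row kind: " + kind);
        }
      }
    }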
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index d25f5d0..055c6b5 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -41,6 +41,7 @@ import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SerializableTable;
@@ -58,6 +59,8 @@ import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE;
+import static org.apache.iceberg.TableProperties.UPSERT_MODE_ENABLE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
@@ -125,6 +128,7 @@ public class FlinkSink {
private boolean overwrite = false;
private DistributionMode distributionMode = null;
private Integer writeParallelism = null;
+ private boolean upsert = false;
private List<String> equalityFieldColumns = null;
private String uidPrefix = null;
@@ -213,6 +217,20 @@ public class FlinkSink {
}
/**
+ * All INSERT/UPDATE_AFTER events from the input stream will be transformed to UPSERT events, which means it will
+ * DELETE the old records and then INSERT the new records. For a partitioned table, the partition fields should be
+ * a subset of the equality fields; otherwise an old row located in partition-A could not be deleted by a
+ * new row located in partition-B.
+ *
+ * @param enable indicates whether to transform all INSERT/UPDATE_AFTER events to UPSERT.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder upsert(boolean enable) {
+ this.upsert = enable;
+ return this;
+ }
+
+ /**
* Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
*
* @param columns defines the iceberg table's key.
@@ -343,7 +361,27 @@ public class FlinkSink {
equalityFieldIds.add(field.fieldId());
}
}
- IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
+
+ // Fall back to the upsert mode parsed from table properties if it is not specified at the job level.
+ boolean upsertMode = upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+ UPSERT_MODE_ENABLE, UPSERT_MODE_ENABLE_DEFAULT);
+
+ // Validate the equality fields and partition fields if we enable the upsert mode.
+ if (upsertMode) {
+ Preconditions.checkState(!overwrite,
+ "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
+ Preconditions.checkState(!equalityFieldIds.isEmpty(),
+ "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
+ if (!table.spec().isUnpartitioned()) {
+ for (PartitionField partitionField : table.spec().fields()) {
+ Preconditions.checkState(equalityFieldIds.contains(partitionField.sourceId()),
+ "In UPSERT mode, partition field '%s' should be included in equality fields: '%s'",
+ partitionField, equalityFieldColumns);
+ }
+ }
+ }
+
+ IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds, upsertMode);
int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -412,7 +450,9 @@ public class FlinkSink {
static IcebergStreamWriter<RowData> createStreamWriter(Table table,
RowType flinkRowType,
- List<Integer> equalityFieldIds) {
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
Map<String, String> props = table.properties();
long targetFileSize = getTargetFileSizeBytes(props);
FileFormat fileFormat = getFileFormat(props);
@@ -420,7 +460,7 @@ public class FlinkSink {
Table serializableTable = SerializableTable.copyOf(table);
TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
serializableTable, flinkRowType, targetFileSize,
- fileFormat, equalityFieldIds);
+ fileFormat, equalityFieldIds, upsert);
return new IcebergStreamWriter<>(table.name(), taskWriterFactory);
}
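At the job level, the new builder option composes with the existing equality-field configuration. A minimal sketch of attaching an upsert-mode sink to an existing RowData stream, assuming the stream, the TableLoader, and an "id" key column are provided by the surrounding job:

    import java.util.Arrays;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    final class UpsertSinkSketch {
      // Attach an upsert-mode Iceberg sink to an existing changelog stream.
      static void appendUpsertSink(DataStream<RowData> input, TableLoader tableLoader) {
        FlinkSink.forRowData(input)
            .tableLoader(tableLoader)
            // Keys used for equality deletes; for a partitioned table they must
            // cover all partition source fields (validated at build() time).
            .equalityFieldColumns(Arrays.asList("id"))
            // Treat every INSERT/UPDATE_AFTER as delete-then-insert on the key.
            .upsert(true)
            .build();
      }
    }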
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
index b2f8cee..1eee629 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedDeltaWriter.java
@@ -49,8 +49,10 @@ class PartitionedDeltaWriter extends BaseDeltaTaskWriter {
long targetFileSize,
Schema schema,
RowType flinkSchema,
- List<Integer> equalityFieldIds) {
- super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+ upsert);
this.partitionKey = new PartitionKey(spec, schema);
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index e94f99f..2849100 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -46,6 +46,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final long targetFileSizeBytes;
private final FileFormat format;
private final List<Integer> equalityFieldIds;
+ private final boolean upsert;
private final FileAppenderFactory<RowData> appenderFactory;
private transient OutputFileFactory outputFileFactory;
@@ -54,7 +55,8 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
RowType flinkSchema,
long targetFileSizeBytes,
FileFormat format,
- List<Integer> equalityFieldIds) {
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
this.table = table;
this.schema = table.schema();
this.flinkSchema = flinkSchema;
@@ -63,6 +65,7 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
this.targetFileSizeBytes = targetFileSizeBytes;
this.format = format;
this.equalityFieldIds = equalityFieldIds;
+ this.upsert = upsert;
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec);
@@ -95,10 +98,10 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
// Initialize a task writer to write both INSERT and equality DELETE.
if (spec.isUnpartitioned()) {
return new UnpartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
- targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
+ targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
} else {
return new PartitionedDeltaWriter(spec, format, appenderFactory, outputFileFactory, io,
- targetFileSizeBytes, schema, flinkSchema, equalityFieldIds);
+ targetFileSizeBytes, schema, flinkSchema, equalityFieldIds, upsert);
}
}
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
index 341e634..331ed7c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/UnpartitionedDeltaWriter.java
@@ -41,8 +41,10 @@ class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter {
long targetFileSize,
Schema schema,
RowType flinkSchema,
- List<Integer> equalityFieldIds) {
- super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds);
+ List<Integer> equalityFieldIds,
+ boolean upsert) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, flinkSchema, equalityFieldIds,
+ upsert);
this.writer = new RowDataDeltaWriter(null);
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 752035e..5e8837c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -77,7 +77,8 @@ public class RowDataRewriter {
flinkSchema,
Long.MAX_VALUE,
format,
- null);
+ null,
+ false);
}
public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
index bdda3fd..71978fd 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestDeltaTaskWriter.java
@@ -334,6 +334,6 @@ public class TestDeltaTaskWriter extends TableTestBase {
private TaskWriterFactory<RowData> createTaskWriterFactory(List<Integer> equalityFieldIds) {
return new RowDataTaskWriterFactory(
SerializableTable.copyOf(table), FlinkSchemaUtil.convert(table.schema()),
- 128 * 1024 * 1024, format, equalityFieldIds);
+ 128 * 1024 * 1024, format, equalityFieldIds, false);
}
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index fd2a71a..90f662c 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
@@ -144,6 +145,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
private void testChangeLogs(List<String> equalityFieldColumns,
KeySelector<Row, Object> keySelector,
+ boolean insertAsUpsert,
List<List<Row>> elementsPerCheckpoint,
List<List<Record>> expectedRecordsPerCheckpoint) throws Exception {
DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
@@ -157,6 +159,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
.tableSchema(SimpleDataUtil.FLINK_SCHEMA)
.writeParallelism(parallelism)
.equalityFieldColumns(equalityFieldColumns)
+ .upsert(insertAsUpsert)
.build();
// Execute the program.
@@ -219,7 +222,8 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
ImmutableList.of(record(1, "ddd"), record(2, "ddd"))
);
- testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), elementsPerCheckpoint, expectedRecords);
+ testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), false,
+ elementsPerCheckpoint, expectedRecords);
}
@Test
@@ -250,7 +254,8 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
ImmutableList.of(record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc"))
);
- testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), elementsPerCheckpoint, expectedRecords);
+ testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), false,
+ elementsPerCheckpoint, expectedRecords);
}
@Test
@@ -281,7 +286,7 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
);
testChangeLogs(ImmutableList.of("data", "id"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
- elementsPerCheckpoint, expectedRecords);
+ false, elementsPerCheckpoint, expectedRecords);
}
@Test
@@ -319,9 +324,124 @@ public class TestFlinkIcebergSinkV2 extends TableTestBase {
);
testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ false, elementsPerCheckpoint, expectedRecords);
+ }
+
+ @Test
+ public void testUpsertModeCheck() throws Exception {
+ DataStream<Row> dataStream = env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
+ FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .tableLoader(tableLoader)
+ .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+ .writeParallelism(parallelism)
+ .upsert(true);
+
+ AssertHelpers.assertThrows("Should be error because upsert mode and overwrite mode enable at the same time.",
+ IllegalStateException.class, "OVERWRITE mode shouldn't be enable",
+ () -> builder.equalityFieldColumns(ImmutableList.of("id")).overwrite(true).build()
+ );
+
+ AssertHelpers.assertThrows("Should be error because equality field columns are empty.",
+ IllegalStateException.class, "Equality field columns shouldn't be empty",
+ () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).build()
+ );
+ }
+
+ @Test
+ public void testUpsertOnIdKey() throws Exception {
+ List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+ ImmutableList.of(
+ row("+I", 1, "aaa"),
+ row("+U", 1, "bbb")
+ ),
+ ImmutableList.of(
+ row("+I", 1, "ccc")
+ ),
+ ImmutableList.of(
+ row("+U", 1, "ddd"),
+ row("+I", 1, "eee")
+ )
+ );
+
+ List<List<Record>> expectedRecords = ImmutableList.of(
+ ImmutableList.of(record(1, "bbb")),
+ ImmutableList.of(record(1, "ccc")),
+ ImmutableList.of(record(1, "eee"))
+ );
+
+ if (!partitioned) {
+ testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true,
+ elementsPerCheckpoint, expectedRecords);
+ } else {
+ AssertHelpers.assertThrows("Should be error because equality field columns don't include all partition keys",
+ IllegalStateException.class, "should be included in equality fields",
+ () -> {
+ testChangeLogs(ImmutableList.of("id"), row -> row.getField(ROW_ID_POS), true, elementsPerCheckpoint,
+ expectedRecords);
+ return null;
+ });
+ }
+ }
+
+ @Test
+ public void testUpsertOnDataKey() throws Exception {
+ List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+ ImmutableList.of(
+ row("+I", 1, "aaa"),
+ row("+I", 2, "aaa"),
+ row("+I", 3, "bbb")
+ ),
+ ImmutableList.of(
+ row("+U", 4, "aaa"),
+ row("-U", 3, "bbb"),
+ row("+U", 5, "bbb")
+ ),
+ ImmutableList.of(
+ row("+I", 6, "aaa"),
+ row("+U", 7, "bbb")
+ )
+ );
+
+ List<List<Record>> expectedRecords = ImmutableList.of(
+ ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
+ ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
+ ImmutableList.of(record(6, "aaa"), record(7, "bbb"))
+ );
+
+ testChangeLogs(ImmutableList.of("data"), row -> row.getField(ROW_DATA_POS), true,
elementsPerCheckpoint, expectedRecords);
}
+ @Test
+ public void testUpsertOnIdDataKey() throws Exception {
+ List<List<Row>> elementsPerCheckpoint = ImmutableList.of(
+ ImmutableList.of(
+ row("+I", 1, "aaa"),
+ row("+U", 1, "aaa"),
+ row("+I", 2, "bbb")
+ ),
+ ImmutableList.of(
+ row("+I", 1, "aaa"),
+ row("-D", 2, "bbb"),
+ row("+I", 2, "ccc")
+ ),
+ ImmutableList.of(
+ row("+U", 1, "bbb"),
+ row("-U", 1, "ccc"),
+ row("-D", 1, "aaa")
+ )
+ );
+
+ List<List<Record>> expectedRecords = ImmutableList.of(
+ ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
+ ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
+ ImmutableList.of(record(1, "bbb"), record(2, "ccc"))
+ );
+
+ testChangeLogs(ImmutableList.of("id", "data"), row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+ true, elementsPerCheckpoint, expectedRecords);
+ }
+
private StructLikeSet expectedRowSet(Record... records) {
return SimpleDataUtil.expectedRowSet(table, records);
}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index e5ffd01..7419775 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -340,7 +340,7 @@ public class TestIcebergStreamWriter {
private OneInputStreamOperatorTestHarness<RowData, WriteResult> createIcebergStreamWriter(
Table icebergTable, TableSchema flinkSchema) throws Exception {
RowType flinkRowType = FlinkSink.toFlinkRowType(icebergTable.schema(), flinkSchema);
- IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkRowType, null);
+ IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(icebergTable, flinkRowType, null, false);
OneInputStreamOperatorTestHarness<RowData, WriteResult> harness = new OneInputStreamOperatorTestHarness<>(
streamWriter, 1, 1, 0);
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index 9eee57f..562a75e 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -239,7 +239,7 @@ public class TestTaskWriters {
private TaskWriter<RowData> createTaskWriter(long targetFileSize) {
TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
SerializableTable.copyOf(table), (RowType) SimpleDataUtil.FLINK_SCHEMA.toRowDataType().getLogicalType(),
- targetFileSize, format, null);
+ targetFileSize, format, null, false);
taskWriterFactory.initialize(1, 1);
return taskWriterFactory.create();
}