Posted to commits@iceberg.apache.org by st...@apache.org on 2023/05/23 03:33:27 UTC

[iceberg] branch master updated: Flink: Port #7171 to 1.15 (#7679)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 53683f87fb Flink: Port #7171 to 1.15 (#7679)
53683f87fb is described below

commit 53683f87fb0f1dab12358f826e6f8a7b1ce13b25
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Tue May 23 11:33:20 2023 +0800

    Flink: Port #7171 to 1.15 (#7679)
    
    Co-authored-by: xianyangliu <xi...@tencent.com>
---
 .../iceberg/flink/sink/FlinkManifestUtil.java      |  20 ++-
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |   3 +-
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  15 ++-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  29 +++-
 .../iceberg/flink/sink/TestFlinkManifest.java      |   9 +-
 .../flink/sink/TestIcebergFilesCommitter.java      | 148 ++++++++++++++++++++-
 6 files changed, 200 insertions(+), 24 deletions(-)

diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 25badc372a..00d55f937c 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.sink;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -54,8 +55,10 @@ class FlinkManifestUtil {
     return writer.toManifestFile();
   }
 
-  static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException {
-    try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io)) {
+  static List<DataFile> readDataFiles(
+      ManifestFile manifestFile, FileIO io, Map<Integer, PartitionSpec> specsById)
+      throws IOException {
+    try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io, specsById)) {
       return Lists.newArrayList(dataFiles);
     }
   }
@@ -73,6 +76,12 @@ class FlinkManifestUtil {
         attemptNumber);
   }
 
+  /**
+   * Write the {@link WriteResult} to temporary manifest files.
+   *
+   * @param result all DataFiles/DeleteFiles in this WriteResult must have been written with the
+   *     same partition spec
+   */
   static DeltaManifests writeCompletedFiles(
       WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
       throws IOException {
@@ -104,19 +113,20 @@ class FlinkManifestUtil {
     return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
   }
 
-  static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io)
+  static WriteResult readCompletedFiles(
+      DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
       throws IOException {
     WriteResult.Builder builder = WriteResult.builder();
 
     // Read the completed data files from persisted data manifest file.
     if (deltaManifests.dataManifest() != null) {
-      builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io));
+      builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io, specsById));
     }
 
     // Read the completed delete files from persisted delete manifests file.
     if (deltaManifests.deleteManifest() != null) {
       try (CloseableIterable<DeleteFile> deleteFiles =
-          ManifestFiles.readDeleteManifest(deltaManifests.deleteManifest(), io, null)) {
+          ManifestFiles.readDeleteManifest(deltaManifests.deleteManifest(), io, specsById)) {
         builder.addDeleteFiles(deleteFiles);
       }
     }
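
A minimal sketch of the updated call sites (not part of the patch), assuming an
org.apache.iceberg.Table named table and a staged DeltaManifests named deltaManifests
are in scope. Passing table.specs(), the map of all partition specs keyed by spec id,
lets a manifest written under an older spec still be decoded after the spec has evolved:

    // previously: FlinkManifestUtil.readDataFiles(deltaManifests.dataManifest(), table.io())
    List<DataFile> dataFiles =
        FlinkManifestUtil.readDataFiles(deltaManifests.dataManifest(), table.io(), table.specs());
    // each recovered DataFile still carries the id of the spec it was written with
    int specId = dataFiles.isEmpty() ? -1 : dataFiles.get(0).specId();
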
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 46a229d4c8..844efd6ca6 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -428,7 +428,8 @@ public class FlinkSink {
               flinkWriteConf.overwriteMode(),
               snapshotProperties,
               flinkWriteConf.workerPoolSize(),
-              flinkWriteConf.branch());
+              flinkWriteConf.branch(),
+              table.spec());
       SingleOutputStreamOperator<Void> committerStream =
           writerStream
               .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index e11975b3ef..3805ab2984 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
@@ -48,6 +49,7 @@ import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -120,6 +122,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
 
   private final Integer workerPoolSize;
+  private final PartitionSpec spec;
   private transient ExecutorService workerPool;
 
   IcebergFilesCommitter(
@@ -127,12 +130,14 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       boolean replacePartitions,
       Map<String, String> snapshotProperties,
       Integer workerPoolSize,
-      String branch) {
+      String branch,
+      PartitionSpec spec) {
     this.tableLoader = tableLoader;
     this.replacePartitions = replacePartitions;
     this.snapshotProperties = snapshotProperties;
     this.workerPoolSize = workerPoolSize;
     this.branch = branch;
+    this.spec = spec;
   }
 
   @Override
@@ -263,7 +268,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
           SimpleVersionedSerialization.readVersionAndDeSerialize(
               DeltaManifestsSerializer.INSTANCE, e.getValue());
       pendingResults.put(
-          e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+          e.getKey(),
+          FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
       manifests.addAll(deltaManifests.manifests());
     }
 
@@ -443,7 +449,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
-            result, () -> manifestOutputFileFactory.create(checkpointId), table.spec());
+            result, () -> manifestOutputFileFactory.create(checkpointId), spec);
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
@@ -469,7 +475,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     }
   }
 
-  private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
+  @VisibleForTesting
+  static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
     Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
     // Construct a SortedMapTypeInfo.
     SortedMapTypeInfo<Long, byte[]> sortedMapTypeInfo =
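
A sketch of the committer's new write/read pairing, with names taken from the patch and
the surrounding operator plumbing omitted. As exercised by the new testSpecEvolution
test below, staging manifests are written with the spec captured when the sink was built
(the spec the pending files were actually written with), while reading them back resolves
specs through table.specs(), so a partition spec evolution in between does not break the
commit path:

    // writing: use the pinned spec rather than table.spec(), which may already have evolved
    DeltaManifests deltaManifests =
        FlinkManifestUtil.writeCompletedFiles(
            result, () -> manifestOutputFileFactory.create(checkpointId), spec);

    // reading back: look up whichever spec each staged manifest was written with
    WriteResult pending =
        FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
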
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 345d88a48a..408c281004 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
@@ -134,6 +135,20 @@ public class SimpleDataUtil {
       String filename,
       List<RowData> rows)
       throws IOException {
+    return writeFile(table, schema, spec, conf, location, filename, rows, null);
+  }
+
+  /** Write the list of {@link RowData} to the given path with the given partition data. */
+  public static DataFile writeFile(
+      Table table,
+      Schema schema,
+      PartitionSpec spec,
+      Configuration conf,
+      String location,
+      String filename,
+      List<RowData> rows,
+      StructLike partition)
+      throws IOException {
     Path path = new Path(location, filename);
     FileFormat fileFormat = FileFormat.fromFileName(filename);
     Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
@@ -148,10 +163,16 @@ public class SimpleDataUtil {
       closeableAppender.addAll(rows);
     }
 
-    return DataFiles.builder(spec)
-        .withInputFile(HadoopInputFile.fromPath(path, conf))
-        .withMetrics(appender.metrics())
-        .build();
+    DataFiles.Builder builder =
+        DataFiles.builder(spec)
+            .withInputFile(HadoopInputFile.fromPath(path, conf))
+            .withMetrics(appender.metrics());
+
+    if (partition != null) {
+      builder = builder.withPartition(partition);
+    }
+
+    return builder.build();
   }
 
   public static DeleteFile writeEqDeleteFile(
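
A hypothetical use of the new writeFile overload (the existing seven-argument variant
simply delegates with a null partition). Assuming a table partitioned by a single
integer field, a Hadoop Configuration named conf, and a List<RowData> named rows, the
caller builds the partition tuple explicitly so the resulting DataFile carries partition
data for that spec:

    PartitionData partition = new PartitionData(table.spec().partitionType());
    partition.set(0, 42);  // value of the single partition field
    DataFile file =
        SimpleDataUtil.writeFile(
            table, table.schema(), table.spec(), conf, table.location(),
            "data-00042.parquet", rows, partition);
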
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 3680142010..f171485a90 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -110,7 +110,8 @@ public class TestFlinkManifest {
               () -> factory.create(curCkpId),
               table.spec());
 
-      WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io());
+      WriteResult result =
+          FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
       Assert.assertEquals("Size of data file list are not equal.", 10, result.deleteFiles().length);
       for (int i = 0; i < dataFiles.size(); i++) {
         TestHelpers.assertEquals(dataFiles.get(i), result.dataFiles()[i]);
@@ -157,7 +158,8 @@ public class TestFlinkManifest {
         userProvidedFolder.toPath(),
         Paths.get(deltaManifests.dataManifest().path()).getParent());
 
-    WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io());
+    WriteResult result =
+        FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
 
     Assert.assertEquals(0, result.deleteFiles().length);
     Assert.assertEquals(5, result.dataFiles().length);
@@ -228,7 +230,8 @@ public class TestFlinkManifest {
         "Serialization v1 should not have null data manifest.", delta.dataManifest());
     TestHelpers.assertEquals(manifest, delta.dataManifest());
 
-    List<DataFile> actualFiles = FlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io());
+    List<DataFile> actualFiles =
+        FlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io(), table.specs());
     Assert.assertEquals(10, actualFiles.size());
     for (int i = 0; i < 10; i++) {
       TestHelpers.assertEquals(dataFiles.get(i), actualFiles.get(i));
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index a4f29d47f4..a2fe092b7c 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.sink;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.flink.sink.IcebergFilesCommitter.MAX_CONTINUOUS_EMPTY_COMMITS;
 import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -28,9 +29,14 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
+import java.util.NavigableMap;
+import java.util.SortedMap;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -50,7 +56,9 @@ import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.GenericManifestFile;
 import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TableTestBase;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -60,6 +68,7 @@ import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.ThreadPools;
 import org.junit.Assert;
@@ -744,7 +753,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 2. Read the data files from manifests and assert.
       List<DataFile> dataFiles =
-          FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
+          FlinkManifestUtil.readDataFiles(
+              createTestingManifestFile(manifestPath), table.io(), table.specs());
       Assert.assertEquals(1, dataFiles.size());
       TestHelpers.assertEquals(dataFile1, dataFiles.get(0));
 
@@ -790,7 +800,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 2. Read the data files from manifests and assert.
       List<DataFile> dataFiles =
-          FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
+          FlinkManifestUtil.readDataFiles(
+              createTestingManifestFile(manifestPath), table.io(), table.specs());
       Assert.assertEquals(1, dataFiles.size());
       TestHelpers.assertEquals(dataFile1, dataFiles.get(0));
 
@@ -877,6 +888,112 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     }
   }
 
+  @Test
+  public void testSpecEvolution() throws Exception {
+    long timestamp = 0;
+    int checkpointId = 0;
+    List<RowData> rows = Lists.newArrayList();
+    JobID jobId = new JobID();
+
+    OperatorID operatorId;
+    OperatorSubtaskState snapshot;
+    DataFile dataFile;
+    int specId;
+
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.setup();
+      harness.open();
+      operatorId = harness.getOperator().getOperatorID();
+
+      assertSnapshotSize(0);
+
+      checkpointId++;
+      RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // table unpartitioned
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      assertThat(specId).isEqualTo(table.spec().specId());
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      // Change partition spec
+      table.refresh();
+      PartitionSpec oldSpec = table.spec();
+      table.updateSpec().addField("id").commit();
+
+      checkpointId++;
+      rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+      // write data with old partition spec
+      dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData), oldSpec, null);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(rowData);
+      snapshot = harness.snapshot(checkpointId, ++timestamp);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      assertThat(specId).isEqualTo(oldSpec.specId());
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+    }
+
+    // Restore from the given snapshot
+    try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+      harness.getStreamConfig().setOperatorID(operatorId);
+      harness.setup();
+      harness.initializeState(snapshot);
+      harness.open();
+
+      SimpleDataUtil.assertTableRows(table, rows, branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+
+      checkpointId++;
+      RowData row = SimpleDataUtil.createRowData(checkpointId, "world" + checkpointId);
+      StructLike partition = new PartitionData(table.spec().partitionType());
+      partition.set(0, checkpointId);
+      dataFile =
+          writeDataFile("data-" + checkpointId, ImmutableList.of(row), table.spec(), partition);
+      harness.processElement(of(dataFile), ++timestamp);
+      rows.add(row);
+      harness.snapshot(checkpointId, ++timestamp);
+      assertFlinkManifests(1);
+
+      specId =
+          getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+      assertThat(specId).isEqualTo(table.spec().specId());
+
+      harness.notifyOfCompletedCheckpoint(checkpointId);
+      assertFlinkManifests(0);
+
+      SimpleDataUtil.assertTableRows(table, rows, branch);
+      assertSnapshotSize(checkpointId);
+      assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+    }
+  }
+
+  private int getStagingManifestSpecId(OperatorStateStore operatorStateStore, long checkPointId)
+      throws Exception {
+    ListState<SortedMap<Long, byte[]>> checkpointsState =
+        operatorStateStore.getListState(IcebergFilesCommitter.buildStateDescriptor());
+    NavigableMap<Long, byte[]> statedDataFiles =
+        Maps.newTreeMap(checkpointsState.get().iterator().next());
+    DeltaManifests deltaManifests =
+        SimpleVersionedSerialization.readVersionAndDeSerialize(
+            DeltaManifestsSerializer.INSTANCE, statedDataFiles.get(checkPointId));
+    return deltaManifests.dataManifest().partitionSpecId();
+  }
+
   private DeleteFile writeEqDeleteFile(
       FileAppenderFactory<RowData> appenderFactory, String filename, List<RowData> deletes)
       throws IOException {
@@ -949,6 +1066,20 @@ public class TestIcebergFilesCommitter extends TableTestBase {
         rows);
   }
 
+  private DataFile writeDataFile(
+      String filename, List<RowData> rows, PartitionSpec spec, StructLike partition)
+      throws IOException {
+    return SimpleDataUtil.writeFile(
+        table,
+        table.schema(),
+        spec,
+        CONF,
+        table.location(),
+        format.addExtension(filename),
+        rows,
+        partition);
+  }
+
   private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) {
     table.refresh();
     long actualId =
@@ -964,7 +1095,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
   private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
       throws Exception {
-    TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch);
+    TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch, table.spec());
     return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
   }
 
@@ -985,14 +1116,16 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       implements OneInputStreamOperatorFactory<WriteResult, Void> {
     private final String tablePath;
     private final String branch;
+    private final PartitionSpec spec;
 
-    private TestOperatorFactory(String tablePath, String branch) {
+    private TestOperatorFactory(String tablePath, String branch, PartitionSpec spec) {
       this.tablePath = tablePath;
       this.branch = branch;
+      this.spec = spec;
     }
 
-    private static TestOperatorFactory of(String tablePath, String branch) {
-      return new TestOperatorFactory(tablePath, branch);
+    private static TestOperatorFactory of(String tablePath, String branch, PartitionSpec spec) {
+      return new TestOperatorFactory(tablePath, branch, spec);
     }
 
     @Override
@@ -1005,7 +1138,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
               false,
               Collections.singletonMap("flink.test", TestIcebergFilesCommitter.class.getName()),
               ThreadPools.WORKER_THREAD_POOL_SIZE,
-              branch);
+              branch,
+              spec);
       committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
       return (T) committer;
     }
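
A condensed view of the scenario the new testSpecEvolution test covers, assuming an
initially unpartitioned Table named table and a DeltaManifests named deltaManifests
staged for files written before the evolution (the test recovers it from operator state
via the now @VisibleForTesting IcebergFilesCommitter.buildStateDescriptor()):

    PartitionSpec oldSpec = table.spec();
    table.updateSpec().addField("id").commit();  // current spec id advances
    // a data file written under oldSpec is staged in a manifest that records oldSpec's id
    int stagedSpecId = deltaManifests.dataManifest().partitionSpecId();
    // expected: stagedSpecId == oldSpec.specId(), not table.spec().specId()
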