You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/02/28 23:43:51 UTC

[iceberg] branch master updated: Flink: Backport PR #6660 to Flink 1.14 and 1.15 (#6949)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d1679dfac5 Flink: Backport PR #6660 to Flink 1.14 and 1.15 (#6949)
d1679dfac5 is described below

commit d1679dfac55d9cc12d8f869a85109054a41ebd31
Author: Amogh Jahagirdar <ja...@amazon.com>
AuthorDate: Tue Feb 28 15:43:44 2023 -0800

    Flink: Backport PR #6660 to Flink 1.14 and 1.15 (#6949)
---
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |   8 +
 .../apache/iceberg/flink/FlinkWriteOptions.java    |   5 +
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |  11 +-
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  22 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  46 +-
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   |  35 +-
 .../flink/sink/TestFlinkIcebergSinkBase.java       |  64 +++
 .../flink/sink/TestFlinkIcebergSinkBranch.java     | 137 ++++++
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 310 +------------
 ...SinkV2.java => TestFlinkIcebergSinkV2Base.java} | 511 +++++++--------------
 .../flink/sink/TestFlinkIcebergSinkV2Branch.java   | 134 ++++++
 .../flink/sink/TestIcebergFilesCommitter.java      |  87 ++--
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |   8 +
 .../apache/iceberg/flink/FlinkWriteOptions.java    |   5 +
 .../org/apache/iceberg/flink/sink/FlinkSink.java   |   8 +-
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  22 +-
 .../org/apache/iceberg/flink/SimpleDataUtil.java   |  46 +-
 .../iceberg/flink/sink/TestFlinkIcebergSink.java   |  35 +-
 .../flink/sink/TestFlinkIcebergSinkBase.java       |  64 +++
 .../flink/sink/TestFlinkIcebergSinkBranch.java     | 137 ++++++
 .../iceberg/flink/sink/TestFlinkIcebergSinkV2.java | 310 +------------
 .../flink/sink/TestFlinkIcebergSinkV2Base.java}    | 511 +++++++--------------
 .../flink/sink/TestFlinkIcebergSinkV2Branch.java   | 134 ++++++
 .../flink/sink/TestIcebergFilesCommitter.java      |  87 ++--
 24 files changed, 1284 insertions(+), 1453 deletions(-)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 448b2aa2d8..4b5c7e4a0d 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -172,4 +172,12 @@ public class FlinkWriteConf {
         .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
         .parse();
   }
+
+  public String branch() {
+    return confParser
+        .stringConf()
+        .option(FlinkWriteOptions.BRANCH.key())
+        .defaultValue(FlinkWriteOptions.BRANCH.defaultValue())
+        .parse();
+  }
 }
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index f3cc52972b..86cb2fb0eb 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.SnapshotRef;
 
 /** Flink sink write options */
 public class FlinkWriteOptions {
@@ -56,4 +57,8 @@ public class FlinkWriteOptions {
   // Overrides the table's write.distribution-mode
   public static final ConfigOption<String> DISTRIBUTION_MODE =
       ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
+
+  // Branch to write to
+  public static final ConfigOption<String> BRANCH =
+      ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
 }
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index ead0b757e5..445b6a6ff9 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -132,10 +132,7 @@ public class FlinkSink {
     private TableLoader tableLoader;
     private Table table;
     private TableSchema tableSchema;
-    private boolean overwrite = false;
-    private DistributionMode distributionMode = null;
     private Integer writeParallelism = null;
-    private boolean upsert = false;
     private List<String> equalityFieldColumns = null;
     private String uidPrefix = null;
     private final Map<String, String> snapshotProperties = Maps.newHashMap();
@@ -319,6 +316,11 @@ public class FlinkSink {
       return this;
     }
 
+    public Builder toBranch(String branch) {
+      writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
+      return this;
+    }
+
     private <T> DataStreamSink<T> chainIcebergOperators() {
       Preconditions.checkArgument(
           inputCreator != null,
@@ -425,7 +427,8 @@ public class FlinkSink {
               tableLoader,
               flinkWriteConf.overwriteMode(),
               snapshotProperties,
-              flinkWriteConf.workerPoolSize());
+              flinkWriteConf.workerPoolSize(),
+              flinkWriteConf.branch());
       SingleOutputStreamOperator<Void> committerStream =
           writerStream
               .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index d8a7bc5cf2..22b4dc9d21 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -95,6 +95,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // The completed files cache for current checkpoint. Once the snapshot barrier received, it will
   // be flushed to the 'dataFilesPerCheckpoint'.
   private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+  private final String branch;
 
   // It will have an unique identifier for one job.
   private transient String flinkJobId;
@@ -125,11 +126,13 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       TableLoader tableLoader,
       boolean replacePartitions,
       Map<String, String> snapshotProperties,
-      Integer workerPoolSize) {
+      Integer workerPoolSize,
+      String branch) {
     this.tableLoader = tableLoader;
     this.replacePartitions = replacePartitions;
     this.snapshotProperties = snapshotProperties;
     this.workerPoolSize = workerPoolSize;
+    this.branch = branch;
   }
 
   @Override
@@ -179,7 +182,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       // it's safe to assign the max committed checkpoint id from restored flink job to the current
       // flink job.
       this.maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId);
+          getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId, branch);
 
       NavigableMap<Long, byte[]> uncommittedDataFiles =
           Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -230,7 +233,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
-      LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
       commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     } else {
@@ -286,8 +288,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
       }
       continuousEmptyCheckpoints = 0;
-    } else {
-      LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
     }
   }
 
@@ -386,10 +386,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       String operatorId,
       long checkpointId) {
     LOG.info(
-        "Committing {} for checkpoint {} to table {} with summary: {}",
+        "Committing {} for checkpoint {} to table {} branch {} with summary: {}",
         description,
         checkpointId,
         table.name(),
+        branch,
         summary);
     snapshotProperties.forEach(operation::set);
     // custom snapshot metadata properties will be overridden if they conflict with internal ones
@@ -397,14 +398,16 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
     operation.set(FLINK_JOB_ID, newFlinkJobId);
     operation.set(OPERATOR_ID, operatorId);
+    operation.toBranch(branch);
 
     long startNano = System.nanoTime();
     operation.commit(); // abort is automatically called if this fails.
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
     LOG.info(
-        "Committed {} to table: {}, checkpointId {} in {} ms",
+        "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
         description,
         table.name(),
+        branch,
         checkpointId,
         durationMs);
     committerMetrics.commitDuration(durationMs);
@@ -474,8 +477,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
   }
 
-  static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
-    Snapshot snapshot = table.currentSnapshot();
+  static long getMaxCommittedCheckpointId(
+      Table table, String flinkJobId, String operatorId, String branch) {
+    Snapshot snapshot = table.snapshot(branch);
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     while (snapshot != null) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index e296763508..345d88a48a 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
@@ -206,12 +207,18 @@ public class SimpleDataUtil {
     return records;
   }
 
-  public static void assertTableRows(String tablePath, List<RowData> expected) throws IOException {
-    assertTableRecords(tablePath, convertToRecords(expected));
+  public static void assertTableRows(String tablePath, List<RowData> expected, String branch)
+      throws IOException {
+    assertTableRecords(tablePath, convertToRecords(expected), branch);
   }
 
   public static void assertTableRows(Table table, List<RowData> expected) throws IOException {
-    assertTableRecords(table, convertToRecords(expected));
+    assertTableRecords(table, convertToRecords(expected), SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRows(Table table, List<RowData> expected, String branch)
+      throws IOException {
+    assertTableRecords(table, convertToRecords(expected), branch);
   }
 
   /** Get all rows for a table */
@@ -267,13 +274,25 @@ public class SimpleDataUtil {
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
+    assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRecords(Table table, List<Record> expected, String branch)
+      throws IOException {
     table.refresh();
+    Snapshot snapshot = latestSnapshot(table, branch);
+
+    if (snapshot == null) {
+      Assert.assertEquals(expected, ImmutableList.of());
+      return;
+    }
 
     Types.StructType type = table.schema().asStruct();
     StructLikeSet expectedSet = StructLikeSet.create(type);
     expectedSet.addAll(expected);
 
-    try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
       StructLikeSet actualSet = StructLikeSet.create(type);
 
       for (Record record : iterable) {
@@ -284,10 +303,27 @@ public class SimpleDataUtil {
     }
   }
 
+  // Returns the latest snapshot of the given branch in the table
+  public static Snapshot latestSnapshot(Table table, String branch) {
+    // For the main branch, currentSnapshot() is used to validate that the API behavior has
+    // not changed since that was the API used for validation prior to addition of branches.
+    if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      return table.currentSnapshot();
+    }
+
+    return table.snapshot(branch);
+  }
+
   public static void assertTableRecords(String tablePath, List<Record> expected)
       throws IOException {
     Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
-    assertTableRecords(new HadoopTables().load(tablePath), expected);
+    assertTableRecords(new HadoopTables().load(tablePath), expected, SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRecords(String tablePath, List<Record> expected, String branch)
+      throws IOException {
+    Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
+    assertTableRecords(new HadoopTables().load(tablePath), expected, branch);
   }
 
   public static StructLikeSet expectedRowSet(Table table, Record... records) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index c2af30d342..23beb19a72 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -22,14 +22,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.AssertHelpers;
@@ -45,7 +41,6 @@ import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
-import org.apache.iceberg.flink.source.BoundedTestSource;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -60,7 +55,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class TestFlinkIcebergSink {
+public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
 
   @ClassRule
   public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,13 +67,6 @@ public class TestFlinkIcebergSink {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
 
-  private static final TypeInformation<Row> ROW_TYPE_INFO =
-      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
-  private static final DataFormatConverters.RowConverter CONVERTER =
-      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
-
-  private Table table;
-  private StreamExecutionEnvironment env;
   private TableLoader tableLoader;
 
   private final FileFormat format;
@@ -132,14 +120,6 @@ public class TestFlinkIcebergSink {
     tableLoader = catalogResource.tableLoader();
   }
 
-  private List<RowData> convertToRowData(List<Row> rows) {
-    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
-  }
-
-  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
-    return new BoundedTestSource<>(rows.toArray(new Row[0]));
-  }
-
   @Test
   public void testWriteRowData() throws Exception {
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
@@ -160,19 +140,6 @@ public class TestFlinkIcebergSink {
     SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
   }
 
-  private List<Row> createRows(String prefix) {
-    return Lists.newArrayList(
-        Row.of(1, prefix + "aaa"),
-        Row.of(1, prefix + "bbb"),
-        Row.of(1, prefix + "ccc"),
-        Row.of(2, prefix + "aaa"),
-        Row.of(2, prefix + "bbb"),
-        Row.of(2, prefix + "ccc"),
-        Row.of(3, prefix + "aaa"),
-        Row.of(3, prefix + "bbb"),
-        Row.of(3, prefix + "ccc"));
-  }
-
   private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
       throws Exception {
     List<Row> rows = createRows("");
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
new file mode 100644
index 0000000000..b38aa6b50c
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class TestFlinkIcebergSinkBase {
+
+  protected Table table;
+  protected StreamExecutionEnvironment env;
+  protected static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  protected static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  protected List<Row> createRows(String prefix) {
+    return Lists.newArrayList(
+        Row.of(1, prefix + "aaa"),
+        Row.of(1, prefix + "bbb"),
+        Row.of(1, prefix + "ccc"),
+        Row.of(2, prefix + "aaa"),
+        Row.of(2, prefix + "bbb"),
+        Row.of(2, prefix + "ccc"),
+        Row.of(3, prefix + "aaa"),
+        Row.of(3, prefix + "bbb"),
+        Row.of(3, prefix + "ccc"));
+  }
+
+  protected List<RowData> convertToRowData(List<Row> rows) {
+    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+  }
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
new file mode 100644
index 0000000000..16b4542b00
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private final String branch;
+  private TableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "branch = {0}")
+  public static Object[] parameters() {
+    return new Object[] {"main", "testBranch"}; // single parameter: the branch written to
+  }
+
+  public TestFlinkIcebergSinkBranch(String branch) {
+    this.branch = branch;
+  }
+
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    "1"));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testWriteRowWithTableSchema() throws Exception {
+    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+    verifyOtherBranchUnmodified();
+  }
+
+  private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
+      throws Exception {
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .tableSchema(tableSchema)
+        .toBranch(branch)
+        .distributionMode(distributionMode)
+        .append();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream.");
+
+    SimpleDataUtil.assertTableRows(table, convertToRowData(rows), branch);
+    SimpleDataUtil.assertTableRows(
+        table,
+        ImmutableList.of(),
+        branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  private void verifyOtherBranchUnmodified() {
+    String otherBranch =
+        branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH;
+    if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
+      Assert.assertNull(table.currentSnapshot()); // main checked via currentSnapshot() too
+    }
+
+    Assert.assertNull(table.snapshot(otherBranch)); // branch not written to has no snapshot
+  }
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index ccc3c0f23d..af3cc19635 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,38 +18,25 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.data.IcebergGenerics;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.HadoopCatalogResource;
 import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
-import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.BoundedTestSource;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -60,7 +47,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
 
   @ClassRule
   public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,29 +59,6 @@ public class TestFlinkIcebergSinkV2 {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
 
-  private static final int FORMAT_V2 = 2;
-  private static final TypeInformation<Row> ROW_TYPE_INFO =
-      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
-
-  private static final Map<String, RowKind> ROW_KIND_MAP =
-      ImmutableMap.of(
-          "+I", RowKind.INSERT,
-          "-D", RowKind.DELETE,
-          "-U", RowKind.UPDATE_BEFORE,
-          "+U", RowKind.UPDATE_AFTER);
-
-  private static final int ROW_ID_POS = 0;
-  private static final int ROW_DATA_POS = 1;
-
-  private final FileFormat format;
-  private final int parallelism;
-  private final boolean partitioned;
-  private final String writeDistributionMode;
-
-  private Table table;
-  private StreamExecutionEnvironment env;
-  private TableLoader tableLoader;
-
   @Parameterized.Parameters(
       name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
   public static Object[][] parameters() {
@@ -155,67 +119,6 @@ public class TestFlinkIcebergSinkV2 {
     tableLoader = catalogResource.tableLoader();
   }
 
-  private List<Snapshot> findValidSnapshots() {
-    List<Snapshot> validSnapshots = Lists.newArrayList();
-    for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests(table.io()).stream()
-          .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
-        validSnapshots.add(snapshot);
-      }
-    }
-    return validSnapshots;
-  }
-
-  private void testChangeLogs(
-      List<String> equalityFieldColumns,
-      KeySelector<Row, Object> keySelector,
-      boolean insertAsUpsert,
-      List<List<Row>> elementsPerCheckpoint,
-      List<List<Record>> expectedRecordsPerCheckpoint)
-      throws Exception {
-    DataStream<Row> dataStream =
-        env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
-
-    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-        .tableLoader(tableLoader)
-        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
-        .writeParallelism(parallelism)
-        .equalityFieldColumns(equalityFieldColumns)
-        .upsert(insertAsUpsert)
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg Change-Log DataStream.");
-
-    table.refresh();
-    List<Snapshot> snapshots = findValidSnapshots();
-    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
-    Assert.assertEquals(
-        "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
-
-    for (int i = 0; i < expectedSnapshotNum; i++) {
-      long snapshotId = snapshots.get(i).snapshotId();
-      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
-      Assert.assertEquals(
-          "Should have the expected records for the checkpoint#" + i,
-          expectedRowSet(expectedRecords.toArray(new Record[0])),
-          actualRowSet(snapshotId, "*"));
-    }
-  }
-
-  private Row row(String rowKind, int id, String data) {
-    RowKind kind = ROW_KIND_MAP.get(rowKind);
-    if (kind == null) {
-      throw new IllegalArgumentException("Unknown row kind: " + rowKind);
-    }
-
-    return Row.ofKind(kind, id, data);
-  }
-
-  private Record record(int id, String data) {
-    return SimpleDataUtil.createRecord(id, data);
-  }
-
   @Test
   public void testCheckAndGetEqualityFieldIds() {
     table
@@ -249,79 +152,12 @@ public class TestFlinkIcebergSinkV2 {
 
   @Test
   public void testChangeLogOnIdKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(
-                row("+I", 1, "aaa"),
-                row("-D", 1, "aaa"),
-                row("+I", 1, "bbb"),
-                row("+I", 2, "aaa"),
-                row("-D", 2, "aaa"),
-                row("+I", 2, "bbb")),
-            ImmutableList.of(
-                row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
-            ImmutableList.of(
-                row("-D", 1, "bbb"),
-                row("+I", 1, "ccc"),
-                row("-D", 1, "ccc"),
-                row("+I", 1, "ddd")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
-            ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
-            ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
-
-    if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
-      AssertHelpers.assertThrows(
-          "Should be error because equality field columns don't include all partition keys",
-          IllegalStateException.class,
-          "should be included in equality fields",
-          () -> {
-            testChangeLogs(
-                ImmutableList.of("id"),
-                row -> row.getField(ROW_ID_POS),
-                false,
-                elementsPerCheckpoint,
-                expectedRecords);
-            return null;
-          });
-    } else {
-      testChangeLogs(
-          ImmutableList.of("id"),
-          row -> row.getField(ROW_ID_POS),
-          false,
-          elementsPerCheckpoint,
-          expectedRecords);
-    }
+    testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
   public void testChangeLogOnDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(
-                row("+I", 1, "aaa"),
-                row("-D", 1, "aaa"),
-                row("+I", 2, "bbb"),
-                row("+I", 1, "bbb"),
-                row("+I", 2, "aaa")),
-            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
-            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
-            ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
-            ImmutableList.of(
-                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")));
-
-    testChangeLogs(
-        ImmutableList.of("data"),
-        row -> row.getField(ROW_DATA_POS),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
+    testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
@@ -344,59 +180,12 @@ public class TestFlinkIcebergSinkV2 {
 
   @Test
   public void testChangeLogOnIdDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(
-                row("+I", 1, "aaa"),
-                row("-D", 1, "aaa"),
-                row("+I", 2, "bbb"),
-                row("+I", 1, "bbb"),
-                row("+I", 2, "aaa")),
-            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
-            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
-            ImmutableList.of(
-                record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
-            ImmutableList.of(
-                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
-
-    testChangeLogs(
-        ImmutableList.of("data", "id"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
+    testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
   public void testChangeLogOnSameKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            // Checkpoint #1
-            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
-            // Checkpoint #2
-            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
-            // Checkpoint #3
-            ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
-            // Checkpoint #4
-            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
-
-    testChangeLogs(
-        ImmutableList.of("id", "data"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
+    testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
@@ -426,97 +215,16 @@ public class TestFlinkIcebergSinkV2 {
 
   @Test
   public void testUpsertOnIdKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
-            ImmutableList.of(row("+I", 1, "ccc")),
-            ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb")),
-            ImmutableList.of(record(1, "ccc")),
-            ImmutableList.of(record(1, "eee")));
-
-    if (!partitioned) {
-      testChangeLogs(
-          ImmutableList.of("id"),
-          row -> row.getField(ROW_ID_POS),
-          true,
-          elementsPerCheckpoint,
-          expectedRecords);
-    } else {
-      AssertHelpers.assertThrows(
-          "Should be error because equality field columns don't include all partition keys",
-          IllegalStateException.class,
-          "should be included in equality fields",
-          () -> {
-            testChangeLogs(
-                ImmutableList.of("id"),
-                row -> row.getField(ROW_ID_POS),
-                true,
-                elementsPerCheckpoint,
-                expectedRecords);
-            return null;
-          });
-    }
+    testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
   public void testUpsertOnDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
-            ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
-            ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
-            ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
-            ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
-
-    testChangeLogs(
-        ImmutableList.of("data"),
-        row -> row.getField(ROW_DATA_POS),
-        true,
-        elementsPerCheckpoint,
-        expectedRecords);
+    testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
   public void testUpsertOnIdDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
-            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
-            ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
-            ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
-            ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
-
-    testChangeLogs(
-        ImmutableList.of("id", "data"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        true,
-        elementsPerCheckpoint,
-        expectedRecords);
-  }
-
-  private StructLikeSet expectedRowSet(Record... records) {
-    return SimpleDataUtil.expectedRowSet(table, records);
-  }
-
-  private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException {
-    table.refresh();
-    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    try (CloseableIterable<Record> reader =
-        IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) {
-      reader.forEach(set::add);
-    }
-    return set;
+    testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
 }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
similarity index 60%
copy from flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
copy to flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index ccc3c0f23d..15380408e4 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -26,184 +26,50 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.HadoopCatalogResource;
-import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.BoundedTestSource;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2Base {
 
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
-
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  protected static final int FORMAT_V2 = 2;
+  protected static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
 
-  @Rule
-  public final HadoopCatalogResource catalogResource =
-      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+  protected static final int ROW_ID_POS = 0;
+  protected static final int ROW_DATA_POS = 1;
 
-  private static final int FORMAT_V2 = 2;
-  private static final TypeInformation<Row> ROW_TYPE_INFO =
-      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  protected int parallelism = 1;
+  protected TableLoader tableLoader;
+  protected Table table;
+  protected StreamExecutionEnvironment env;
+  protected FileFormat format;
+  protected boolean partitioned;
+  protected String writeDistributionMode;
 
-  private static final Map<String, RowKind> ROW_KIND_MAP =
+  protected static final Map<String, RowKind> ROW_KIND_MAP =
       ImmutableMap.of(
           "+I", RowKind.INSERT,
           "-D", RowKind.DELETE,
           "-U", RowKind.UPDATE_BEFORE,
           "+U", RowKind.UPDATE_AFTER);
 
-  private static final int ROW_ID_POS = 0;
-  private static final int ROW_DATA_POS = 1;
-
-  private final FileFormat format;
-  private final int parallelism;
-  private final boolean partitioned;
-  private final String writeDistributionMode;
-
-  private Table table;
-  private StreamExecutionEnvironment env;
-  private TableLoader tableLoader;
-
-  @Parameterized.Parameters(
-      name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
-  public static Object[][] parameters() {
-    return new Object[][] {
-      new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"orc", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
-    };
-  }
-
-  public TestFlinkIcebergSinkV2(
-      String format, int parallelism, boolean partitioned, String writeDistributionMode) {
-    this.format = FileFormat.fromString(format);
-    this.parallelism = parallelism;
-    this.partitioned = partitioned;
-    this.writeDistributionMode = writeDistributionMode;
-  }
-
-  @Before
-  public void setupTable() {
-    table =
-        catalogResource
-            .catalog()
-            .createTable(
-                TestFixtures.TABLE_IDENTIFIER,
-                SimpleDataUtil.SCHEMA,
-                partitioned
-                    ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
-                    : PartitionSpec.unpartitioned(),
-                ImmutableMap.of(
-                    TableProperties.DEFAULT_FILE_FORMAT,
-                    format.name(),
-                    TableProperties.FORMAT_VERSION,
-                    String.valueOf(FORMAT_V2)));
-
-    table
-        .updateProperties()
-        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
-        .set(TableProperties.WRITE_DISTRIBUTION_MODE, writeDistributionMode)
-        .commit();
-
-    env =
-        StreamExecutionEnvironment.getExecutionEnvironment(
-                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
-            .enableCheckpointing(100L)
-            .setParallelism(parallelism)
-            .setMaxParallelism(parallelism);
-
-    tableLoader = catalogResource.tableLoader();
-  }
-
-  private List<Snapshot> findValidSnapshots() {
-    List<Snapshot> validSnapshots = Lists.newArrayList();
-    for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests(table.io()).stream()
-          .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
-        validSnapshots.add(snapshot);
-      }
-    }
-    return validSnapshots;
-  }
-
-  private void testChangeLogs(
-      List<String> equalityFieldColumns,
-      KeySelector<Row, Object> keySelector,
-      boolean insertAsUpsert,
-      List<List<Row>> elementsPerCheckpoint,
-      List<List<Record>> expectedRecordsPerCheckpoint)
-      throws Exception {
-    DataStream<Row> dataStream =
-        env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
-
-    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-        .tableLoader(tableLoader)
-        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
-        .writeParallelism(parallelism)
-        .equalityFieldColumns(equalityFieldColumns)
-        .upsert(insertAsUpsert)
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg Change-Log DataStream.");
-
-    table.refresh();
-    List<Snapshot> snapshots = findValidSnapshots();
-    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
-    Assert.assertEquals(
-        "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
-
-    for (int i = 0; i < expectedSnapshotNum; i++) {
-      long snapshotId = snapshots.get(i).snapshotId();
-      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
-      Assert.assertEquals(
-          "Should have the expected records for the checkpoint#" + i,
-          expectedRowSet(expectedRecords.toArray(new Record[0])),
-          actualRowSet(snapshotId, "*"));
-    }
-  }
-
-  private Row row(String rowKind, int id, String data) {
+  protected Row row(String rowKind, int id, String data) {
     RowKind kind = ROW_KIND_MAP.get(rowKind);
     if (kind == null) {
       throw new IllegalArgumentException("Unknown row kind: " + rowKind);
@@ -212,92 +78,85 @@ public class TestFlinkIcebergSinkV2 {
     return Row.ofKind(kind, id, data);
   }
 
-  private Record record(int id, String data) {
-    return SimpleDataUtil.createRecord(id, data);
-  }
-
-  @Test
-  public void testCheckAndGetEqualityFieldIds() {
-    table
-        .updateSchema()
-        .allowIncompatibleChanges()
-        .addRequiredColumn("type", Types.StringType.get())
-        .setIdentifierFields("type")
-        .commit();
-
-    DataStream<Row> dataStream =
-        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
-    FlinkSink.Builder builder =
-        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
-
-    // Use schema identifier field IDs as equality field id list by default
-    Assert.assertEquals(
-        table.schema().identifierFieldIds(),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
-
-    // Use user-provided equality field column as equality field id list
-    builder.equalityFieldColumns(Lists.newArrayList("id"));
-    Assert.assertEquals(
-        Sets.newHashSet(table.schema().findField("id").fieldId()),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+  protected void testUpsertOnIdDataKey(String branch) throws Exception {
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
+            ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
 
-    builder.equalityFieldColumns(Lists.newArrayList("type"));
-    Assert.assertEquals(
-        Sets.newHashSet(table.schema().findField("type").fieldId()),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
+            ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
+            ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+    testChangeLogs(
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        true,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
   }
 
-  @Test
-  public void testChangeLogOnIdKey() throws Exception {
+  protected void testChangeLogOnIdDataKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
             ImmutableList.of(
                 row("+I", 1, "aaa"),
                 row("-D", 1, "aaa"),
+                row("+I", 2, "bbb"),
                 row("+I", 1, "bbb"),
-                row("+I", 2, "aaa"),
-                row("-D", 2, "aaa"),
-                row("+I", 2, "bbb")),
+                row("+I", 2, "aaa")),
+            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
+            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
             ImmutableList.of(
-                row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
+                record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
             ImmutableList.of(
-                row("-D", 1, "bbb"),
-                row("+I", 1, "ccc"),
-                row("-D", 1, "ccc"),
-                row("+I", 1, "ddd")));
+                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
+
+    testChangeLogs(
+        ImmutableList.of("data", "id"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
+  }
+
+  protected void testChangeLogOnSameKey(String branch) throws Exception {
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            // Checkpoint #1
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
+            // Checkpoint #2
+            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
+            // Checkpoint #3
+            ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
+            // Checkpoint #4
+            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
 
     List<List<Record>> expectedRecords =
         ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
-            ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
-            ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
+            ImmutableList.of(record(1, "aaa")),
+            ImmutableList.of(record(1, "aaa")),
+            ImmutableList.of(record(1, "aaa")),
+            ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
 
-    if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
-      AssertHelpers.assertThrows(
-          "Should be error because equality field columns don't include all partition keys",
-          IllegalStateException.class,
-          "should be included in equality fields",
-          () -> {
-            testChangeLogs(
-                ImmutableList.of("id"),
-                row -> row.getField(ROW_ID_POS),
-                false,
-                elementsPerCheckpoint,
-                expectedRecords);
-            return null;
-          });
-    } else {
-      testChangeLogs(
-          ImmutableList.of("id"),
-          row -> row.getField(ROW_ID_POS),
-          false,
-          elementsPerCheckpoint,
-          expectedRecords);
-    }
+    testChangeLogs(
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
   }
 
-  @Test
-  public void testChangeLogOnDataKey() throws Exception {
+  protected void testChangeLogOnDataKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
             ImmutableList.of(
@@ -321,111 +180,83 @@ public class TestFlinkIcebergSinkV2 {
         row -> row.getField(ROW_DATA_POS),
         false,
         elementsPerCheckpoint,
-        expectedRecords);
+        expectedRecords,
+        branch);
   }
 
-  @Test
-  public void testUpsertOnlyDeletesOnDataKey() throws Exception {
+  protected void testUpsertOnDataKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa")),
-            ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb")));
+            ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
+            ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
+            ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
 
     List<List<Record>> expectedRecords =
-        ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of());
+        ImmutableList.of(
+            ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
+            ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
+            ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
 
     testChangeLogs(
         ImmutableList.of("data"),
         row -> row.getField(ROW_DATA_POS),
         true,
         elementsPerCheckpoint,
-        expectedRecords);
+        expectedRecords,
+        branch);
   }
 
-  @Test
-  public void testChangeLogOnIdDataKey() throws Exception {
+  protected void testChangeLogOnIdKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
             ImmutableList.of(
                 row("+I", 1, "aaa"),
                 row("-D", 1, "aaa"),
-                row("+I", 2, "bbb"),
                 row("+I", 1, "bbb"),
-                row("+I", 2, "aaa")),
-            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
-            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+                row("+I", 2, "aaa"),
+                row("-D", 2, "aaa"),
+                row("+I", 2, "bbb")),
             ImmutableList.of(
-                record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+                row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
             ImmutableList.of(
-                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
-
-    testChangeLogs(
-        ImmutableList.of("data", "id"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
-  }
-
-  @Test
-  public void testChangeLogOnSameKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            // Checkpoint #1
-            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
-            // Checkpoint #2
-            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
-            // Checkpoint #3
-            ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
-            // Checkpoint #4
-            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
+                row("-D", 1, "bbb"),
+                row("+I", 1, "ccc"),
+                row("-D", 1, "ccc"),
+                row("+I", 1, "ddd")));
 
     List<List<Record>> expectedRecords =
         ImmutableList.of(
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
-
-    testChangeLogs(
-        ImmutableList.of("id", "data"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
-  }
+            ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+            ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+            ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
 
-  @Test
-  public void testUpsertModeCheck() throws Exception {
-    DataStream<Row> dataStream =
-        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
-    FlinkSink.Builder builder =
-        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-            .tableLoader(tableLoader)
-            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
-            .writeParallelism(parallelism)
-            .upsert(true);
-
-    AssertHelpers.assertThrows(
-        "Should be error because upsert mode and overwrite mode enable at the same time.",
-        IllegalStateException.class,
-        "OVERWRITE mode shouldn't be enable",
-        () ->
-            builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append());
-
-    AssertHelpers.assertThrows(
-        "Should be error because equality field columns are empty.",
-        IllegalStateException.class,
-        "Equality field columns shouldn't be empty",
-        () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append());
+    if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
+      AssertHelpers.assertThrows(
+          "Should be error because equality field columns don't include all partition keys",
+          IllegalStateException.class,
+          "should be included in equality fields",
+          () -> {
+            testChangeLogs(
+                ImmutableList.of("id"),
+                row -> row.getField(ROW_ID_POS),
+                false,
+                elementsPerCheckpoint,
+                expectedRecords,
+                branch);
+            return null;
+          });
+    } else {
+      testChangeLogs(
+          ImmutableList.of("id"),
+          row -> row.getField(ROW_ID_POS),
+          false,
+          elementsPerCheckpoint,
+          expectedRecords,
+          branch);
+    }
   }
 
-  @Test
-  public void testUpsertOnIdKey() throws Exception {
+  protected void testUpsertOnIdKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
             ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
@@ -444,7 +275,8 @@ public class TestFlinkIcebergSinkV2 {
           row -> row.getField(ROW_ID_POS),
           true,
           elementsPerCheckpoint,
-          expectedRecords);
+          expectedRecords,
+          branch);
     } else {
       AssertHelpers.assertThrows(
           "Should be error because equality field columns don't include all partition keys",
@@ -456,54 +288,65 @@ public class TestFlinkIcebergSinkV2 {
                 row -> row.getField(ROW_ID_POS),
                 true,
                 elementsPerCheckpoint,
-                expectedRecords);
+                expectedRecords,
+                branch);
             return null;
           });
     }
   }
 
-  @Test
-  public void testUpsertOnDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
-            ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
-            ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
+  protected void testChangeLogs(
+      List<String> equalityFieldColumns,
+      KeySelector<Row, Object> keySelector,
+      boolean insertAsUpsert,
+      List<List<Row>> elementsPerCheckpoint,
+      List<List<Record>> expectedRecordsPerCheckpoint,
+      String branch)
+      throws Exception {
+    DataStream<Row> dataStream =
+        env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
 
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
-            ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
-            ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .equalityFieldColumns(equalityFieldColumns)
+        .upsert(insertAsUpsert)
+        .toBranch(branch)
+        .append();
 
-    testChangeLogs(
-        ImmutableList.of("data"),
-        row -> row.getField(ROW_DATA_POS),
-        true,
-        elementsPerCheckpoint,
-        expectedRecords);
-  }
+    // Execute the program.
+    env.execute("Test Iceberg Change-Log DataStream.");
 
-  @Test
-  public void testUpsertOnIdDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
-            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
-            ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
+    table.refresh();
+    List<Snapshot> snapshots = findValidSnapshots();
+    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+    Assert.assertEquals(
+        "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
 
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
-            ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
-            ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+    for (int i = 0; i < expectedSnapshotNum; i++) {
+      long snapshotId = snapshots.get(i).snapshotId();
+      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+      Assert.assertEquals(
+          "Should have the expected records for the checkpoint#" + i,
+          expectedRowSet(expectedRecords.toArray(new Record[0])),
+          actualRowSet(snapshotId, "*"));
+    }
+  }
 
-    testChangeLogs(
-        ImmutableList.of("id", "data"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        true,
-        elementsPerCheckpoint,
-        expectedRecords);
+  protected Record record(int id, String data) {
+    return SimpleDataUtil.createRecord(id, data);
+  }
+
+  private List<Snapshot> findValidSnapshots() {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests(table.io()).stream()
+          .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
   }
 
   private StructLikeSet expectedRowSet(Record... records) {
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
new file mode 100644
index 0000000000..fed3338482
--- /dev/null
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private final String branch;
+
+  @Parameterized.Parameters(name = "branch = {0}")
+  public static Object[] parameters() {
+    return new Object[] {SnapshotRef.MAIN_BRANCH, "testBranch"}; // main plus one non-main branch
+  }
+
+  public TestFlinkIcebergSinkV2Branch(String branch) {
+    this.branch = branch;
+  }
+
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    "2"));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    testChangeLogOnIdKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnDataKey() throws Exception {
+    testChangeLogOnDataKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnIdDataKey() throws Exception {
+    testChangeLogOnIdDataKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnIdKey() throws Exception {
+    testUpsertOnIdKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnDataKey() throws Exception {
+    testUpsertOnDataKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnIdDataKey() throws Exception {
+    testUpsertOnIdDataKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  /** Asserts that the branch this test did not write to still has no snapshot. */
+  private void verifyOtherBranchUnmodified() {
+    boolean wroteToMain = branch.equals(SnapshotRef.MAIN_BRANCH);
+    String otherBranch = wroteToMain ? "test-branch" : SnapshotRef.MAIN_BRANCH;
+    if (!wroteToMain) {
+      Assert.assertNull(table.currentSnapshot()); // main must stay empty
+    }
+    Assert.assertNull(table.snapshot(otherBranch)); // assertNull reports the unexpected snapshot
+  }
+}
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 66baaeb0e9..a4f29d47f4 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -76,22 +76,24 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   private File flinkManifestFolder;
 
   private final FileFormat format;
+  private final String branch;
 
-  @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}")
+  @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion = {1}, branch = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {"avro", 1},
-      new Object[] {"avro", 2},
-      new Object[] {"parquet", 1},
-      new Object[] {"parquet", 2},
-      new Object[] {"orc", 1},
-      new Object[] {"orc", 2}
+      new Object[] {"avro", 1, "main"},
+      new Object[] {"avro", 2, "test-branch"},
+      new Object[] {"parquet", 1, "main"},
+      new Object[] {"parquet", 2, "test-branch"},
+      new Object[] {"orc", 1, "main"},
+      new Object[] {"orc", 2, "test-branch"}
     };
   }
 
-  public TestIcebergFilesCommitter(String format, int formatVersion) {
+  public TestIcebergFilesCommitter(String format, int formatVersion, String branch) {
     super(formatVersion);
     this.format = FileFormat.fromString(format);
+    this.branch = branch;
   }
 
   @Override
@@ -125,7 +127,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.open();
       operatorId = harness.getOperator().getOperatorID();
 
-      SimpleDataUtil.assertTableRows(table, Lists.newArrayList());
+      SimpleDataUtil.assertTableRows(table, Lists.newArrayList(), branch);
       assertSnapshotSize(0);
       assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
@@ -204,12 +206,12 @@ public class TestIcebergFilesCommitter extends TableTestBase {
         harness.notifyOfCompletedCheckpoint(i);
         assertFlinkManifests(0);
 
-        SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
+        SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
         assertSnapshotSize(i);
         assertMaxCommittedCheckpointId(jobID, operatorId, i);
         Assert.assertEquals(
             TestIcebergFilesCommitter.class.getName(),
-            table.currentSnapshot().summary().get("flink.test"));
+            SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
       }
     }
   }
@@ -255,13 +257,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
       assertFlinkManifests(1);
 
       // 4. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
@@ -308,13 +310,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 3. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
 
       // 4. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
@@ -348,7 +350,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row), branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
@@ -360,7 +362,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.initializeState(snapshot);
       harness.open();
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
@@ -375,7 +377,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
@@ -406,7 +408,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.processElement(of(dataFile), ++timestamp);
 
       snapshot = harness.snapshot(++checkpointId, ++timestamp);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of());
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
       assertFlinkManifests(1);
     }
@@ -421,7 +423,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       // transaction.
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
       harness.snapshot(++checkpointId, ++timestamp);
@@ -431,7 +433,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
@@ -459,7 +461,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(3);
 
       RowData row = SimpleDataUtil.createRowData(3, "foo");
@@ -473,7 +475,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
     }
@@ -509,7 +511,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
         harness.notifyOfCompletedCheckpoint(checkpointId);
         assertFlinkManifests(0);
 
-        SimpleDataUtil.assertTableRows(table, tableRows);
+        SimpleDataUtil.assertTableRows(table, tableRows, branch);
         assertSnapshotSize(i);
         assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
       }
@@ -540,7 +542,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, tableRows);
+      SimpleDataUtil.assertTableRows(table, tableRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
     }
@@ -577,7 +579,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
         harness.notifyOfCompletedCheckpoint(checkpointId + 1);
         assertFlinkManifests(0);
-        SimpleDataUtil.assertTableRows(table, tableRows);
+        SimpleDataUtil.assertTableRows(table, tableRows, branch);
         assertSnapshotSize(i + 1);
         assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
       }
@@ -628,7 +630,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       assertFlinkManifests(1);
 
       // Only the first row is committed at this point
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
@@ -651,7 +653,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       // transaction.
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
@@ -675,7 +677,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness2.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
@@ -702,12 +704,12 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       ((BoundedOneInput) harness.getOneInputOperator()).endInput();
 
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, tableRows);
+      SimpleDataUtil.assertTableRows(table, tableRows, branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
       Assert.assertEquals(
           TestIcebergFilesCommitter.class.getName(),
-          table.currentSnapshot().summary().get("flink.test"));
+          SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
     }
   }
 
@@ -748,7 +750,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
@@ -794,7 +796,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
 
@@ -816,7 +818,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 6. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
@@ -867,7 +869,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // Notify the 2nd snapshot to complete.
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
       Assert.assertEquals(
@@ -951,7 +953,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     table.refresh();
     long actualId =
         IcebergFilesCommitter.getMaxCommittedCheckpointId(
-            table, jobID.toString(), operatorID.toHexString());
+            table, jobID.toString(), operatorID.toHexString(), branch);
     Assert.assertEquals(expectedId, actualId);
   }
 
@@ -962,7 +964,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
   private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
       throws Exception {
-    TestOperatorFactory factory = TestOperatorFactory.of(table.location());
+    TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch);
     return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
   }
 
@@ -982,13 +984,15 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   private static class TestOperatorFactory extends AbstractStreamOperatorFactory<Void>
       implements OneInputStreamOperatorFactory<WriteResult, Void> {
     private final String tablePath;
+    private final String branch;
 
-    private TestOperatorFactory(String tablePath) {
+    private TestOperatorFactory(String tablePath, String branch) {
       this.tablePath = tablePath;
+      this.branch = branch;
     }
 
-    private static TestOperatorFactory of(String tablePath) {
-      return new TestOperatorFactory(tablePath);
+    private static TestOperatorFactory of(String tablePath, String branch) {
+      return new TestOperatorFactory(tablePath, branch);
     }
 
     @Override
@@ -1000,7 +1004,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
               new TestTableLoader(tablePath),
               false,
               Collections.singletonMap("flink.test", TestIcebergFilesCommitter.class.getName()),
-              ThreadPools.WORKER_THREAD_POOL_SIZE);
+              ThreadPools.WORKER_THREAD_POOL_SIZE,
+              branch);
       committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
       return (T) committer;
     }
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 448b2aa2d8..4b5c7e4a0d 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -172,4 +172,12 @@ public class FlinkWriteConf {
         .defaultValue(FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue())
         .parse();
   }
+
+  /** Parses the {@code branch} write option, passing the option's declared default as fallback. */
+  public String branch() {
+    String optionKey = FlinkWriteOptions.BRANCH.key();
+    String fallback = FlinkWriteOptions.BRANCH.defaultValue();
+
+    return confParser.stringConf().option(optionKey).defaultValue(fallback).parse();
+  }
 }
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index f3cc52972b..86cb2fb0eb 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.iceberg.SnapshotRef;
 
 /** Flink sink write options */
 public class FlinkWriteOptions {
@@ -56,4 +57,8 @@ public class FlinkWriteOptions {
   // Overrides the table's write.distribution-mode
   public static final ConfigOption<String> DISTRIBUTION_MODE =
       ConfigOptions.key("distribution-mode").stringType().noDefaultValue();
+
+  // Branch to write to
+  public static final ConfigOption<String> BRANCH =
+      ConfigOptions.key("branch").stringType().defaultValue(SnapshotRef.MAIN_BRANCH);
 }
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 81706e5824..445b6a6ff9 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -316,6 +316,11 @@ public class FlinkSink {
       return this;
     }
 
+    public Builder toBranch(String branch) {
+      writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
+      return this;
+    }
+
     private <T> DataStreamSink<T> chainIcebergOperators() {
       Preconditions.checkArgument(
           inputCreator != null,
@@ -422,7 +427,8 @@ public class FlinkSink {
               tableLoader,
               flinkWriteConf.overwriteMode(),
               snapshotProperties,
-              flinkWriteConf.workerPoolSize());
+              flinkWriteConf.workerPoolSize(),
+              flinkWriteConf.branch());
       SingleOutputStreamOperator<Void> committerStream =
           writerStream
               .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index d8a7bc5cf2..22b4dc9d21 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -95,6 +95,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
   // The completed files cache for current checkpoint. Once the snapshot barrier received, it will
   // be flushed to the 'dataFilesPerCheckpoint'.
   private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
+  private final String branch;
 
   // It will have an unique identifier for one job.
   private transient String flinkJobId;
@@ -125,11 +126,13 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       TableLoader tableLoader,
       boolean replacePartitions,
       Map<String, String> snapshotProperties,
-      Integer workerPoolSize) {
+      Integer workerPoolSize,
+      String branch) {
     this.tableLoader = tableLoader;
     this.replacePartitions = replacePartitions;
     this.snapshotProperties = snapshotProperties;
     this.workerPoolSize = workerPoolSize;
+    this.branch = branch;
   }
 
   @Override
@@ -179,7 +182,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       // it's safe to assign the max committed checkpoint id from restored flink job to the current
       // flink job.
       this.maxCommittedCheckpointId =
-          getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId);
+          getMaxCommittedCheckpointId(table, restoredFlinkJobId, operatorUniqueId, branch);
 
       NavigableMap<Long, byte[]> uncommittedDataFiles =
           Maps.newTreeMap(checkpointsState.get().iterator().next())
@@ -230,7 +233,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
-      LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
       commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     } else {
@@ -286,8 +288,6 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
       }
       continuousEmptyCheckpoints = 0;
-    } else {
-      LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
     }
   }
 
@@ -386,10 +386,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
       String operatorId,
       long checkpointId) {
     LOG.info(
-        "Committing {} for checkpoint {} to table {} with summary: {}",
+        "Committing {} for checkpoint {} to table {} branch {} with summary: {}",
         description,
         checkpointId,
         table.name(),
+        branch,
         summary);
     snapshotProperties.forEach(operation::set);
     // custom snapshot metadata properties will be overridden if they conflict with internal ones
@@ -397,14 +398,16 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
     operation.set(FLINK_JOB_ID, newFlinkJobId);
     operation.set(OPERATOR_ID, operatorId);
+    operation.toBranch(branch);
 
     long startNano = System.nanoTime();
     operation.commit(); // abort is automatically called if this fails.
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
     LOG.info(
-        "Committed {} to table: {}, checkpointId {} in {} ms",
+        "Committed {} to table: {}, branch: {}, checkpointId {} in {} ms",
         description,
         table.name(),
+        branch,
         checkpointId,
         durationMs);
     committerMetrics.commitDuration(durationMs);
@@ -474,8 +477,9 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
   }
 
-  static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
-    Snapshot snapshot = table.currentSnapshot();
+  static long getMaxCommittedCheckpointId(
+      Table table, String flinkJobId, String operatorId, String branch) {
+    Snapshot snapshot = table.snapshot(branch);
     long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     while (snapshot != null) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index e296763508..345d88a48a 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.data.GenericRecord;
@@ -206,12 +207,18 @@ public class SimpleDataUtil {
     return records;
   }
 
-  public static void assertTableRows(String tablePath, List<RowData> expected) throws IOException {
-    assertTableRecords(tablePath, convertToRecords(expected));
+  public static void assertTableRows(String tablePath, List<RowData> expected, String branch)
+      throws IOException {
+    assertTableRecords(tablePath, convertToRecords(expected), branch);
   }
 
   public static void assertTableRows(Table table, List<RowData> expected) throws IOException {
-    assertTableRecords(table, convertToRecords(expected));
+    assertTableRecords(table, convertToRecords(expected), SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRows(Table table, List<RowData> expected, String branch)
+      throws IOException {
+    assertTableRecords(table, convertToRecords(expected), branch);
   }
 
   /** Get all rows for a table */
@@ -267,13 +274,25 @@ public class SimpleDataUtil {
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
+    assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRecords(Table table, List<Record> expected, String branch)
+      throws IOException {
     table.refresh();
+    Snapshot snapshot = latestSnapshot(table, branch);
+
+    if (snapshot == null) {
+      Assert.assertEquals(expected, ImmutableList.of());
+      return;
+    }
 
     Types.StructType type = table.schema().asStruct();
     StructLikeSet expectedSet = StructLikeSet.create(type);
     expectedSet.addAll(expected);
 
-    try (CloseableIterable<Record> iterable = IcebergGenerics.read(table).build()) {
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(table).useSnapshot(snapshot.snapshotId()).build()) {
       StructLikeSet actualSet = StructLikeSet.create(type);
 
       for (Record record : iterable) {
@@ -284,10 +303,27 @@ public class SimpleDataUtil {
     }
   }
 
+  /** Returns the latest snapshot of the given branch in the table. */
+  public static Snapshot latestSnapshot(Table table, String branch) {
+    // The main branch deliberately goes through currentSnapshot(): that was the API used for
+    // validation before branches were added, so this also checks its behavior has not changed.
+    if (branch.equals(SnapshotRef.MAIN_BRANCH)) {
+      return table.currentSnapshot();
+    }
+
+    return table.snapshot(branch);
+  }
+
   public static void assertTableRecords(String tablePath, List<Record> expected)
       throws IOException {
     Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
-    assertTableRecords(new HadoopTables().load(tablePath), expected);
+    assertTableRecords(new HadoopTables().load(tablePath), expected, SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRecords(String tablePath, List<Record> expected, String branch)
+      throws IOException {
+    Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
+    assertTableRecords(new HadoopTables().load(tablePath), expected, branch);
   }
 
   public static StructLikeSet expectedRowSet(Table table, Record... records) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
index c2af30d342..23beb19a72 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -22,14 +22,10 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.AssertHelpers;
@@ -45,7 +41,6 @@ import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
-import org.apache.iceberg.flink.source.BoundedTestSource;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -60,7 +55,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class TestFlinkIcebergSink {
+public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
 
   @ClassRule
   public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,13 +67,6 @@ public class TestFlinkIcebergSink {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
 
-  private static final TypeInformation<Row> ROW_TYPE_INFO =
-      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
-  private static final DataFormatConverters.RowConverter CONVERTER =
-      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
-
-  private Table table;
-  private StreamExecutionEnvironment env;
   private TableLoader tableLoader;
 
   private final FileFormat format;
@@ -132,14 +120,6 @@ public class TestFlinkIcebergSink {
     tableLoader = catalogResource.tableLoader();
   }
 
-  private List<RowData> convertToRowData(List<Row> rows) {
-    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
-  }
-
-  private BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
-    return new BoundedTestSource<>(rows.toArray(new Row[0]));
-  }
-
   @Test
   public void testWriteRowData() throws Exception {
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, "world"), Row.of(3, "foo"));
@@ -160,19 +140,6 @@ public class TestFlinkIcebergSink {
     SimpleDataUtil.assertTableRows(table, convertToRowData(rows));
   }
 
-  private List<Row> createRows(String prefix) {
-    return Lists.newArrayList(
-        Row.of(1, prefix + "aaa"),
-        Row.of(1, prefix + "bbb"),
-        Row.of(1, prefix + "ccc"),
-        Row.of(2, prefix + "aaa"),
-        Row.of(2, prefix + "bbb"),
-        Row.of(2, prefix + "ccc"),
-        Row.of(3, prefix + "aaa"),
-        Row.of(3, prefix + "bbb"),
-        Row.of(3, prefix + "ccc"));
-  }
-
   private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
       throws Exception {
     List<Row> rows = createRows("");
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
new file mode 100644
index 0000000000..b38aa6b50c
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBase.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+public class TestFlinkIcebergSinkBase {
+
+  protected Table table;
+  protected StreamExecutionEnvironment env;
+  protected static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+
+  protected static final DataFormatConverters.RowConverter CONVERTER =
+      new DataFormatConverters.RowConverter(SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+  protected BoundedTestSource<Row> createBoundedSource(List<Row> rows) {
+    return new BoundedTestSource<>(rows.toArray(new Row[0]));
+  }
+
+  protected List<Row> createRows(String prefix) {
+    return Lists.newArrayList(
+        Row.of(1, prefix + "aaa"),
+        Row.of(1, prefix + "bbb"),
+        Row.of(1, prefix + "ccc"),
+        Row.of(2, prefix + "aaa"),
+        Row.of(2, prefix + "bbb"),
+        Row.of(2, prefix + "ccc"),
+        Row.of(3, prefix + "aaa"),
+        Row.of(3, prefix + "bbb"),
+        Row.of(3, prefix + "ccc"));
+  }
+
+  protected List<RowData> convertToRowData(List<Row> rows) {
+    return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
new file mode 100644
index 0000000000..16b4542b00
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private final String branch;
+  private TableLoader tableLoader;
+
+  @Parameterized.Parameters(name = "branch = {0}") // the only parameter is the target branch
+  public static Object[] parameters() {
+    return new Object[] {"main", "testBranch"};
+  }
+
+  public TestFlinkIcebergSinkBranch(String branch) {
+    this.branch = branch;
+  }
+
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    "1"));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testWriteRowWithTableSchema() throws Exception {
+    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+    verifyOtherBranchUnmodified();
+  }
+
+  private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
+      throws Exception {
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .tableSchema(tableSchema)
+        .toBranch(branch)
+        .distributionMode(distributionMode)
+        .append();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream.");
+
+    SimpleDataUtil.assertTableRows(table, convertToRowData(rows), branch);
+    SimpleDataUtil.assertTableRows(
+        table,
+        ImmutableList.of(),
+        branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  /** Asserts that the branch this test run did not write to still has no snapshot. */
+  private void verifyOtherBranchUnmodified() {
+    String otherBranch =
+        branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH;
+    if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
+      Assert.assertNull(table.currentSnapshot());
+    }
+    Assert.assertNull(table.snapshot(otherBranch));
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
index 5d8a56ebbf..422bd97cd7 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
@@ -18,38 +18,25 @@
  */
 package org.apache.iceberg.flink.sink;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
-import org.apache.flink.types.RowKind;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.Table;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.data.IcebergGenerics;
-import org.apache.iceberg.data.Record;
 import org.apache.iceberg.flink.HadoopCatalogResource;
 import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
-import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.BoundedTestSource;
-import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -60,7 +47,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkV2Base {
 
   @ClassRule
   public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
@@ -72,29 +59,6 @@ public class TestFlinkIcebergSinkV2 {
   public final HadoopCatalogResource catalogResource =
       new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
 
-  private static final int FORMAT_V2 = 2;
-  private static final TypeInformation<Row> ROW_TYPE_INFO =
-      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
-
-  private static final Map<String, RowKind> ROW_KIND_MAP =
-      ImmutableMap.of(
-          "+I", RowKind.INSERT,
-          "-D", RowKind.DELETE,
-          "-U", RowKind.UPDATE_BEFORE,
-          "+U", RowKind.UPDATE_AFTER);
-
-  private static final int ROW_ID_POS = 0;
-  private static final int ROW_DATA_POS = 1;
-
-  private final FileFormat format;
-  private final int parallelism;
-  private final boolean partitioned;
-  private final String writeDistributionMode;
-
-  private Table table;
-  private StreamExecutionEnvironment env;
-  private TableLoader tableLoader;
-
   @Parameterized.Parameters(
       name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
   public static Object[][] parameters() {
@@ -155,67 +119,6 @@ public class TestFlinkIcebergSinkV2 {
     tableLoader = catalogResource.tableLoader();
   }
 
-  private List<Snapshot> findValidSnapshots() {
-    List<Snapshot> validSnapshots = Lists.newArrayList();
-    for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests(table.io()).stream()
-          .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
-        validSnapshots.add(snapshot);
-      }
-    }
-    return validSnapshots;
-  }
-
-  private void testChangeLogs(
-      List<String> equalityFieldColumns,
-      KeySelector<Row, Object> keySelector,
-      boolean insertAsUpsert,
-      List<List<Row>> elementsPerCheckpoint,
-      List<List<Record>> expectedRecordsPerCheckpoint)
-      throws Exception {
-    DataStream<Row> dataStream =
-        env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
-
-    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-        .tableLoader(tableLoader)
-        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
-        .writeParallelism(parallelism)
-        .equalityFieldColumns(equalityFieldColumns)
-        .upsert(insertAsUpsert)
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg Change-Log DataStream.");
-
-    table.refresh();
-    List<Snapshot> snapshots = findValidSnapshots();
-    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
-    Assert.assertEquals(
-        "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
-
-    for (int i = 0; i < expectedSnapshotNum; i++) {
-      long snapshotId = snapshots.get(i).snapshotId();
-      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
-      Assert.assertEquals(
-          "Should have the expected records for the checkpoint#" + i,
-          expectedRowSet(expectedRecords.toArray(new Record[0])),
-          actualRowSet(snapshotId, "*"));
-    }
-  }
-
-  private Row row(String rowKind, int id, String data) {
-    RowKind kind = ROW_KIND_MAP.get(rowKind);
-    if (kind == null) {
-      throw new IllegalArgumentException("Unknown row kind: " + rowKind);
-    }
-
-    return Row.ofKind(kind, id, data);
-  }
-
-  private Record record(int id, String data) {
-    return SimpleDataUtil.createRecord(id, data);
-  }
-
   @Test
   public void testCheckAndGetEqualityFieldIds() {
     table
@@ -249,108 +152,17 @@ public class TestFlinkIcebergSinkV2 {
 
   @Test
   public void testChangeLogOnIdKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(
-                row("+I", 1, "aaa"),
-                row("-D", 1, "aaa"),
-                row("+I", 1, "bbb"),
-                row("+I", 2, "aaa"),
-                row("-D", 2, "aaa"),
-                row("+I", 2, "bbb")),
-            ImmutableList.of(
-                row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
-            ImmutableList.of(
-                row("-D", 1, "bbb"),
-                row("+I", 1, "ccc"),
-                row("-D", 1, "ccc"),
-                row("+I", 1, "ddd")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
-            ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
-            ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
-
-    if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
-      AssertHelpers.assertThrows(
-          "Should be error because equality field columns don't include all partition keys",
-          IllegalStateException.class,
-          "should be included in equality fields",
-          () -> {
-            testChangeLogs(
-                ImmutableList.of("id"),
-                row -> row.getField(ROW_ID_POS),
-                false,
-                elementsPerCheckpoint,
-                expectedRecords);
-            return null;
-          });
-    } else {
-      testChangeLogs(
-          ImmutableList.of("id"),
-          row -> row.getField(ROW_ID_POS),
-          false,
-          elementsPerCheckpoint,
-          expectedRecords);
-    }
+    testChangeLogOnIdKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
   public void testChangeLogOnDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(
-                row("+I", 1, "aaa"),
-                row("-D", 1, "aaa"),
-                row("+I", 2, "bbb"),
-                row("+I", 1, "bbb"),
-                row("+I", 2, "aaa")),
-            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
-            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
-            ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
-            ImmutableList.of(
-                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")));
-
-    testChangeLogs(
-        ImmutableList.of("data"),
-        row -> row.getField(ROW_DATA_POS),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
+    testChangeLogOnDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
   public void testChangeLogOnIdDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(
-                row("+I", 1, "aaa"),
-                row("-D", 1, "aaa"),
-                row("+I", 2, "bbb"),
-                row("+I", 1, "bbb"),
-                row("+I", 2, "aaa")),
-            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
-            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
-            ImmutableList.of(
-                record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
-            ImmutableList.of(
-                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
-
-    testChangeLogs(
-        ImmutableList.of("data", "id"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
+    testChangeLogOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
@@ -373,30 +185,7 @@ public class TestFlinkIcebergSinkV2 {
 
   @Test
   public void testChangeLogOnSameKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            // Checkpoint #1
-            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
-            // Checkpoint #2
-            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
-            // Checkpoint #3
-            ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
-            // Checkpoint #4
-            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
-
-    testChangeLogs(
-        ImmutableList.of("id", "data"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
+    testChangeLogOnSameKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
@@ -426,97 +215,16 @@ public class TestFlinkIcebergSinkV2 {
 
   @Test
   public void testUpsertOnIdKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
-            ImmutableList.of(row("+I", 1, "ccc")),
-            ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb")),
-            ImmutableList.of(record(1, "ccc")),
-            ImmutableList.of(record(1, "eee")));
-
-    if (!partitioned) {
-      testChangeLogs(
-          ImmutableList.of("id"),
-          row -> row.getField(ROW_ID_POS),
-          true,
-          elementsPerCheckpoint,
-          expectedRecords);
-    } else {
-      AssertHelpers.assertThrows(
-          "Should be error because equality field columns don't include all partition keys",
-          IllegalStateException.class,
-          "should be included in equality fields",
-          () -> {
-            testChangeLogs(
-                ImmutableList.of("id"),
-                row -> row.getField(ROW_ID_POS),
-                true,
-                elementsPerCheckpoint,
-                expectedRecords);
-            return null;
-          });
-    }
+    testUpsertOnIdKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
   public void testUpsertOnDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
-            ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
-            ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
-            ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
-            ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
-
-    testChangeLogs(
-        ImmutableList.of("data"),
-        row -> row.getField(ROW_DATA_POS),
-        true,
-        elementsPerCheckpoint,
-        expectedRecords);
+    testUpsertOnDataKey(SnapshotRef.MAIN_BRANCH);
   }
 
   @Test
   public void testUpsertOnIdDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
-            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
-            ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
-            ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
-            ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
-
-    testChangeLogs(
-        ImmutableList.of("id", "data"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        true,
-        elementsPerCheckpoint,
-        expectedRecords);
-  }
-
-  private StructLikeSet expectedRowSet(Record... records) {
-    return SimpleDataUtil.expectedRowSet(table, records);
-  }
-
-  private StructLikeSet actualRowSet(long snapshotId, String... columns) throws IOException {
-    table.refresh();
-    StructLikeSet set = StructLikeSet.create(table.schema().asStruct());
-    try (CloseableIterable<Record> reader =
-        IcebergGenerics.read(table).useSnapshot(snapshotId).select(columns).build()) {
-      reader.forEach(set::add);
-    }
-    return set;
+    testUpsertOnIdDataKey(SnapshotRef.MAIN_BRANCH);
   }
 }
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
similarity index 60%
copy from flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
copy to flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
index ccc3c0f23d..15380408e4 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Base.java
@@ -26,184 +26,50 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.flink.HadoopCatalogResource;
-import org.apache.iceberg.flink.MiniClusterResource;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TableLoader;
-import org.apache.iceberg.flink.TestFixtures;
 import org.apache.iceberg.flink.source.BoundedTestSource;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-@RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2Base {
 
-  @ClassRule
-  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
-
-  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+  protected static final int FORMAT_V2 = 2;
+  protected static final TypeInformation<Row> ROW_TYPE_INFO =
+      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
 
-  @Rule
-  public final HadoopCatalogResource catalogResource =
-      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+  protected static final int ROW_ID_POS = 0;
+  protected static final int ROW_DATA_POS = 1;
 
-  private static final int FORMAT_V2 = 2;
-  private static final TypeInformation<Row> ROW_TYPE_INFO =
-      new RowTypeInfo(SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+  protected int parallelism = 1;
+  protected TableLoader tableLoader;
+  protected Table table;
+  protected StreamExecutionEnvironment env;
+  protected FileFormat format;
+  protected boolean partitioned;
+  protected String writeDistributionMode;
 
-  private static final Map<String, RowKind> ROW_KIND_MAP =
+  protected static final Map<String, RowKind> ROW_KIND_MAP =
       ImmutableMap.of(
           "+I", RowKind.INSERT,
           "-D", RowKind.DELETE,
           "-U", RowKind.UPDATE_BEFORE,
           "+U", RowKind.UPDATE_AFTER);
 
-  private static final int ROW_ID_POS = 0;
-  private static final int ROW_DATA_POS = 1;
-
-  private final FileFormat format;
-  private final int parallelism;
-  private final boolean partitioned;
-  private final String writeDistributionMode;
-
-  private Table table;
-  private StreamExecutionEnvironment env;
-  private TableLoader tableLoader;
-
-  @Parameterized.Parameters(
-      name = "FileFormat = {0}, Parallelism = {1}, Partitioned={2}, WriteDistributionMode ={3}")
-  public static Object[][] parameters() {
-    return new Object[][] {
-      new Object[] {"avro", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"avro", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_NONE},
-      new Object[] {"orc", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"orc", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_HASH},
-      new Object[] {"parquet", 1, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 1, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 4, true, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE},
-      new Object[] {"parquet", 4, false, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE}
-    };
-  }
-
-  public TestFlinkIcebergSinkV2(
-      String format, int parallelism, boolean partitioned, String writeDistributionMode) {
-    this.format = FileFormat.fromString(format);
-    this.parallelism = parallelism;
-    this.partitioned = partitioned;
-    this.writeDistributionMode = writeDistributionMode;
-  }
-
-  @Before
-  public void setupTable() {
-    table =
-        catalogResource
-            .catalog()
-            .createTable(
-                TestFixtures.TABLE_IDENTIFIER,
-                SimpleDataUtil.SCHEMA,
-                partitioned
-                    ? PartitionSpec.builderFor(SimpleDataUtil.SCHEMA).identity("data").build()
-                    : PartitionSpec.unpartitioned(),
-                ImmutableMap.of(
-                    TableProperties.DEFAULT_FILE_FORMAT,
-                    format.name(),
-                    TableProperties.FORMAT_VERSION,
-                    String.valueOf(FORMAT_V2)));
-
-    table
-        .updateProperties()
-        .set(TableProperties.DEFAULT_FILE_FORMAT, format.name())
-        .set(TableProperties.WRITE_DISTRIBUTION_MODE, writeDistributionMode)
-        .commit();
-
-    env =
-        StreamExecutionEnvironment.getExecutionEnvironment(
-                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
-            .enableCheckpointing(100L)
-            .setParallelism(parallelism)
-            .setMaxParallelism(parallelism);
-
-    tableLoader = catalogResource.tableLoader();
-  }
-
-  private List<Snapshot> findValidSnapshots() {
-    List<Snapshot> validSnapshots = Lists.newArrayList();
-    for (Snapshot snapshot : table.snapshots()) {
-      if (snapshot.allManifests(table.io()).stream()
-          .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
-        validSnapshots.add(snapshot);
-      }
-    }
-    return validSnapshots;
-  }
-
-  private void testChangeLogs(
-      List<String> equalityFieldColumns,
-      KeySelector<Row, Object> keySelector,
-      boolean insertAsUpsert,
-      List<List<Row>> elementsPerCheckpoint,
-      List<List<Record>> expectedRecordsPerCheckpoint)
-      throws Exception {
-    DataStream<Row> dataStream =
-        env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
-
-    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-        .tableLoader(tableLoader)
-        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
-        .writeParallelism(parallelism)
-        .equalityFieldColumns(equalityFieldColumns)
-        .upsert(insertAsUpsert)
-        .append();
-
-    // Execute the program.
-    env.execute("Test Iceberg Change-Log DataStream.");
-
-    table.refresh();
-    List<Snapshot> snapshots = findValidSnapshots();
-    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
-    Assert.assertEquals(
-        "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
-
-    for (int i = 0; i < expectedSnapshotNum; i++) {
-      long snapshotId = snapshots.get(i).snapshotId();
-      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
-      Assert.assertEquals(
-          "Should have the expected records for the checkpoint#" + i,
-          expectedRowSet(expectedRecords.toArray(new Record[0])),
-          actualRowSet(snapshotId, "*"));
-    }
-  }
-
-  private Row row(String rowKind, int id, String data) {
+  protected Row row(String rowKind, int id, String data) {
     RowKind kind = ROW_KIND_MAP.get(rowKind);
     if (kind == null) {
       throw new IllegalArgumentException("Unknown row kind: " + rowKind);
@@ -212,92 +78,85 @@ public class TestFlinkIcebergSinkV2 {
     return Row.ofKind(kind, id, data);
   }
 
-  private Record record(int id, String data) {
-    return SimpleDataUtil.createRecord(id, data);
-  }
-
-  @Test
-  public void testCheckAndGetEqualityFieldIds() {
-    table
-        .updateSchema()
-        .allowIncompatibleChanges()
-        .addRequiredColumn("type", Types.StringType.get())
-        .setIdentifierFields("type")
-        .commit();
-
-    DataStream<Row> dataStream =
-        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
-    FlinkSink.Builder builder =
-        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
-
-    // Use schema identifier field IDs as equality field id list by default
-    Assert.assertEquals(
-        table.schema().identifierFieldIds(),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
-
-    // Use user-provided equality field column as equality field id list
-    builder.equalityFieldColumns(Lists.newArrayList("id"));
-    Assert.assertEquals(
-        Sets.newHashSet(table.schema().findField("id").fieldId()),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+  protected void testUpsertOnIdDataKey(String branch) throws Exception {
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
+            ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
 
-    builder.equalityFieldColumns(Lists.newArrayList("type"));
-    Assert.assertEquals(
-        Sets.newHashSet(table.schema().findField("type").fieldId()),
-        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
+            ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
+            ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+    testChangeLogs(
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        true,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
   }
 
-  @Test
-  public void testChangeLogOnIdKey() throws Exception {
+  protected void testChangeLogOnIdDataKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
             ImmutableList.of(
                 row("+I", 1, "aaa"),
                 row("-D", 1, "aaa"),
+                row("+I", 2, "bbb"),
                 row("+I", 1, "bbb"),
-                row("+I", 2, "aaa"),
-                row("-D", 2, "aaa"),
-                row("+I", 2, "bbb")),
+                row("+I", 2, "aaa")),
+            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
+            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
             ImmutableList.of(
-                row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
+                record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
             ImmutableList.of(
-                row("-D", 1, "bbb"),
-                row("+I", 1, "ccc"),
-                row("-D", 1, "ccc"),
-                row("+I", 1, "ddd")));
+                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
+
+    testChangeLogs(
+        ImmutableList.of("data", "id"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
+  }
+
+  protected void testChangeLogOnSameKey(String branch) throws Exception {
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            // Checkpoint #1
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
+            // Checkpoint #2
+            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
+            // Checkpoint #3
+            ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
+            // Checkpoint #4
+            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
 
     List<List<Record>> expectedRecords =
         ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
-            ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
-            ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
+            ImmutableList.of(record(1, "aaa")),
+            ImmutableList.of(record(1, "aaa")),
+            ImmutableList.of(record(1, "aaa")),
+            ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
 
-    if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
-      AssertHelpers.assertThrows(
-          "Should be error because equality field columns don't include all partition keys",
-          IllegalStateException.class,
-          "should be included in equality fields",
-          () -> {
-            testChangeLogs(
-                ImmutableList.of("id"),
-                row -> row.getField(ROW_ID_POS),
-                false,
-                elementsPerCheckpoint,
-                expectedRecords);
-            return null;
-          });
-    } else {
-      testChangeLogs(
-          ImmutableList.of("id"),
-          row -> row.getField(ROW_ID_POS),
-          false,
-          elementsPerCheckpoint,
-          expectedRecords);
-    }
+    testChangeLogs(
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
   }
 
-  @Test
-  public void testChangeLogOnDataKey() throws Exception {
+  protected void testChangeLogOnDataKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
             ImmutableList.of(
@@ -321,111 +180,83 @@ public class TestFlinkIcebergSinkV2 {
         row -> row.getField(ROW_DATA_POS),
         false,
         elementsPerCheckpoint,
-        expectedRecords);
+        expectedRecords,
+        branch);
   }
 
-  @Test
-  public void testUpsertOnlyDeletesOnDataKey() throws Exception {
+  protected void testUpsertOnDataKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa")),
-            ImmutableList.of(row("-D", 1, "aaa"), row("-D", 2, "bbb")));
+            ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
+            ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
+            ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
 
     List<List<Record>> expectedRecords =
-        ImmutableList.of(ImmutableList.of(record(1, "aaa")), ImmutableList.of());
+        ImmutableList.of(
+            ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
+            ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
+            ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
 
     testChangeLogs(
         ImmutableList.of("data"),
         row -> row.getField(ROW_DATA_POS),
         true,
         elementsPerCheckpoint,
-        expectedRecords);
+        expectedRecords,
+        branch);
   }
 
-  @Test
-  public void testChangeLogOnIdDataKey() throws Exception {
+  protected void testChangeLogOnIdKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
             ImmutableList.of(
                 row("+I", 1, "aaa"),
                 row("-D", 1, "aaa"),
-                row("+I", 2, "bbb"),
                 row("+I", 1, "bbb"),
-                row("+I", 2, "aaa")),
-            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
-            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
-
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+                row("+I", 2, "aaa"),
+                row("-D", 2, "aaa"),
+                row("+I", 2, "bbb")),
             ImmutableList.of(
-                record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+                row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
             ImmutableList.of(
-                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
-
-    testChangeLogs(
-        ImmutableList.of("data", "id"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
-  }
-
-  @Test
-  public void testChangeLogOnSameKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            // Checkpoint #1
-            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 1, "aaa"), row("+I", 1, "aaa")),
-            // Checkpoint #2
-            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa")),
-            // Checkpoint #3
-            ImmutableList.of(row("-D", 1, "aaa"), row("+I", 1, "aaa")),
-            // Checkpoint #4
-            ImmutableList.of(row("-U", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 1, "aaa")));
+                row("-D", 1, "bbb"),
+                row("+I", 1, "ccc"),
+                row("-D", 1, "ccc"),
+                row("+I", 1, "ddd")));
 
     List<List<Record>> expectedRecords =
         ImmutableList.of(
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa")),
-            ImmutableList.of(record(1, "aaa"), record(1, "aaa")));
-
-    testChangeLogs(
-        ImmutableList.of("id", "data"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        false,
-        elementsPerCheckpoint,
-        expectedRecords);
-  }
+            ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+            ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+            ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
 
-  @Test
-  public void testUpsertModeCheck() throws Exception {
-    DataStream<Row> dataStream =
-        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
-    FlinkSink.Builder builder =
-        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
-            .tableLoader(tableLoader)
-            .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
-            .writeParallelism(parallelism)
-            .upsert(true);
-
-    AssertHelpers.assertThrows(
-        "Should be error because upsert mode and overwrite mode enable at the same time.",
-        IllegalStateException.class,
-        "OVERWRITE mode shouldn't be enable",
-        () ->
-            builder.equalityFieldColumns(ImmutableList.of("id", "data")).overwrite(true).append());
-
-    AssertHelpers.assertThrows(
-        "Should be error because equality field columns are empty.",
-        IllegalStateException.class,
-        "Equality field columns shouldn't be empty",
-        () -> builder.equalityFieldColumns(ImmutableList.of()).overwrite(false).append());
+    if (partitioned && writeDistributionMode.equals(TableProperties.WRITE_DISTRIBUTION_MODE_HASH)) {
+      AssertHelpers.assertThrows(
+          "Should be error because equality field columns don't include all partition keys",
+          IllegalStateException.class,
+          "should be included in equality fields",
+          () -> {
+            testChangeLogs(
+                ImmutableList.of("id"),
+                row -> row.getField(ROW_ID_POS),
+                false,
+                elementsPerCheckpoint,
+                expectedRecords,
+                branch);
+            return null;
+          });
+    } else {
+      testChangeLogs(
+          ImmutableList.of("id"),
+          row -> row.getField(ROW_ID_POS),
+          false,
+          elementsPerCheckpoint,
+          expectedRecords,
+          branch);
+    }
   }
 
-  @Test
-  public void testUpsertOnIdKey() throws Exception {
+  protected void testUpsertOnIdKey(String branch) throws Exception {
     List<List<Row>> elementsPerCheckpoint =
         ImmutableList.of(
             ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
@@ -444,7 +275,8 @@ public class TestFlinkIcebergSinkV2 {
           row -> row.getField(ROW_ID_POS),
           true,
           elementsPerCheckpoint,
-          expectedRecords);
+          expectedRecords,
+          branch);
     } else {
       AssertHelpers.assertThrows(
           "Should be error because equality field columns don't include all partition keys",
@@ -456,54 +288,65 @@ public class TestFlinkIcebergSinkV2 {
                 row -> row.getField(ROW_ID_POS),
                 true,
                 elementsPerCheckpoint,
-                expectedRecords);
+                expectedRecords,
+                branch);
             return null;
           });
     }
   }
 
-  @Test
-  public void testUpsertOnDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
-            ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
-            ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
+  protected void testChangeLogs(
+      List<String> equalityFieldColumns,
+      KeySelector<Row, Object> keySelector,
+      boolean insertAsUpsert,
+      List<List<Row>> elementsPerCheckpoint,
+      List<List<Record>> expectedRecordsPerCheckpoint,
+      String branch)
+      throws Exception {
+    DataStream<Row> dataStream =
+        env.addSource(new BoundedTestSource<>(elementsPerCheckpoint), ROW_TYPE_INFO);
 
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
-            ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
-            ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .tableLoader(tableLoader)
+        .tableSchema(SimpleDataUtil.FLINK_SCHEMA)
+        .writeParallelism(parallelism)
+        .equalityFieldColumns(equalityFieldColumns)
+        .upsert(insertAsUpsert)
+        .toBranch(branch)
+        .append();
 
-    testChangeLogs(
-        ImmutableList.of("data"),
-        row -> row.getField(ROW_DATA_POS),
-        true,
-        elementsPerCheckpoint,
-        expectedRecords);
-  }
+    // Execute the program.
+    env.execute("Test Iceberg Change-Log DataStream.");
 
-  @Test
-  public void testUpsertOnIdDataKey() throws Exception {
-    List<List<Row>> elementsPerCheckpoint =
-        ImmutableList.of(
-            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
-            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
-            ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
+    table.refresh();
+    List<Snapshot> snapshots = findValidSnapshots();
+    int expectedSnapshotNum = expectedRecordsPerCheckpoint.size();
+    Assert.assertEquals(
+        "Should have the expected snapshot number", expectedSnapshotNum, snapshots.size());
 
-    List<List<Record>> expectedRecords =
-        ImmutableList.of(
-            ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
-            ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
-            ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+    for (int i = 0; i < expectedSnapshotNum; i++) {
+      long snapshotId = snapshots.get(i).snapshotId();
+      List<Record> expectedRecords = expectedRecordsPerCheckpoint.get(i);
+      Assert.assertEquals(
+          "Should have the expected records for the checkpoint#" + i,
+          expectedRowSet(expectedRecords.toArray(new Record[0])),
+          actualRowSet(snapshotId, "*"));
+    }
+  }
 
-    testChangeLogs(
-        ImmutableList.of("id", "data"),
-        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
-        true,
-        elementsPerCheckpoint,
-        expectedRecords);
+  protected Record record(int id, String data) {
+    return SimpleDataUtil.createRecord(id, data);
+  }
+
+  private List<Snapshot> findValidSnapshots() {
+    List<Snapshot> validSnapshots = Lists.newArrayList();
+    for (Snapshot snapshot : table.snapshots()) {
+      if (snapshot.allManifests(table.io()).stream()
+          .anyMatch(m -> snapshot.snapshotId() == m.snapshotId())) {
+        validSnapshots.add(snapshot);
+      }
+    }
+    return validSnapshots;
   }
 
   private StructLikeSet expectedRowSet(Record... records) {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
new file mode 100644
index 0000000000..fed3338482
--- /dev/null
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2Branch.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Runs the change-log and upsert scenarios of {@link TestFlinkIcebergSinkV2Base} against a
+ * parameterized target branch.
+ *
+ * <p>After each scenario the test asserts that the branch which was not written to still has no
+ * snapshot.
+ */
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkV2Branch extends TestFlinkIcebergSinkV2Base {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  /** Name of the non-main branch used by the parameterized runs. */
+  private static final String TEST_BRANCH = "testBranch";
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private final String branch;
+
+  /** Runs every test once against main and once against {@link #TEST_BRANCH}. */
+  @Parameterized.Parameters(name = "branch = {0}")
+  public static Object[] parameters() {
+    return new Object[] {SnapshotRef.MAIN_BRANCH, TEST_BRANCH};
+  }
+
+  public TestFlinkIcebergSinkV2Branch(String branch) {
+    this.branch = branch;
+  }
+
+  /**
+   * Creates the unpartitioned Avro format-v2 test table, the checkpointing Flink environment and
+   * the table loader used by the inherited scenarios.
+   */
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    String.valueOf(FORMAT_V2)));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    testChangeLogOnIdKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnDataKey() throws Exception {
+    testChangeLogOnDataKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnIdDataKey() throws Exception {
+    testChangeLogOnIdDataKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnIdKey() throws Exception {
+    testUpsertOnIdKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnDataKey() throws Exception {
+    testUpsertOnDataKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnIdDataKey() throws Exception {
+    testUpsertOnIdDataKey(branch);
+    verifyOtherBranchUnmodified();
+  }
+
+  /**
+   * Asserts that the branch this run did not write to has no snapshot: the non-main test branch
+   * when writing to main, and main (including the table's current snapshot) otherwise.
+   */
+  private void verifyOtherBranchUnmodified() {
+    String otherBranch =
+        branch.equals(SnapshotRef.MAIN_BRANCH) ? TEST_BRANCH : SnapshotRef.MAIN_BRANCH;
+    if (otherBranch.equals(SnapshotRef.MAIN_BRANCH)) {
+      Assert.assertNull("Main branch should have no current snapshot", table.currentSnapshot());
+    }
+
+    Assert.assertNull(
+        "Branch " + otherBranch + " should have no snapshot", table.snapshot(otherBranch));
+  }
+}
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index 66baaeb0e9..a4f29d47f4 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -76,22 +76,24 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   private File flinkManifestFolder;
 
   private final FileFormat format;
+  private final String branch;
 
-  @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}")
+  @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion = {1}, branch = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {"avro", 1},
-      new Object[] {"avro", 2},
-      new Object[] {"parquet", 1},
-      new Object[] {"parquet", 2},
-      new Object[] {"orc", 1},
-      new Object[] {"orc", 2}
+      new Object[] {"avro", 1, "main"},
+      new Object[] {"avro", 2, "test-branch"},
+      new Object[] {"parquet", 1, "main"},
+      new Object[] {"parquet", 2, "test-branch"},
+      new Object[] {"orc", 1, "main"},
+      new Object[] {"orc", 2, "test-branch"}
     };
   }
 
-  public TestIcebergFilesCommitter(String format, int formatVersion) {
+  public TestIcebergFilesCommitter(String format, int formatVersion, String branch) {
     super(formatVersion);
     this.format = FileFormat.fromString(format);
+    this.branch = branch;
   }
 
   @Override
@@ -125,7 +127,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.open();
       operatorId = harness.getOperator().getOperatorID();
 
-      SimpleDataUtil.assertTableRows(table, Lists.newArrayList());
+      SimpleDataUtil.assertTableRows(table, Lists.newArrayList(), branch);
       assertSnapshotSize(0);
       assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
 
@@ -204,12 +206,12 @@ public class TestIcebergFilesCommitter extends TableTestBase {
         harness.notifyOfCompletedCheckpoint(i);
         assertFlinkManifests(0);
 
-        SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows));
+        SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
         assertSnapshotSize(i);
         assertMaxCommittedCheckpointId(jobID, operatorId, i);
         Assert.assertEquals(
             TestIcebergFilesCommitter.class.getName(),
-            table.currentSnapshot().summary().get("flink.test"));
+            SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
       }
     }
   }
@@ -255,13 +257,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, firstCheckpointId);
       assertFlinkManifests(1);
 
       // 4. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
@@ -308,13 +310,13 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 3. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(secondCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
 
       // 4. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(firstCheckpointId);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1, row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, secondCheckpointId);
       assertFlinkManifests(0);
     }
@@ -348,7 +350,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row), branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
@@ -360,7 +362,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.initializeState(snapshot);
       harness.open();
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
@@ -375,7 +377,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
     }
@@ -406,7 +408,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.processElement(of(dataFile), ++timestamp);
 
       snapshot = harness.snapshot(++checkpointId, ++timestamp);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of());
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, -1L);
       assertFlinkManifests(1);
     }
@@ -421,7 +423,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       // transaction.
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
       harness.snapshot(++checkpointId, ++timestamp);
@@ -431,7 +433,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
 
@@ -459,7 +461,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       assertMaxCommittedCheckpointId(newJobId, operatorId, -1);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(3);
 
       RowData row = SimpleDataUtil.createRowData(3, "foo");
@@ -473,7 +475,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(newJobId, operatorId, checkpointId);
     }
@@ -509,7 +511,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
         harness.notifyOfCompletedCheckpoint(checkpointId);
         assertFlinkManifests(0);
 
-        SimpleDataUtil.assertTableRows(table, tableRows);
+        SimpleDataUtil.assertTableRows(table, tableRows, branch);
         assertSnapshotSize(i);
         assertMaxCommittedCheckpointId(oldJobId, oldOperatorId, checkpointId);
       }
@@ -540,7 +542,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       harness.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, tableRows);
+      SimpleDataUtil.assertTableRows(table, tableRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(newJobId, newOperatorId, checkpointId);
     }
@@ -577,7 +579,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
         harness.notifyOfCompletedCheckpoint(checkpointId + 1);
         assertFlinkManifests(0);
-        SimpleDataUtil.assertTableRows(table, tableRows);
+        SimpleDataUtil.assertTableRows(table, tableRows, branch);
         assertSnapshotSize(i + 1);
         assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId + 1);
       }
@@ -628,7 +630,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       assertFlinkManifests(1);
 
       // Only the first row is committed at this point
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, -1);
@@ -651,7 +653,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       // transaction.
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(2);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
@@ -675,7 +677,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       harness2.notifyOfCompletedCheckpoint(checkpointId);
       assertFlinkManifests(0);
 
-      SimpleDataUtil.assertTableRows(table, expectedRows);
+      SimpleDataUtil.assertTableRows(table, expectedRows, branch);
       assertSnapshotSize(4);
       assertMaxCommittedCheckpointId(jobId, operatorId1, checkpointId);
       assertMaxCommittedCheckpointId(jobId, operatorId2, checkpointId);
@@ -702,12 +704,12 @@ public class TestIcebergFilesCommitter extends TableTestBase {
       ((BoundedOneInput) harness.getOneInputOperator()).endInput();
 
       assertFlinkManifests(0);
-      SimpleDataUtil.assertTableRows(table, tableRows);
+      SimpleDataUtil.assertTableRows(table, tableRows, branch);
       assertSnapshotSize(1);
       assertMaxCommittedCheckpointId(jobId, operatorId, Long.MAX_VALUE);
       Assert.assertEquals(
           TestIcebergFilesCommitter.class.getName(),
-          table.currentSnapshot().summary().get("flink.test"));
+          SimpleDataUtil.latestSnapshot(table, branch).summary().get("flink.test"));
     }
   }
 
@@ -748,7 +750,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
@@ -794,7 +796,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 3. notifyCheckpointComplete for checkpoint#1
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row1), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
 
@@ -816,7 +818,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // 6. notifyCheckpointComplete for checkpoint#2
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(row2), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
     }
@@ -867,7 +869,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
       // Notify the 2nd snapshot to complete.
       harness.notifyOfCompletedCheckpoint(checkpoint);
-      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4));
+      SimpleDataUtil.assertTableRows(table, ImmutableList.of(insert1, insert4), branch);
       assertMaxCommittedCheckpointId(jobId, operatorId, checkpoint);
       assertFlinkManifests(0);
       Assert.assertEquals(
@@ -951,7 +953,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
     table.refresh();
     long actualId =
         IcebergFilesCommitter.getMaxCommittedCheckpointId(
-            table, jobID.toString(), operatorID.toHexString());
+            table, jobID.toString(), operatorID.toHexString(), branch);
     Assert.assertEquals(expectedId, actualId);
   }
 
@@ -962,7 +964,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
 
   private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
       throws Exception {
-    TestOperatorFactory factory = TestOperatorFactory.of(table.location());
+    TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch);
     return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
   }
 
@@ -982,13 +984,15 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   private static class TestOperatorFactory extends AbstractStreamOperatorFactory<Void>
       implements OneInputStreamOperatorFactory<WriteResult, Void> {
     private final String tablePath;
+    private final String branch;
 
-    private TestOperatorFactory(String tablePath) {
+    private TestOperatorFactory(String tablePath, String branch) {
       this.tablePath = tablePath;
+      this.branch = branch;
     }
 
-    private static TestOperatorFactory of(String tablePath) {
-      return new TestOperatorFactory(tablePath);
+    private static TestOperatorFactory of(String tablePath, String branch) {
+      return new TestOperatorFactory(tablePath, branch);
     }
 
     @Override
@@ -1000,7 +1004,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
               new TestTableLoader(tablePath),
               false,
               Collections.singletonMap("flink.test", TestIcebergFilesCommitter.class.getName()),
-              ThreadPools.WORKER_THREAD_POOL_SIZE);
+              ThreadPools.WORKER_THREAD_POOL_SIZE,
+              branch);
       committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
       return (T) committer;
     }