Posted to issues@iceberg.apache.org by "amogh-jahagirdar (via GitHub)" <gi...@apache.org> on 2023/02/08 15:23:03 UTC

[GitHub] [iceberg] amogh-jahagirdar opened a new pull request, #6660: Flink: Support writes to branches in FlinkSink

amogh-jahagirdar opened a new pull request, #6660:
URL: https://github.com/apache/iceberg/pull/6660

   This change adds support for writing to branches in FlinkSink via the FlinkWriteOption "branch".
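To make the intended semantics concrete, here is a minimal, self-contained Java model (not Iceberg code) of what writing to a branch means. Each commit creates a snapshot whose parent is the target branch's head, and only that branch's ref advances, so main is untouched. `BranchedTable`, `createBranch`, and `commit` are hypothetical names. The assumption that a new branch starts from main's current head is mine. In the PR itself, the target branch is chosen through the "branch" FlinkWriteOption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, self-contained model of branch refs; BranchedTable is NOT the Iceberg API.
class BranchWriteSketch {
  public static void main(String[] args) {
    BranchedTable table = new BranchedTable();
    table.commit("main");              // first snapshot lands on main
    table.createBranch("test-branch"); // assumed: branch starts at main's current head
    table.commit("test-branch");       // only the test-branch ref moves
    System.out.println("main -> " + table.head("main"));
    System.out.println("test-branch -> " + table.head("test-branch"));
  }
}

class BranchedTable {
  static final String MAIN = "main";
  private final Map<String, Long> refs = new HashMap<>();  // branch name -> head snapshot id
  private final Map<Long, Long> parents = new HashMap<>(); // snapshot id -> parent id (null for the first snapshot)
  private long nextSnapshotId = 1;

  Long head(String branch) {
    return refs.get(branch);
  }

  Long parentOf(long snapshotId) {
    return parents.get(snapshotId);
  }

  void createBranch(String branch) {
    if (refs.containsKey(branch)) {
      throw new IllegalArgumentException("Branch already exists: " + branch);
    }
    refs.put(branch, refs.get(MAIN));
  }

  // Adds a snapshot whose parent is the target branch's current head, then advances only that ref.
  long commit(String branch) {
    if (!MAIN.equals(branch) && !refs.containsKey(branch)) {
      throw new IllegalArgumentException("Unknown branch: " + branch);
    }
    long id = nextSnapshotId++;
    parents.put(id, refs.get(branch));
    refs.put(branch, id);
    return id;
  }
}
```

Running it prints main -> 1 and test-branch -> 2: the branch commit leaves main's head where it was.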


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1099553716


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private TableLoader tableLoader;
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private final String branch;
+  private final int formatVersion;
+
+  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {1, "main"},
+      {1, "test-branch"},
+      {2, "main"},
+      {2, "test-branch"},
+    };
+  }
+
+  public TestFlinkIcebergSinkBranch(int formatVersion, String branch) {
+    this.formatVersion = formatVersion;
+    this.branch = branch;
+  }
+
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    String.valueOf(formatVersion)));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testWriteRow() throws Exception {
+    testWriteRow(null, DistributionMode.NONE);
+  }
+
+  @Test
+  public void testWriteRowWithTableSchema() throws Exception {

Review Comment:
   You're right; as far as I can tell, there's no point in testing both with and without a null schema for branches. I updated the branch test to only test with a schema.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1089604601


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -85,28 +86,29 @@ public class TestFlinkIcebergSink {
   private final int parallelism;
   private final boolean partitioned;
 
-  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, branch = {3}")

Review Comment:
   Here are some stats from one local run (on a powerful M1 MacBook). CI machines tend to be slower due to load.
   * TestFlinkIcebergSink: 36s
   * TestFlinkIcebergSinkV2: 77s
   * TestIcebergFilesCommitter: 19s
   
   If we double the number of parameter combinations, the total build time for those 3 classes will increase from 132s (~2 min) to 264s (~4 min). If we add another dimension/multiplication after this, it will increase from 4+ min to 8-9 min.
   
   The exponential growth was my concern.
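The arithmetic behind this estimate, as a small Java sketch. It assumes total time scales linearly with the number of parameterized cases and that each added dimension has two values (as the branch dimension {main, test-branch} does). The per-class timings are the local numbers quoted in the comment; the method names are illustrative.

```java
// Back-of-the-envelope test-time estimate; assumes time scales linearly with the number of cases.
class TestTimeEstimate {
  // Sum of per-class run times, in seconds.
  static int total(int... seconds) {
    int sum = 0;
    for (int s : seconds) {
      sum += s;
    }
    return sum;
  }

  // Each extra two-valued dimension doubles the number of cases, and hence the time.
  static int withExtraDimensions(int baseSeconds, int extraDimensions) {
    return baseSeconds << extraDimensions;
  }

  public static void main(String[] args) {
    // TestFlinkIcebergSink, TestFlinkIcebergSinkV2, TestIcebergFilesCommitter
    int base = total(36, 77, 19);
    for (int dims = 0; dims <= 2; dims++) {
      int secs = withExtraDimensions(base, dims);
      System.out.printf("%d extra dimension(s): %d s (%.1f min)%n", dims, secs, secs / 60.0);
    }
  }
}
```

This prints 132 s, 264 s, and 528 s (2.2, 4.4, and 8.8 minutes), which is the 2 min, 4 min, 8-9 min progression described above.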





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1089737315


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -85,28 +86,29 @@ public class TestFlinkIcebergSink {
   private final int parallelism;
   private final boolean partitioned;
 
-  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, branch = {3}")

Review Comment:
   Sure, yeah: for the branch tests, adding the parameter would double the total time. In that case, I will keep the current implementation, which removes the extra dimension and moves the branch-specific tests into a separate class. Let me know your thoughts @stevenzwu!





[GitHub] [iceberg] hililiwei commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "hililiwei (via GitHub)" <gi...@apache.org>.
hililiwei commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1408484697

   > > Thanks for the PR @amogh-jahagirdar! Left one small question.
   > > Also, do we have this feature in FlinkSource?
   > 
   > Thanks for the review @pvary! Not yet, but I'm working on a PR for it; I will publish it when it's ready.
   
   A long time ago, I opened a related PR, #5029, to do this. If you can, could you take a look?




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1087244952


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -284,10 +303,23 @@ public static void assertTableRecords(Table table, List<Record> expected) throws
     }
   }
 
+  public static Snapshot latestSnapshot(Table table, String branch) {

Review Comment:
   Technically, `table.snapshot(branch)` would yield the same result, but see @rdblue's [comment](https://github.com/apache/iceberg/pull/6651/files#r1085892442) on the Spark write PR.
   
   For the main branch, we still want to validate that the behavior of currentSnapshot() specifically is preserved, since that was the original check done here. This effectively ensures that currentSnapshotId in the table metadata is maintained correctly as the main branch is written to.
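A minimal sketch of the helper's intent, using a hypothetical `TableView` record in place of Iceberg's `Table`. For "main" it reads the current snapshot id directly, so the test keeps exercising `currentSnapshotId`. For any other branch it resolves the branch ref, which is what `table.snapshot(branch)` does. The record and its fields are assumptions for illustration only.

```java
import java.util.Map;

// Hypothetical stand-in for the latestSnapshot(table, branch) test helper; TableView is not Iceberg's Table.
class LatestSnapshotSketch {
  record TableView(Long currentSnapshotId, Map<String, Long> refs) {
    // Plays the role of table.snapshot(branch): resolve the named ref.
    Long snapshotFor(String branch) {
      return refs.get(branch);
    }
  }

  static Long latestSnapshot(TableView table, String branch) {
    // For main, check the current snapshot id directly so the test still covers currentSnapshotId.
    if ("main".equals(branch)) {
      return table.currentSnapshotId();
    }
    return table.snapshotFor(branch);
  }

  public static void main(String[] args) {
    TableView t = new TableView(5L, Map.of("main", 5L, "test-branch", 7L));
    System.out.println(latestSnapshot(t, "main") + " " + latestSnapshot(t, "test-branch"));
  }
}
```

If a write to main updated the ref but not `currentSnapshotId`, this helper would return the stale id and the main-branch test would fail. That is the regression the original `currentSnapshot()` check guards against.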





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1089580257


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -85,28 +86,29 @@ public class TestFlinkIcebergSink {
   private final int parallelism;
   private final boolean partitioned;
 
-  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, branch = {3}")

Review Comment:
   @stevenzwu @jackye1995 I added this separate class and moved the logic common to the sink/sink v2 tests into TestFlinkIcebergSinkBase. Having implemented it, I'm now not sure the refactoring is worth it. Even though the previous approach unnecessarily doubled the number of combinations, it was less intrusive to the test code and easier to read. Also, running locally with 2x the combinations was still quite quick. Let me know which approach you prefer!
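To put numbers on the "2x the combinations" point, the Java sketch below builds JUnit 4-style `Object[][]` parameter matrices as a Cartesian product. The dimension values for the existing sink test (three file formats, parallelism 1 or 2, partitioned or not) are an assumption about that class. The values for the separate branch class match `TestFlinkIcebergSinkBranch.parameters()` in the diff above; `crossProduct` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ParameterMatrixSketch {
  // Cartesian product of the given dimensions, in the Object[][] shape JUnit 4's Parameterized runner consumes.
  static Object[][] crossProduct(Object[]... dims) {
    List<Object[]> rows = new ArrayList<>();
    rows.add(new Object[0]); // start with a single empty row
    for (Object[] dim : dims) {
      List<Object[]> next = new ArrayList<>();
      for (Object[] row : rows) {
        for (Object value : dim) {
          Object[] extended = Arrays.copyOf(row, row.length + 1);
          extended[row.length] = value;
          next.add(extended);
        }
      }
      rows = next;
    }
    return rows.toArray(new Object[0][]);
  }

  public static void main(String[] args) {
    Object[] formats = {"avro", "orc", "parquet"}; // assumed values
    Object[] parallelism = {1, 2};                 // assumed values
    Object[] partitioned = {true, false};          // assumed values
    Object[] branches = {"main", "test-branch"};
    System.out.println("existing: " + crossProduct(formats, parallelism, partitioned).length);
    System.out.println("with branch dimension: " + crossProduct(formats, parallelism, partitioned, branches).length);
    System.out.println("separate branch class: " + crossProduct(new Object[] {1, 2}, branches).length);
  }
}
```

Under these assumptions, the existing matrix has 12 rows, adding a branch dimension makes it 24, and the separate branch class runs only 4. The 4 rows come out in the same order as `parameters()` in `TestFlinkIcebergSinkBranch`.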





[GitHub] [iceberg] pvary commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1087469498


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -316,6 +316,11 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }
 
+    public Builder toBranch(String branch) {

Review Comment:
   I am not super confident about this, but `branch` feels more natural to me. We do not use `toTable`, so why **to**Branch?
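For comparison, here is how the option reads at a call site, using a hypothetical `Builder` rather than the real `FlinkSink.Builder`. The sketch assumes the sink writes to main unless a branch is given. `table(...)` and `describe()` are illustrative only; the alternative name under discussion would simply be `branch(...)` with the same body.

```java
import java.util.Objects;

// Hypothetical builder illustrating the naming question; NOT FlinkSink.Builder.
class SinkBuilderSketch {
  static final class Builder {
    private String branch = "main"; // assumed default: write to main unless told otherwise
    private String tableName;

    Builder table(String tableName) {
      this.tableName = tableName;
      return this;
    }

    // Named toBranch as in the PR; the suggested alternative would be branch(String).
    Builder toBranch(String branch) {
      this.branch = Objects.requireNonNull(branch, "branch must not be null");
      return this;
    }

    String describe() {
      return "write to " + tableName + "@" + branch;
    }
  }

  public static void main(String[] args) {
    System.out.println(new Builder().table("db.t").toBranch("test-branch").describe());
    System.out.println(new Builder().table("db.t").describe());
  }
}
```

Either name gives the same fluent chain; the difference is only whether `.toBranch("test-branch")` or `.branch("test-branch")` reads more naturally next to the builder's other setters.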





[GitHub] [iceberg] jackye1995 commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1404275883

   > As I implemented this, though, I started to think we may just want a direct branch method on the FlinkSink builder itself. That seems more intuitive from an API perspective and is just easier to use.
   
   I think this is more a question of convention, so I will leave it to people who use Flink more often to weigh in. To me, `toBranch` definitely feels cleaner.




[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1404381644

   Thanks for the reviews @stevenzwu @jackye1995!




[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1403956550

   I need to fix up the tests as well.




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1087025072


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -471,8 +476,9 @@ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor
     return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
   }
 
-  static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
-    Snapshot snapshot = table.currentSnapshot();
+  static long getMaxCommittedCheckpointId(
+      Table table, String flinkJobId, String operatorId, String branch) {
+    Snapshot snapshot = table.snapshot(branch);

Review Comment:
   The fetched snapshot doesn't seem to be used.
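Why the fetched snapshot matters: as I understand it, the committer recovers its last committed checkpoint id by walking back from a starting snapshot through parent ids and reading Flink properties from each snapshot summary. The self-contained Java sketch below models that walk starting from the branch head. `Snap` is a hypothetical stand-in for `Snapshot`; the summary keys (`flink.job-id`, `flink.operator-id`, `flink.max-committed-checkpoint-id`) and the -1 sentinel are assumptions modeled on the committer, not copied from it.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of recovering the max committed checkpoint id along one branch's ancestry.
class MaxCheckpointSketch {
  static final long INITIAL_CHECKPOINT_ID = -1L;
  // Assumed summary keys, modeled on the Flink committer's snapshot properties.
  static final String JOB_ID = "flink.job-id";
  static final String OPERATOR_ID = "flink.operator-id";
  static final String MAX_CHECKPOINT = "flink.max-committed-checkpoint-id";

  record Snap(long id, Long parentId, Map<String, String> summary) {}

  static long maxCommittedCheckpointId(Map<Long, Snap> snapshots, Long branchHead, String jobId, String operatorId) {
    // Start from the branch head (the snapshot fetched for the branch), not from main's current snapshot.
    Snap snapshot = branchHead == null ? null : snapshots.get(branchHead);
    while (snapshot != null) {
      Map<String, String> summary = snapshot.summary();
      String snapJob = summary.get(JOB_ID);
      String snapOp = summary.get(OPERATOR_ID);
      // Match this job; a missing operator id is treated as a match.
      if (jobId.equals(snapJob) && (snapOp == null || snapOp.equals(operatorId))) {
        String value = summary.get(MAX_CHECKPOINT);
        if (value != null) {
          return Long.parseLong(value);
        }
      }
      snapshot = snapshot.parentId() == null ? null : snapshots.get(snapshot.parentId());
    }
    return INITIAL_CHECKPOINT_ID;
  }

  public static void main(String[] args) {
    Map<Long, Snap> snaps = new HashMap<>();
    snaps.put(1L, new Snap(1L, null, Map.of(JOB_ID, "job", OPERATOR_ID, "op", MAX_CHECKPOINT, "3")));
    snaps.put(2L, new Snap(2L, 1L, Map.of(JOB_ID, "job", OPERATOR_ID, "op", MAX_CHECKPOINT, "5"))); // main head
    snaps.put(3L, new Snap(3L, 1L, Map.of(JOB_ID, "job", OPERATOR_ID, "op", MAX_CHECKPOINT, "4"))); // test-branch head
    System.out.println("main: " + maxCommittedCheckpointId(snaps, 2L, "job", "op"));
    System.out.println("test-branch: " + maxCommittedCheckpointId(snaps, 3L, "job", "op"));
  }
}
```

With two branches diverging from snapshot 1, starting at main's head (2) yields 5, and starting at test-branch's head (3) yields 4. If the walk ignored the branch snapshot and began from the table's current snapshot instead, a job writing to test-branch would pick up main's checkpoint id.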





[GitHub] [iceberg] pvary commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1088627154


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -316,6 +316,11 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }
 
+    public Builder toBranch(String branch) {

Review Comment:
   Thanks for the quick response +1





[GitHub] [iceberg] pvary commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "pvary (via GitHub)" <gi...@apache.org>.
pvary commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1404604496

   Thanks for the PR @amogh-jahagirdar!
   Left one small question.
   
   Also, do we have this feature in FlinkSource?




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1089525018


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java:
##########
@@ -60,7 +54,7 @@
 import org.junit.runners.Parameterized;
 
 @RunWith(Parameterized.class)
-public class TestFlinkIcebergSinkV2 {
+public class TestFlinkIcebergSinkV2 extends TestFlinkIcebergSinkBase {

Review Comment:
   I need to move the v2 cases over to the branch-specific test class to validate that row-level operations in Flink work as expected on branches.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1088348291


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -316,6 +316,11 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }
 
+    public Builder toBranch(String branch) {

Review Comment:
   I share the same view as @amogh-jahagirdar, but I am not strongly opinionated here.





[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1088427398


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java:
##########
@@ -53,16 +53,17 @@ public class TestTaskWriters {
   private static final long TARGET_FILE_SIZE = 128 * 1024 * 1024;
 
   @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+  private final String branch;
 
   @Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {"avro", true},
-      {"avro", false},
-      {"orc", true},
-      {"orc", false},
-      {"parquet", true},
-      {"parquet", false}
+      {"avro", true, "main"},

Review Comment:
   We probably don't need to add the branch parameter here. This class is mainly for testing the Flink TaskWriter, so whether or not we write to a branch is not really important.




[GitHub] [iceberg] jackye1995 commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1409461416

   > A long time ago, I raised a related PR, https://github.com/apache/iceberg/pull/5029, to do this. If you can, could you take a look?
   
   Thanks, marking that as a part of the milestone!



[GitHub] [iceberg] amogh-jahagirdar closed pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar closed pull request #6660: Flink: Support writes to branches in FlinkSink
URL: https://github.com/apache/iceberg/pull/6660



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1100543216


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private TableLoader tableLoader;
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private final String branch;
+  private final int formatVersion;
+
+  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {1, "main"},
+      {1, "test-branch"},
+      {2, "main"},
+      {2, "test-branch"},
+    };
+  }
+
+  public TestFlinkIcebergSinkBranch(int formatVersion, String branch) {
+    this.formatVersion = formatVersion;
+    this.branch = branch;
+  }
+
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    String.valueOf(formatVersion)));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testWriteRow() throws Exception {
+    testWriteRow(null, DistributionMode.NONE);
+  }
+
+  @Test
+  public void testWriteRowWithTableSchema() throws Exception {
+    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+    verifyOtherBranchUnmodified();
+  }
+
+  private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
+      throws Exception {
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .tableSchema(tableSchema)
+        .toBranch(branch)
+        .distributionMode(distributionMode)
+        .append();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream.");
+
+    SimpleDataUtil.assertTableRows(table, convertToRowData(rows), branch);
+    SimpleDataUtil.assertTableRows(
+        table,
+        ImmutableList.of(),
+        branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testCheckAndGetEqualityFieldIds() {
+    Assume.assumeTrue(formatVersion == 2);
+    table
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .addRequiredColumn("type", Types.StringType.get())
+        .setIdentifierFields("type")
+        .commit();
+
+    DataStream<Row> dataStream =
+        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
+
+    // Use schema identifier field IDs as equality field id list by default
+    Assert.assertEquals(
+        table.schema().identifierFieldIds(),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    // Use user-provided equality field column as equality field id list
+    builder.equalityFieldColumns(Lists.newArrayList("id"));
+    Assert.assertEquals(
+        Sets.newHashSet(table.schema().findField("id").fieldId()),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    builder.equalityFieldColumns(Lists.newArrayList("type"));
+    Assert.assertEquals(
+        Sets.newHashSet(table.schema().findField("type").fieldId()),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+    List<List<Row>> elementsPerCheckpoint =

Review Comment:
   Good point. I did some more refactoring to further reduce the duplication. The common test cases with v2 will be shared (so the row data and the append will be the same), but we'll pass in a branch. For the non-branch test cases we pass in just main, but parameterize on all the existing properties (file format, partitioning, etc.). For the branch test cases we pass in just the branches, with a fixed file format and partitioning. This reduces the duplication and avoids doubling the number of tests.
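A minimal sketch of the split parameterization described above. It only builds JUnit-style `Object[]` parameter rows and counts them. The class name is hypothetical. For the non-branch tests it assumes three file formats (avro, orc, parquet), two parallelism values (`{1, 2}`, an assumption), and a partitioned flag. For the branch tests it reuses the four `(formatVersion, branch)` pairs from `TestFlinkIcebergSinkBranch`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Iceberg): builds JUnit-style parameter rows.
final class SinkTestParameters {
  private SinkTestParameters() {}

  // Non-branch tests: vary every write property, but always target "main".
  static List<Object[]> nonBranchParameters() {
    List<Object[]> rows = new ArrayList<>();
    for (String format : new String[] {"avro", "orc", "parquet"}) {
      for (int parallelism : new int[] {1, 2}) { // assumed parallelism values
        for (boolean partitioned : new boolean[] {true, false}) {
          rows.add(new Object[] {format, parallelism, partitioned, "main"});
        }
      }
    }
    return rows;
  }

  // Branch tests: format and partitioning are fixed by the test setup; vary version and branch.
  static List<Object[]> branchParameters() {
    List<Object[]> rows = new ArrayList<>();
    for (int formatVersion : new int[] {1, 2}) {
      for (String branch : new String[] {"main", "test-branch"}) {
        rows.add(new Object[] {formatVersion, branch});
      }
    }
    return rows;
  }

  // For comparison: every non-branch combination crossed with both branches.
  static int crossProductSize() {
    return nonBranchParameters().size() * 2;
  }
}
```

Under these assumptions, the split yields 12 + 4 = 16 parameter rows. Adding a two-valued branch parameter to every non-branch combination would instead yield 24.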




[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6660: Flink: Support writes to branches

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1403846236

   Moving this out of draft.
   
   Beyond the code, I wanted to discuss the high-level API design for branch writes using FlinkSink. In the current implementation we're using a FlinkWriteOption, so it looks like this:
   
   ```
   FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
       .table(table)
       .tableLoader(tableLoader)
       .set("write-format", "avro")
       // branch passed as a generic write option
       .set(FlinkWriteOptions.BRANCH.key(), "some-branch")
       .set(FlinkWriteOptions.OVERWRITE_MODE.key(), "true")
       .append();
   ```
   
   As I implemented this, though, I started to think we may just want a direct branch method on the FlinkSink builder itself. That seems more intuitive from an API perspective and is easier to use:
   
   ```
   FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
       .table(table)
       .tableLoader(tableLoader)
       // proposed: a dedicated builder method for the target branch
       .branch("some-branch")
       .set("write-format", "avro")
       .set(FlinkWriteOptions.OVERWRITE_MODE.key(), "true")
       .append();
   ```
   
   @stevenzwu @yyanyy @rdblue @jackye1995 Let me know your thoughts here! 
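Here is one way to offer both styles, sketched with a hypothetical builder. The explicit `toBranch` method writes the same option key that a generic `set` call would, so the two paths cannot disagree. The sink then resolves the target branch with a `main` default. The class name and the `"branch"` key are assumptions for illustration; this is not FlinkSink's actual code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified builder; not the real FlinkSink.Builder.
final class SinkBuilderSketch {
  static final String BRANCH_KEY = "branch"; // assumed option key
  static final String MAIN_BRANCH = "main";

  private final Map<String, String> writeOptions = new HashMap<>();

  // Generic, string-keyed option (the FlinkWriteOption style).
  SinkBuilderSketch set(String key, String value) {
    writeOptions.put(key, value);
    return this;
  }

  // Explicit, discoverable method; delegates to the same options map so both styles agree.
  SinkBuilderSketch toBranch(String branch) {
    return set(BRANCH_KEY, branch);
  }

  // Resolve the target branch, defaulting to main when none was configured.
  String resolvedBranch() {
    return writeOptions.getOrDefault(BRANCH_KEY, MAIN_BRANCH);
  }
}
```

With this design, the last call wins regardless of which style set the branch. Java users get a typed, discoverable method, while option maps coming from elsewhere still work.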
   



[GitHub] [iceberg] stevenzwu merged pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu merged PR #6660:
URL: https://github.com/apache/iceberg/pull/6660



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1087201304


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -471,8 +476,9 @@ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor
     return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
   }
 
-  static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
-    Snapshot snapshot = table.currentSnapshot();
+  static long getMaxCommittedCheckpointId(
+      Table table, String flinkJobId, String operatorId, String branch) {
+    Snapshot snapshot = table.snapshot(branch);

Review Comment:
   It does, after line 484. This is where we start from the head of the branch (`snapshot` in this case) and traverse the lineage until we find a snapshot produced by the same Flink job, then read the most recently committed checkpoint ID from it.
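The lineage walk described above can be sketched with plain Java stand-ins. This is a simplified illustration, not `IcebergFilesCommitter`'s code: `SnapshotStub` is hypothetical, and the summary keys and the `-1` sentinel are assumptions to verify against the real constants. It also requires both the job ID and the operator ID to match.

```java
import java.util.Map;

// Hypothetical stand-in for an Iceberg snapshot (not org.apache.iceberg.Snapshot).
final class SnapshotStub {
  final long id;
  final Long parentId; // null for the first snapshot in the lineage
  final Map<String, String> summary;

  SnapshotStub(long id, Long parentId, Map<String, String> summary) {
    this.id = id;
    this.parentId = parentId;
    this.summary = summary;
  }
}

final class CheckpointLookupSketch {
  // Assumed summary keys and sentinel; verify against IcebergFilesCommitter before relying on them.
  static final String FLINK_JOB_ID = "flink.job-id";
  static final String OPERATOR_ID = "flink.operator-id";
  static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
  static final long INITIAL_CHECKPOINT_ID = -1L;

  private CheckpointLookupSketch() {}

  // Start at the head of the target branch and follow parent pointers. The first snapshot
  // written by the same job and operator holds the most recently committed checkpoint ID.
  static long maxCommittedCheckpointId(
      Map<Long, SnapshotStub> snapshotsById, Long branchHeadId, String flinkJobId, String operatorId) {
    Long currentId = branchHeadId;
    while (currentId != null) {
      SnapshotStub snapshot = snapshotsById.get(currentId);
      if (snapshot == null) {
        break; // ancestor no longer available (e.g. expired)
      }
      Map<String, String> summary = snapshot.summary;
      if (flinkJobId.equals(summary.get(FLINK_JOB_ID))
          && operatorId.equals(summary.get(OPERATOR_ID))) {
        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
        if (value != null) {
          return Long.parseLong(value);
        }
      }
      currentId = snapshot.parentId;
    }
    return INITIAL_CHECKPOINT_ID;
  }
}
```

Starting from a different branch head gives a different answer for the same job. That is why the lookup starts from `table.snapshot(branch)` rather than from the main branch's current snapshot.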




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1087244952


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -284,10 +303,23 @@ public static void assertTableRecords(Table table, List<Record> expected) throws
     }
   }
 
+  public static Snapshot latestSnapshot(Table table, String branch) {

Review Comment:
   Technically `table.snapshot(branch)` would also yield the same result, but see @rdblue's [comment](https://github.com/apache/iceberg/pull/6651/files#r1085892442) on the Spark write PR.
   
   For the main branch we still want to validate that the behavior of `currentSnapshot()` is maintained, since that was the API used before. This effectively ensures that the `currentSnapshotId` in the metadata is maintained correctly as the main branch is written to.
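The reason for reading main through the legacy pointer can be sketched with a hypothetical metadata model (not Iceberg's `TableMetadata`). Suppose a commit to main moved only the `main` ref and forgot `currentSnapshotId`. A helper that reads main through `currentSnapshotId` would catch that bug, while one that reads every branch through the refs map would not. `MetadataStub` and its fields are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal model of table metadata (not Iceberg's TableMetadata).
final class MetadataStub {
  static final String MAIN_BRANCH = "main";

  Long currentSnapshotId; // what a currentSnapshot()-style lookup reads
  final Map<String, Long> refs = new HashMap<>(); // what a snapshot(branch)-style lookup reads

  // Committing to any branch moves its ref; committing to main must also move currentSnapshotId.
  void commit(String branch, long snapshotId) {
    refs.put(branch, snapshotId);
    if (MAIN_BRANCH.equals(branch)) {
      currentSnapshotId = snapshotId;
    }
  }

  // Mirrors the test helper's idea: read main via the legacy pointer, other branches via refs.
  Long latestSnapshotId(String branch) {
    return MAIN_BRANCH.equals(branch) ? currentSnapshotId : refs.get(branch);
  }
}
```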




[GitHub] [iceberg] stevenzwu commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1404049007

   > As I implemented this, though, I started to think we may just want a direct branch method on the FlinkSink builder itself. That seems more intuitive from an API perspective and is easier to use.
   
   Yeah. I also think an explicit `toBranch` method in the builder itself is more intuitive.



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1088446403


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -85,28 +86,29 @@ public class TestFlinkIcebergSink {
   private final int parallelism;
   private final boolean partitioned;
 
-  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, branch = {3}")

Review Comment:
   Agreed, I was thinking along the same lines. File format, parallelism, and partitioning are all independent of branch writes, so we'd be adding a lot of unnecessary test cases. I think it makes sense to separate out tests specifically for branches here!




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1091380956


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.HadoopCatalogResource;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.BoundedTestSource;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private TableLoader tableLoader;

Review Comment:
   nit: we typically put non-final variables after final variables



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java:
##########
@@ -0,0 +1,398 @@
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private TableLoader tableLoader;
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private final String branch;
+  private final int formatVersion;
+
+  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {1, "main"},
+      {1, "test-branch"},
+      {2, "main"},
+      {2, "test-branch"},
+    };
+  }
+
+  public TestFlinkIcebergSinkBranch(int formatVersion, String branch) {
+    this.formatVersion = formatVersion;
+    this.branch = branch;
+  }
+
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    String.valueOf(formatVersion)));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testWriteRow() throws Exception {
+    testWriteRow(null, DistributionMode.NONE);
+  }
+
+  @Test
+  public void testWriteRowWithTableSchema() throws Exception {

Review Comment:
   I feel it may not be necessary to test both a null and a non-null Flink TableSchema in the branching tests.



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java:
##########
@@ -0,0 +1,398 @@
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private TableLoader tableLoader;
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private final String branch;
+  private final int formatVersion;
+
+  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {1, "main"},
+      {1, "test-branch"},
+      {2, "main"},
+      {2, "test-branch"},
+    };
+  }
+
+  public TestFlinkIcebergSinkBranch(int formatVersion, String branch) {
+    this.formatVersion = formatVersion;
+    this.branch = branch;
+  }
+
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    String.valueOf(formatVersion)));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testWriteRow() throws Exception {
+    testWriteRow(null, DistributionMode.NONE);
+  }
+
+  @Test
+  public void testWriteRowWithTableSchema() throws Exception {
+    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+    verifyOtherBranchUnmodified();
+  }
+
+  private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
+      throws Exception {
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .tableSchema(tableSchema)
+        .toBranch(branch)
+        .distributionMode(distributionMode)
+        .append();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream.");
+
+    SimpleDataUtil.assertTableRows(table, convertToRowData(rows), branch);
+    SimpleDataUtil.assertTableRows(
+        table,
+        ImmutableList.of(),
+        branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testCheckAndGetEqualityFieldIds() {
+    Assume.assumeTrue(formatVersion == 2);
+    table
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .addRequiredColumn("type", Types.StringType.get())
+        .setIdentifierFields("type")
+        .commit();
+
+    DataStream<Row> dataStream =
+        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
+
+    // Use schema identifier field IDs as equality field id list by default
+    Assert.assertEquals(
+        table.schema().identifierFieldIds(),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    // Use user-provided equality field column as equality field id list
+    builder.equalityFieldColumns(Lists.newArrayList("id"));
+    Assert.assertEquals(
+        Sets.newHashSet(table.schema().findField("id").fieldId()),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    builder.equalityFieldColumns(Lists.newArrayList("type"));
+    Assert.assertEquals(
+        Sets.newHashSet(table.schema().findField("type").fieldId()),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(
+                row("+I", 1, "aaa"),
+                row("-D", 1, "aaa"),
+                row("+I", 1, "bbb"),
+                row("+I", 2, "aaa"),
+                row("-D", 2, "aaa"),
+                row("+I", 2, "bbb")),
+            ImmutableList.of(
+                row("-U", 2, "bbb"), row("+U", 2, "ccc"), row("-D", 2, "ccc"), row("+I", 2, "ddd")),
+            ImmutableList.of(
+                row("-D", 1, "bbb"),
+                row("+I", 1, "ccc"),
+                row("-D", 1, "ccc"),
+                row("+I", 1, "ddd")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "bbb"), record(2, "bbb")),
+            ImmutableList.of(record(1, "bbb"), record(2, "ddd")),
+            ImmutableList.of(record(1, "ddd"), record(2, "ddd")));
+
+    testChangeLogs(
+        env,
+        tableLoader,
+        table,
+        1,
+        ImmutableList.of("id"),
+        row -> row.getField(ROW_ID_POS),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnDataKey() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(
+                row("+I", 1, "aaa"),
+                row("-D", 1, "aaa"),
+                row("+I", 2, "bbb"),
+                row("+I", 1, "bbb"),
+                row("+I", 2, "aaa")),
+            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
+            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa"), row("+I", 2, "ccc")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "bbb"), record(2, "aaa")),
+            ImmutableList.of(record(1, "aaa"), record(1, "bbb"), record(1, "ccc")),
+            ImmutableList.of(
+                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "ccc")));
+
+    testChangeLogs(
+        env,
+        tableLoader,
+        table,
+        1,
+        ImmutableList.of("data"),
+        row -> row.getField(ROW_DATA_POS),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnIdDataKey() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(
+                row("+I", 1, "aaa"),
+                row("-D", 1, "aaa"),
+                row("+I", 2, "bbb"),
+                row("+I", 1, "bbb"),
+                row("+I", 2, "aaa")),
+            ImmutableList.of(row("-U", 2, "aaa"), row("+U", 1, "ccc"), row("+I", 1, "aaa")),
+            ImmutableList.of(row("-D", 1, "bbb"), row("+I", 2, "aaa")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "bbb"), record(2, "aaa"), record(2, "bbb")),
+            ImmutableList.of(
+                record(1, "aaa"), record(1, "bbb"), record(1, "ccc"), record(2, "bbb")),
+            ImmutableList.of(
+                record(1, "aaa"), record(1, "ccc"), record(2, "aaa"), record(2, "bbb")));
+
+    testChangeLogs(
+        env,
+        tableLoader,
+        table,
+        1,
+        ImmutableList.of("data", "id"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        false,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnIdKey() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "bbb")),
+            ImmutableList.of(row("+I", 1, "ccc")),
+            ImmutableList.of(row("+U", 1, "ddd"), row("+I", 1, "eee")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "bbb")),
+            ImmutableList.of(record(1, "ccc")),
+            ImmutableList.of(record(1, "eee")));
+
+    testChangeLogs(
+        env,
+        tableLoader,
+        table,
+        1,
+        ImmutableList.of("id"),
+        row -> row.getField(ROW_ID_POS),
+        true,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnDataKey() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(row("+I", 1, "aaa"), row("+I", 2, "aaa"), row("+I", 3, "bbb")),
+            ImmutableList.of(row("+U", 4, "aaa"), row("-U", 3, "bbb"), row("+U", 5, "bbb")),
+            ImmutableList.of(row("+I", 6, "aaa"), row("+U", 7, "bbb")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(2, "aaa"), record(3, "bbb")),
+            ImmutableList.of(record(4, "aaa"), record(5, "bbb")),
+            ImmutableList.of(record(6, "aaa"), record(7, "bbb")));
+
+    testChangeLogs(
+        env,
+        tableLoader,
+        table,
+        1,
+        ImmutableList.of("data"),
+        row -> row.getField(ROW_DATA_POS),
+        true,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testUpsertOnIdDataKey() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+    List<List<Row>> elementsPerCheckpoint =
+        ImmutableList.of(
+            ImmutableList.of(row("+I", 1, "aaa"), row("+U", 1, "aaa"), row("+I", 2, "bbb")),
+            ImmutableList.of(row("+I", 1, "aaa"), row("-D", 2, "bbb"), row("+I", 2, "ccc")),
+            ImmutableList.of(row("+U", 1, "bbb"), row("-U", 1, "ccc"), row("-D", 1, "aaa")));
+
+    List<List<Record>> expectedRecords =
+        ImmutableList.of(
+            ImmutableList.of(record(1, "aaa"), record(2, "bbb")),
+            ImmutableList.of(record(1, "aaa"), record(2, "ccc")),
+            ImmutableList.of(record(1, "bbb"), record(2, "ccc")));
+
+    testChangeLogs(
+        env,
+        tableLoader,
+        table,
+        1,
+        ImmutableList.of("id", "data"),
+        row -> Row.of(row.getField(ROW_ID_POS), row.getField(ROW_DATA_POS)),
+        true,
+        elementsPerCheckpoint,
+        expectedRecords,
+        branch);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  private void verifyOtherBranchUnmodified() {

Review Comment:
   nice



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkBranch.java:
##########
@@ -0,0 +1,398 @@
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
+
+  @ClassRule
+  public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
+      MiniClusterResource.createWithClassloaderCheckDisabled();
+
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Rule
+  public final HadoopCatalogResource catalogResource =
+      new HadoopCatalogResource(TEMPORARY_FOLDER, TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  private TableLoader tableLoader;
+  private Table table;
+  private StreamExecutionEnvironment env;
+  private final String branch;
+  private final int formatVersion;
+
+  @Parameterized.Parameters(name = "formatVersion = {0}, branch = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {1, "main"},
+      {1, "test-branch"},
+      {2, "main"},
+      {2, "test-branch"},
+    };
+  }
+
+  public TestFlinkIcebergSinkBranch(int formatVersion, String branch) {
+    this.formatVersion = formatVersion;
+    this.branch = branch;
+  }
+
+  @Before
+  public void before() throws IOException {
+    table =
+        catalogResource
+            .catalog()
+            .createTable(
+                TestFixtures.TABLE_IDENTIFIER,
+                SimpleDataUtil.SCHEMA,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(
+                    TableProperties.DEFAULT_FILE_FORMAT,
+                    FileFormat.AVRO.name(),
+                    TableProperties.FORMAT_VERSION,
+                    String.valueOf(formatVersion)));
+
+    env =
+        StreamExecutionEnvironment.getExecutionEnvironment(
+                MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG)
+            .enableCheckpointing(100);
+
+    tableLoader = catalogResource.tableLoader();
+  }
+
+  @Test
+  public void testWriteRow() throws Exception {
+    testWriteRow(null, DistributionMode.NONE);
+  }
+
+  @Test
+  public void testWriteRowWithTableSchema() throws Exception {
+    testWriteRow(SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+    verifyOtherBranchUnmodified();
+  }
+
+  private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
+      throws Exception {
+    List<Row> rows = createRows("");
+    DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);
+
+    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+        .table(table)
+        .tableLoader(tableLoader)
+        .tableSchema(tableSchema)
+        .toBranch(branch)
+        .distributionMode(distributionMode)
+        .append();
+
+    // Execute the program.
+    env.execute("Test Iceberg DataStream.");
+
+    SimpleDataUtil.assertTableRows(table, convertToRowData(rows), branch);
+    SimpleDataUtil.assertTableRows(
+        table,
+        ImmutableList.of(),
+        branch.equals(SnapshotRef.MAIN_BRANCH) ? "test-branch" : SnapshotRef.MAIN_BRANCH);
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testCheckAndGetEqualityFieldIds() {
+    Assume.assumeTrue(formatVersion == 2);
+    table
+        .updateSchema()
+        .allowIncompatibleChanges()
+        .addRequiredColumn("type", Types.StringType.get())
+        .setIdentifierFields("type")
+        .commit();
+
+    DataStream<Row> dataStream =
+        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
+    FlinkSink.Builder builder =
+        FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA).table(table);
+
+    // Use schema identifier field IDs as equality field id list by default
+    Assert.assertEquals(
+        table.schema().identifierFieldIds(),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    // Use user-provided equality field column as equality field id list
+    builder.equalityFieldColumns(Lists.newArrayList("id"));
+    Assert.assertEquals(
+        Sets.newHashSet(table.schema().findField("id").fieldId()),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    builder.equalityFieldColumns(Lists.newArrayList("type"));
+    Assert.assertEquals(
+        Sets.newHashSet(table.schema().findField("type").fieldId()),
+        Sets.newHashSet(builder.checkAndGetEqualityFieldIds()));
+
+    verifyOtherBranchUnmodified();
+  }
+
+  @Test
+  public void testChangeLogOnIdKey() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+    List<List<Row>> elementsPerCheckpoint =

Review Comment:
   I still see a lot of duplication with `TestFlinkIcebergSinkV2`. There are only two differences:
   1) the branch argument to `testChangeLogs`
   2) the `verifyOtherBranchUnmodified` check
   
   It might be easier to have separate `TestFlinkIcebergSinkBase` and `TestFlinkIcebergSinkV2Base` classes; then we can have separate `TestFlinkIcebergSinkBranch` and `TestFlinkIcebergSinkV2Branch` tests.
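   A rough sketch of the split suggested above, written as plain Java stand-ins rather than the real Flink/Iceberg test classes: `ChangeLogSuiteBase`, `V2Suite`, and `V2BranchSuite` are hypothetical names, and `testChangeLogs` here only records which branch it was asked to write. The shared scenario lives in the base class, and subclasses supply just the two pieces the comment identifies as different: the target branch and the post-write `verifyOtherBranchUnmodified` check.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class BaseClassSplitSketch {

     // Hypothetical shared base: owns the test scenario, leaves the two varying
     // pieces (target branch, post-write check) to subclasses.
     abstract static class ChangeLogSuiteBase {
       final List<String> log = new ArrayList<>();

       abstract String branch();

       // Hook for extra verification; does nothing by default.
       void afterWrite() {}

       // Stand-in for testChangeLogs(..., branch): only records the branch written to.
       void testChangeLogs(String branch) {
         log.add("write:" + branch);
       }

       // Shared test body, written once and inherited by every suite.
       final void testChangeLogOnIdKey() {
         testChangeLogs(branch());
         afterWrite();
       }
     }

     // Plain V2 suite: writes to main with no extra checks.
     static class V2Suite extends ChangeLogSuiteBase {
       @Override
       String branch() {
         return "main";
       }
     }

     // Branch suite: writes to a non-main branch and verifies the other branch afterwards.
     static class V2BranchSuite extends ChangeLogSuiteBase {
       @Override
       String branch() {
         return "test-branch";
       }

       @Override
       void afterWrite() {
         log.add("verifyOtherBranchUnmodified");
       }
     }

     public static void main(String[] args) {
       V2Suite v2 = new V2Suite();
       v2.testChangeLogOnIdKey();
       V2BranchSuite branched = new V2BranchSuite();
       branched.testChangeLogOnIdKey();
       System.out.println(v2.log); // [write:main]
       System.out.println(branched.log); // [write:test-branch, verifyOtherBranchUnmodified]
     }
   }
   ```

   With this template-method layout, a new changelog scenario is written once in the base class, and the branch suite inherits it together with its extra check.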




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1103848064


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -76,22 +76,24 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   private File flinkManifestFolder;
 
   private final FileFormat format;
+  private final String branch;
 
-  @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}")
+  @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion = {1}, branch = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {"avro", 1},
-      new Object[] {"avro", 2},
-      new Object[] {"parquet", 1},
-      new Object[] {"parquet", 2},
-      new Object[] {"orc", 1},
-      new Object[] {"orc", 2}
+      new Object[] {"avro", 1, "main"},

Review Comment:
   nit: can we use `SnapshotRef.MAIN_BRANCH` for all the places we have `"main"` hard-coded?




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1103850488


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java:
##########
@@ -76,22 +76,24 @@ public class TestIcebergFilesCommitter extends TableTestBase {
   private File flinkManifestFolder;
 
   private final FileFormat format;
+  private final String branch;
 
-  @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion={1}")
+  @Parameterized.Parameters(name = "FileFormat = {0}, FormatVersion = {1}, branch = {2}")
   public static Object[][] parameters() {
     return new Object[][] {
-      new Object[] {"avro", 1},
-      new Object[] {"avro", 2},
-      new Object[] {"parquet", 1},
-      new Object[] {"parquet", 2},
-      new Object[] {"orc", 1},
-      new Object[] {"orc", 2}
+      new Object[] {"avro", 1, "main"},

Review Comment:
   Also, similar to the comment on the Spark PR (https://github.com/apache/iceberg/pull/6651/files#r1101939155), can we reduce the number of parameter combinations here?
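   To make the size of the matrix concrete: crossing three file formats, two format versions, and two branches gives 3 × 2 × 2 = 12 parameter sets, whereas pairing each format/version combination with a single branch keeps the six original cases and still exercises both branches. The sketch below is one possible reduction, not necessarily what the PR adopted; `MAIN_BRANCH` is a local stand-in for `SnapshotRef.MAIN_BRANCH` (whose value is `"main"`) so the example compiles without Iceberg on the classpath.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class ParameterMatrixSketch {
     // Local stand-in for SnapshotRef.MAIN_BRANCH ("main"); avoids an Iceberg dependency.
     static final String MAIN_BRANCH = "main";
     static final String TEST_BRANCH = "test-branch";

     static final String[] FORMATS = {"avro", "parquet", "orc"};

     // Full cross product: format x format version x branch (3 * 2 * 2 = 12 cases).
     static Object[][] fullMatrix() {
       List<Object[]> params = new ArrayList<>();
       for (String format : FORMATS) {
         for (int version : new int[] {1, 2}) {
           for (String branch : new String[] {MAIN_BRANCH, TEST_BRANCH}) {
             params.add(new Object[] {format, version, branch});
           }
         }
       }
       return params.toArray(new Object[0][]);
     }

     // Reduced matrix: keep the original 6 format/version cases and give each one branch,
     // so v1 runs against main and v2 against the test branch.
     static Object[][] reducedMatrix() {
       List<Object[]> params = new ArrayList<>();
       for (String format : FORMATS) {
         params.add(new Object[] {format, 1, MAIN_BRANCH});
         params.add(new Object[] {format, 2, TEST_BRANCH});
       }
       return params.toArray(new Object[0][]);
     }

     public static void main(String[] args) {
       System.out.println(fullMatrix().length); // 12
       System.out.println(reducedMatrix().length); // 6
     }
   }
   ```

   The trade-off is that the reduced set never runs a v1 table against a non-main branch, which is only acceptable if branch handling is assumed to be independent of the format version.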




[GitHub] [iceberg] stevenzwu commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1427324740

   Thanks @amogh-jahagirdar for the contribution, and @jackye1995 and @hililiwei for the review.



[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1087212028


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -284,10 +303,23 @@ public static void assertTableRecords(Table table, List<Record> expected) throws
     }
   }
 
+  public static Snapshot latestSnapshot(Table table, String branch) {

Review Comment:
   Shouldn't this always be `table.snapshot(branch)`? Why do we need to handle the main branch specially?
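   The question can be illustrated with a plain map standing in for a table's refs; `RefLookupSketch` is hypothetical and is not Iceberg's API. If the main branch is stored as an ordinary ref pointing at the current snapshot, a single lookup by name covers every branch, and a main-only code path returns the same answer. That assumption about how main is tracked is what makes the suggested `table.snapshot(branch)` sufficient.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class RefLookupSketch {
     // Hypothetical model of a table's refs: ref name -> snapshot id.
     // The main branch is just another entry, pointing at the current snapshot.
     private final Map<String, Long> refs = new HashMap<>();

     void setRef(String name, long snapshotId) {
       refs.put(name, snapshotId);
     }

     // Current snapshot, modeled as whatever "main" points to (null before any commit).
     Long currentSnapshotId() {
       return refs.get("main");
     }

     // Version with a main-specific path, like the special case being questioned.
     Long latestSnapshotSpecialCased(String branch) {
       if ("main".equals(branch)) {
         return currentSnapshotId();
       }
       return refs.get(branch);
     }

     // Uniform version: resolve every branch, main included, by name.
     Long latestSnapshot(String branch) {
       return refs.get(branch);
     }

     public static void main(String[] args) {
       RefLookupSketch table = new RefLookupSketch();
       table.setRef("main", 1L);
       table.setRef("test-branch", 2L);
       for (String branch : new String[] {"main", "test-branch", "missing"}) {
         System.out.println(branch + " -> " + table.latestSnapshot(branch));
       }
     }
   }
   ```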




[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1088414695


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -85,28 +86,29 @@ public class TestFlinkIcebergSink {
   private final int parallelism;
   private final boolean partitioned;
 
-  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, branch = {3}")

Review Comment:
   I am a little concerned about the multiplication factor of the parameterized tests. I don't think we need to test different file formats, writer parallelism, or partitioned vs. unpartitioned tables for branch writes.
   
   I am wondering if we should have a separate `TestFlinkIcebergSinkBranching` class instead. It can be parameterized to test both v1 and v2 table formats, and we can add more precise assertions that the write happened on `test_branch` and that no snapshots or writes were added to the `main` branch.
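
To make the multiplication concern concrete, here is a stdlib-only Java sketch that counts parameterized cases as a cross product of dimensions. The dimension values (three formats, two parallelism levels, partitioned or not, two branches) are illustrative assumptions, not the actual parameter list of `TestFlinkIcebergSink`; the dedicated branch class is modeled as parameterized only on table format versions 1 and 2, as suggested above.

```java
import java.util.List;

// Hypothetical model of a parameterized test matrix: the runner executes one
// case per element of the cross product of all parameter dimensions.
class ParameterMatrix {
  // Illustrative dimension values only; assumed, not the real test parameters.
  static final List<String> FORMATS = List.of("avro", "orc", "parquet");
  static final List<Integer> PARALLELISM = List.of(1, 2);
  static final List<Boolean> PARTITIONED = List.of(true, false);
  static final List<String> BRANCHES = List.of("main", "test-branch");
  static final List<Integer> FORMAT_VERSIONS = List.of(1, 2);

  // Number of cases = product of the sizes of all dimensions.
  static int caseCount(List<?>... dimensions) {
    int count = 1;
    for (List<?> dimension : dimensions) {
      count *= dimension.size();
    }
    return count;
  }

  // Existing matrix: format x parallelism x partitioned.
  static int baseline() {
    return caseCount(FORMATS, PARALLELISM, PARTITIONED);
  }

  // Adding branch as a fourth two-valued dimension doubles every existing case.
  static int withBranchDimension() {
    return caseCount(FORMATS, PARALLELISM, PARTITIONED, BRANCHES);
  }

  // A dedicated branch test class parameterized only on table format version.
  static int dedicatedBranchClass() {
    return caseCount(FORMAT_VERSIONS);
  }
}
```

Under these assumed values the shared matrix grows from 12 to 24 cases, while a dedicated class adds only 2, which is the trade-off behind the suggestion.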



[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1405646669

   > Thanks for the PR @amogh-jahagirdar! Left one small question.
   > 
   > Also, do we have this feature in FlinkSource?
   
   Thanks for the review, @pvary! Not yet, but I'm working on a PR for it and will publish it when it's ready.


[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1087256344


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -284,10 +303,23 @@ public static void assertTableRecords(Table table, List<Record> expected) throws
     }
   }
 
+  public static Snapshot latestSnapshot(Table table, String branch) {

Review Comment:
   In that case, we should add what you said as documentation for the method so people know the context. Maybe we can refactor it later together with Spark, but that doesn't need to be done now.



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1087244952


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -284,10 +303,23 @@ public static void assertTableRecords(Table table, List<Record> expected) throws
     }
   }
 
+  public static Snapshot latestSnapshot(Table table, String branch) {

Review Comment:
   Technically, `table.snapshot(branch)` would yield the same results, but see @rdblue's [comment](https://github.com/apache/iceberg/pull/6651/files#r1085892442) on the Spark write PR.
   
   For the main branch, we still want to specifically validate that the behavior of `currentSnapshot()` is maintained, since that was the API used before. This effectively ensures that `currentSnapshotId` in the metadata is updated correctly as the main branch is written to.
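
The distinction can be illustrated with a small stdlib-only model. `ToyTable`, its fields, and its `latestSnapshot` helper are hypothetical and only mirror the idea described here: for `main`, read through the legacy current-snapshot pointer (the analogue of `currentSnapshot()`), and for any other branch, resolve the named ref (the analogue of `table.snapshot(branch)`). This is a sketch under those assumptions, not the helper from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for table metadata (not Iceberg's API).
class ToyTable {
  static final String MAIN_BRANCH = "main";

  // Legacy pointer, analogous to currentSnapshotId in table metadata.
  Long currentSnapshotId = null;
  // Named refs, analogous to branch refs in table metadata.
  final Map<String, Long> refs = new HashMap<>();

  // A correct commit to main must move both the ref and the legacy pointer.
  void commit(String branch, long snapshotId) {
    refs.put(branch, snapshotId);
    if (MAIN_BRANCH.equals(branch)) {
      currentSnapshotId = snapshotId;
    }
  }

  Long currentSnapshot() {
    return currentSnapshotId;
  }

  Long snapshot(String branch) {
    return refs.get(branch);
  }

  /**
   * For main, deliberately goes through the pre-branching currentSnapshot()
   * path so the legacy pointer is validated; other branches resolve the ref.
   */
  static Long latestSnapshot(ToyTable table, String branch) {
    if (MAIN_BRANCH.equals(branch)) {
      return table.currentSnapshot();
    }
    return table.snapshot(branch);
  }
}
```

In this model, a faulty commit path that advanced the `main` ref without updating `currentSnapshotId` would make `latestSnapshot(table, "main")` return a stale id, so a test asserting on it would catch the bug, whereas checking only `snapshot("main")` would not.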



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1088335461


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -316,6 +316,11 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }
 
+    public Builder toBranch(String branch) {

Review Comment:
   That's a fair question. The Iceberg core API already uses `toBranch`, so I thought it would be nice to follow the same convention in FlinkSink. For example, to append files to a branch using the Iceberg library, we would do the following:
   
   ```
   table.newAppend().appendFile(FILE_A).toBranch("test-branch").commit();
   ```
   
   So I thought we could do the same for Flink:
   
   ```
   FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
       .table(table)
       .tableLoader(tableLoader)
       .toBranch("test-branch")
       .set("write-format", "avro")
       .append();
   ```
   
   Let me know your thoughts! CC @stevenzwu @jackye1995 @rdblue 
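
The proposed convention can be sketched with a stdlib-only fluent builder, assuming the sink builder simply records the target branch and falls back to `main` when `toBranch` is never called. `SinkBuilder` and `SinkConfig` are hypothetical stand-ins, not the actual `FlinkSink.Builder`; the sketch only shows how `toBranch` composes with other fluent setters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical immutable result of building the sink configuration.
final class SinkConfig {
  final String branch;
  final Map<String, String> writeOptions;

  SinkConfig(String branch, Map<String, String> writeOptions) {
    this.branch = branch;
    this.writeOptions = Map.copyOf(writeOptions);
  }
}

// Hypothetical fluent builder mirroring the toBranch(...) naming convention.
final class SinkBuilder {
  private String branch = "main"; // assumed default when toBranch(...) is not called
  private final Map<String, String> writeOptions = new HashMap<>();

  SinkBuilder toBranch(String branch) {
    this.branch = Objects.requireNonNull(branch, "branch must not be null");
    return this;
  }

  SinkBuilder set(String property, String value) {
    writeOptions.put(property, value);
    return this;
  }

  SinkConfig build() {
    return new SinkConfig(branch, writeOptions);
  }
}
```

Keeping the branch as a plain builder field means existing jobs that never call `toBranch` keep writing to `main` unchanged.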



[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1089278089


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -316,6 +316,11 @@ public Builder setSnapshotProperty(String property, String value) {
       return this;
     }
 
+    public Builder toBranch(String branch) {

Review Comment:
   Sure, thanks for the review here; it's important to make sure we get the API right!



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1088408134


##########
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java:
##########
@@ -471,8 +476,9 @@ private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor
     return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
   }
 
-  static long getMaxCommittedCheckpointId(Table table, String flinkJobId, String operatorId) {
-    Snapshot snapshot = table.currentSnapshot();
+  static long getMaxCommittedCheckpointId(
+      Table table, String flinkJobId, String operatorId, String branch) {
+    Snapshot snapshot = table.snapshot(branch);

Review Comment:
   Sorry, I misread the diff. I thought it was a new line/variable.
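
The effect of switching from `table.currentSnapshot()` to `table.snapshot(branch)` is that the committed-checkpoint lookup starts from the head of the target branch. The stdlib-only sketch below models that lookup, assuming the committer walks back through parent snapshots until it finds one whose summary carries the same Flink job ID and a max-committed-checkpoint property. `ToySnapshot`, the summary key strings, and the `-1` sentinel are assumptions for illustration (operator-ID matching is omitted), not code from this PR.

```java
import java.util.Map;

// Hypothetical snapshot: an id, an optional parent id, and a summary map.
final class ToySnapshot {
  final long id;
  final Long parentId;
  final Map<String, String> summary;

  ToySnapshot(long id, Long parentId, Map<String, String> summary) {
    this.id = id;
    this.parentId = parentId;
    this.summary = summary;
  }
}

class CheckpointLookup {
  // Assumed summary keys and sentinel, chosen for this sketch.
  static final String JOB_ID_KEY = "flink.job-id";
  static final String MAX_CHECKPOINT_KEY = "flink.max-committed-checkpoint-id";
  static final long NO_CHECKPOINT = -1L;

  /**
   * Walks from the given branch head through parent snapshots and returns the
   * max committed checkpoint recorded by the given job, or NO_CHECKPOINT.
   */
  static long maxCommittedCheckpointId(
      Map<Long, ToySnapshot> snapshotsById, Map<String, Long> refs, String branch, String jobId) {
    Long headId = refs.get(branch);
    ToySnapshot snapshot = headId == null ? null : snapshotsById.get(headId);
    while (snapshot != null) {
      String value = snapshot.summary.get(MAX_CHECKPOINT_KEY);
      if (jobId.equals(snapshot.summary.get(JOB_ID_KEY)) && value != null) {
        return Long.parseLong(value);
      }
      snapshot = snapshot.parentId == null ? null : snapshotsById.get(snapshot.parentId);
    }
    return NO_CHECKPOINT;
  }
}
```

In this model, a job writing to `test-branch` finds its checkpoint in that branch's history even when `main` points at an unrelated snapshot, which a lookup starting from the main branch's current snapshot would miss.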



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1088409938


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java:
##########
@@ -267,13 +274,25 @@ public static void assertTableRecords(
   }
 
   public static void assertTableRecords(Table table, List<Record> expected) throws IOException {
+    assertTableRecords(table, expected, SnapshotRef.MAIN_BRANCH);
+  }
+
+  public static void assertTableRecords(Table table, List<Record> expected, String branch)
+      throws IOException {
     table.refresh();
+    Snapshot snapshot = latestSnapshot(table, branch);
+
+    if (snapshot == null) {
+      Assert.assertEquals(ImmutableList.of(), expected);

Review Comment:
   The assertion order is incorrect. It should be `(expected, actual)`.
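
Argument order matters because JUnit 4's `Assert.assertEquals` reports failures roughly in the form `expected:<...> but was:<...>`. The stdlib-only helper below is a hypothetical stand-in, not JUnit itself; it imitates that message format to show how swapped arguments mislabel the values when the check fails.

```java
import java.util.Objects;

class AssertOrder {
  // Returns null when the values are equal, otherwise a JUnit-4-style message.
  static String failureMessage(Object expected, Object actual) {
    if (Objects.equals(expected, actual)) {
      return null;
    }
    return "expected:<" + expected + "> but was:<" + actual + ">";
  }
}
```

With the arguments swapped, an empty table reported against a non-empty expectation reads as if the test expected no rows, which sends the reader in the wrong direction. Following the convention, the `snapshot == null` branch in the diff would read `Assert.assertEquals(expected, ImmutableList.of())`.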



[GitHub] [iceberg] stevenzwu commented on a diff in pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "stevenzwu (via GitHub)" <gi...@apache.org>.
stevenzwu commented on code in PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#discussion_r1089604601


##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java:
##########
@@ -85,28 +86,29 @@ public class TestFlinkIcebergSink {
   private final int parallelism;
   private final boolean partitioned;
 
-  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, branch = {3}")

Review Comment:
   Here are some stats from one local run (on a powerful M1 MacBook):
   * TestFlinkIcebergSink: 36s
   * TestFlinkIcebergSinkV2: 77s
   * TestIcebergFilesCommitter: 19s
   
   If we double the number of test cases, the total run time for those three classes will increase from 132s (~2 min) to 264s (~4 min). If we add another dimension after this, it will increase from 4+ min to 8-9 min.
   
   That was my concern.
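
The arithmetic behind these estimates, using only the timings reported above from a single local run, can be checked with a short Java sketch. Treating each added two-valued dimension as an exact doubling of run time is a simplifying assumption.

```java
class BuildTimeEstimate {
  // Timings in seconds from one local run, as reported in the comment.
  static final int SINK_SECONDS = 36;
  static final int SINK_V2_SECONDS = 77;
  static final int COMMITTER_SECONDS = 19;

  static int baselineSeconds() {
    return SINK_SECONDS + SINK_V2_SECONDS + COMMITTER_SECONDS;
  }

  // Assumes each extra two-valued parameter dimension exactly doubles run time.
  static int afterDoublings(int doublings) {
    return baselineSeconds() * (1 << doublings);
  }

  static double minutes(int seconds) {
    return seconds / 60.0;
  }
}
```

Two doublings give 528s, about 8.8 minutes, consistent with the 8-9 minute figure above.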



[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6660: Flink: Support writes to branches in FlinkSink

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on PR #6660:
URL: https://github.com/apache/iceberg/pull/6660#issuecomment-1429128924

   Thanks for the reviews, @stevenzwu @jackye1995 @hililiwei!

