Posted to issues@iceberg.apache.org by "jackye1995 (via GitHub)" <gi...@apache.org> on 2023/03/09 05:08:15 UTC

[GitHub] [iceberg] jackye1995 opened a new pull request, #7050: Spark 3.3: support write to WAP branch

jackye1995 opened a new pull request, #7050:
URL: https://github.com/apache/iceberg/pull/7050

   Fixes #6774
   
   Based on #6965; this is a separate PR for writing to a WAP branch. Will rebase once that is merged.
   
   @rdblue @aokolnychyi @amogh-jahagirdar @namrathamyske 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133150649


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWAPBranch.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkSessionProperties;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionedWritesToWAPBranch extends PartitionedWritesTestBase {
+
+  private static final String BRANCH = "test";
+
+  public TestPartitionedWritesToWAPBranch(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  @Override
+  public void createTables() {
+    spark.conf().set(SparkSessionProperties.WAP_BRANCH, BRANCH);
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY (truncate(id, 3)) OPTIONS (%s = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+  }
+
+  @After
+  @Override
+  public void removeTables() {
+    super.removeTables();
+    spark.conf().unset(SparkSessionProperties.WAP_BRANCH);
+    spark.conf().unset(SparkSessionProperties.WAP_ID);
+  }
+
+  @Override
+  protected String commitTarget() {
+    return tableName;
+  }
+
+  @Override
+  protected String selectTarget() {
+    return String.format("%s VERSION AS OF '%s'", tableName, BRANCH);
+  }
+
+  // commit target in WAP is just the table name
+  // should use table + branch name instead for read
+  @Test
+  @Override
+  public void testViewsReturnRecentResults() {
+    Assert.assertEquals(
+        "Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", selectTarget()));
+
+    Dataset<Row> query =
+        spark.sql("SELECT * FROM " + tableName + ".branch_" + BRANCH + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    assertEquals(
+        "View should have expected rows", ImmutableList.of(row(1L, "a")), sql("SELECT * FROM tmp"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", commitTarget());

Review Comment:
   Very minor nit: if the second insert used a different value for the second column, it would be more obvious from reading the expectation alone that this is another record. Just something to make the test clearer, in my opinion.



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWAPBranch.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkSessionProperties;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionedWritesToWAPBranch extends PartitionedWritesTestBase {
+
+  private static final String BRANCH = "test";
+
+  public TestPartitionedWritesToWAPBranch(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  @Override
+  public void createTables() {
+    spark.conf().set(SparkSessionProperties.WAP_BRANCH, BRANCH);
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY (truncate(id, 3)) OPTIONS (%s = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+  }
+
+  @After
+  @Override
+  public void removeTables() {
+    super.removeTables();
+    spark.conf().unset(SparkSessionProperties.WAP_BRANCH);
+    spark.conf().unset(SparkSessionProperties.WAP_ID);
+  }
+
+  @Override
+  protected String commitTarget() {
+    return tableName;
+  }
+
+  @Override
+  protected String selectTarget() {
+    return String.format("%s VERSION AS OF '%s'", tableName, BRANCH);
+  }
+
+  // commit target in WAP is just the table name
+  // should use table + branch name instead for read

Review Comment:
   Yeah, we need to use the `branch` during planning of the write to handle the dynamic pruning case; that's what came up in https://github.com/apache/iceberg/pull/6651/files. If we pass the branch through the SparkTable, that should take care of this? It doesn't matter whether it's a WAP branch or not; it just needs to read the branch.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -937,6 +938,28 @@ public static org.apache.spark.sql.catalyst.TableIdentifier toV1TableIdentifier(
     return org.apache.spark.sql.catalyst.TableIdentifier.apply(table, database);
   }
 
+  public static String determineWriteBranch(

Review Comment:
   Yeah, I just took a look at the code: there's no real computation for `wapId` and `wapEnabled`, they're just constants, so I'm in favor of having it in `SparkWriteConf` as well.





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133037748


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -937,6 +938,28 @@ public static org.apache.spark.sql.catalyst.TableIdentifier toV1TableIdentifier(
     return org.apache.spark.sql.catalyst.TableIdentifier.apply(table, database);
   }
 
+  public static String determineWriteBranch(

Review Comment:
   This method can technically be embedded in `SparkWriteConf.branch()`. If we do that, we can also remove `SparkWriteConf.wapBranch()`. The only issue is that `wapId` and `wapEnabled` would be computed twice: once in the method and once in `SparkWrite`/`SparkPositionDeltaWrite`. Please let me know if there is a preference between code cleanliness and avoiding the extra computation.
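
For illustration, the branch resolution under discussion can be sketched as a standalone method. This is a minimal sketch, not the code in the PR: the class `WapBranchSketch` and the method `resolveWriteBranch` are hypothetical names, `IllegalArgumentException` stands in for Iceberg's `ValidationException`, and returning the WAP branch after the checks is an assumption, since the quoted diff in this thread is cut off before the return statement.

```java
public final class WapBranchSketch {
  private WapBranchSketch() {}

  /**
   * Resolves the branch a write should commit to (hypothetical helper).
   *
   * @param wapEnabled whether write-audit-publish is enabled on the table
   * @param wapId WAP ID from the session conf, or null
   * @param wapBranch WAP branch from the session conf, or null
   * @param branch branch taken from the table identifier, or null
   */
  public static String resolveWriteBranch(
      boolean wapEnabled, String wapId, String wapBranch, String branch) {
    if (!wapEnabled) {
      // WAP settings are ignored when the table does not enable WAP
      return branch;
    }

    if (wapId != null && wapBranch != null) {
      throw new IllegalArgumentException(
          String.format(
              "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
              wapId, wapBranch));
    }

    if (wapBranch != null) {
      if (branch != null) {
        throw new IllegalArgumentException(
            String.format(
                "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
                branch, wapBranch));
      }
      // Assumption: the WAP branch becomes the write target
      return wapBranch;
    }

    return branch;
  }

  public static void main(String[] args) {
    System.out.println(resolveWriteBranch(true, null, "wap", null)); // prints "wap"
    System.out.println(resolveWriteBranch(false, null, "wap", "audit")); // prints "audit"
  }
}
```

Keeping the checks in one place like this is the "cleanliness" side of the trade-off; the cost is that `wapEnabled` and `wapId` are read again by the caller.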





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1135675590


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java:
##########
@@ -333,6 +334,28 @@ public boolean caseSensitive() {
   }
 
   public String branch() {
+    if (wapEnabled()) {
+      String wapId = wapId();
+      String wapBranch =
+          confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
+
+      ValidationException.check(
+          wapId == null || wapBranch == null,
+          "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
+          wapId,
+          wapBranch);
+
+      if (wapBranch != null) {
+        ValidationException.check(
+            branch == null,
+            "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
+            branch,
+            wapBranch);

Review Comment:
   Thank you! I opened issue https://github.com/apache/iceberg/issues/7103 so we can discuss it there with the relevant people.





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133035339


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionProperties.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+public class SparkSessionProperties {

Review Comment:
   It seems we have been hard-coding `spark.wap.id`, so I created this class to hold commonly used session properties. Was there a particular reason we did not do that in the past?





[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133141037


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,50 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWAPBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);

Review Comment:
   Do we fail when writing to a branch identifier if `WAP_BRANCH` is set? If so, we could test that behavior here instead of using `Assume`.





[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133142216


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWAPBranch.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkSessionProperties;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionedWritesToWAPBranch extends PartitionedWritesTestBase {
+
+  private static final String BRANCH = "test";
+
+  public TestPartitionedWritesToWAPBranch(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  @Override
+  public void createTables() {
+    spark.conf().set(SparkSessionProperties.WAP_BRANCH, BRANCH);
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY (truncate(id, 3)) OPTIONS (%s = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+  }
+
+  @After
+  @Override
+  public void removeTables() {
+    super.removeTables();
+    spark.conf().unset(SparkSessionProperties.WAP_BRANCH);
+    spark.conf().unset(SparkSessionProperties.WAP_ID);
+  }
+
+  @Override
+  protected String commitTarget() {
+    return tableName;
+  }
+
+  @Override
+  protected String selectTarget() {
+    return String.format("%s VERSION AS OF '%s'", tableName, BRANCH);
+  }
+
+  // commit target in WAP is just the table name
+  // should use table + branch name instead for read

Review Comment:
   I don't agree with this. I think it has to read the branch if it exists.
   
   For example, a DELETE command is going to modify the state of a branch if it exists, and it could exist because branch WAP supports multiple writes. That means the reads for both the delete itself and any dynamic pruning must read the branch if it exists and main if it doesn't.
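
The fallback rule described above (read the WAP branch if it exists, otherwise main) can be sketched as follows. This is only an illustration under stated assumptions: `WapReadRefSketch` and `resolveReadRef` are hypothetical names, and the table's existing refs are modeled as a plain set of branch names rather than through any Iceberg API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public final class WapReadRefSketch {
  public static final String MAIN = "main";

  private WapReadRefSketch() {}

  /**
   * Picks the ref that the read side of a write (for example the scan for a
   * DELETE, or dynamic pruning) should use. Hypothetical helper.
   *
   * @param existingRefs names of branches that already exist on the table
   * @param wapBranch WAP branch from the session conf, or null
   */
  public static String resolveReadRef(Set<String> existingRefs, String wapBranch) {
    if (wapBranch != null && existingRefs.contains(wapBranch)) {
      // An earlier WAP write already created the branch, so later writes
      // must see its state rather than main
      return wapBranch;
    }
    // First write to the WAP branch: nothing to read there yet
    return MAIN;
  }

  public static void main(String[] args) {
    Set<String> beforeFirstWrite = new HashSet<>(Arrays.asList("main"));
    Set<String> afterFirstWrite = new HashSet<>(Arrays.asList("main", "wap"));
    System.out.println(resolveReadRef(beforeFirstWrite, "wap")); // prints "main"
    System.out.println(resolveReadRef(afterFirstWrite, "wap")); // prints "wap"
  }
}
```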





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1134622526


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,73 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWapBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql("DELETE FROM %s t WHERE id=0", tableName);
+          Assert.assertEquals(
+              "Should have expected num of rows when reading table",
+              2L,
+              spark.table(tableName).count());
+          Assert.assertEquals(
+              "Should have expected num of rows when reading WAP branch",
+              2L,
+              spark.table(tableName + ".branch_wap").count());
+          Assert.assertEquals(
+              "Should not modify main branch", 3L, spark.table(tableName + ".branch_main").count());
+        });
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql("DELETE FROM %s t WHERE id=1", tableName);
+          Assert.assertEquals(
+              "Should have expected num of rows when reading table with multiple writes",
+              1L,
+              spark.table(tableName).count());
+          Assert.assertEquals(
+              "Should have expected num of rows when reading WAP branch with multiple writes",
+              1L,
+              spark.table(tableName + ".branch_wap").count());
+          Assert.assertEquals(
+              "Should not modify main branch with multiple writes",
+              3L,
+              spark.table(tableName + ".branch_main").count());

Review Comment:
   Ah, never mind, I misinterpreted; we actually do need this!





[GitHub] [iceberg] rdblue merged pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue merged PR #7050:
URL: https://github.com/apache/iceberg/pull/7050




[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1134583483


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,73 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWapBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql("DELETE FROM %s t WHERE id=0", tableName);
+          Assert.assertEquals(
+              "Should have expected num of rows when reading table",
+              2L,
+              spark.table(tableName).count());
+          Assert.assertEquals(
+              "Should have expected num of rows when reading WAP branch",
+              2L,
+              spark.table(tableName + ".branch_wap").count());
+          Assert.assertEquals(
+              "Should not modify main branch", 3L, spark.table(tableName + ".branch_main").count());
+        });
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
+        () -> {
+          sql("DELETE FROM %s t WHERE id=1", tableName);
+          Assert.assertEquals(
+              "Should have expected num of rows when reading table with multiple writes",
+              1L,
+              spark.table(tableName).count());
+          Assert.assertEquals(
+              "Should have expected num of rows when reading WAP branch with multiple writes",
+              1L,
+              spark.table(tableName + ".branch_wap").count());
+          Assert.assertEquals(
+              "Should not modify main branch with multiple writes",
+              3L,
+              spark.table(tableName + ".branch_main").count());

Review Comment:
   Nit: I think for the purpose of this test we could just use `spark.table(tableName).count()` instead of reading main; I don't think we need to validate the identifier read here.





[GitHub] [iceberg] aokolnychyi commented on pull request #7050: Spark 3.3: support write to WAP branch

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#issuecomment-1464996309

   I'll take a look on Monday.




[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133141398


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,50 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWAPBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    try {
+      spark.conf().set(SparkSessionProperties.WAP_BRANCH, "wap");
+      sql("DELETE FROM %s t WHERE id=0", tableName);
+      Assert.assertEquals(
+          "Should have expected num of rows", 2L, spark.table(tableName + ".branch_wap").count());
+    } finally {
+      spark.conf().unset(SparkSessionProperties.WAP_BRANCH);
+    }
+  }
+
+  @Test
+  public void testDeleteToWAPBranchWithTableBranchIdentifier() throws NoSuchTableException {
+    Assume.assumeTrue("Test must have branch name part in table identifier", branch != null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+    createBranchIfNeeded();
+
+    try {
+      spark.conf().set(SparkSessionProperties.WAP_BRANCH, "wap");
+      Assertions.assertThatThrownBy(() -> sql("DELETE FROM %s t WHERE id=0", commitTarget()))
+          .isInstanceOf(ValidationException.class)
+          .hasMessage(
+              String.format(
+                  "Cannot set both branch and WAP branch, but got branch [%s] and WAP branch [wap]",

Review Comment:
   "Cannot write to" instead of "Cannot set"?



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,50 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWAPBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);

Review Comment:
   I see that this is tested in the next case, so we're good.





[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133141503


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java:
##########
@@ -2448,6 +2450,65 @@ public void testMergeNonExistingBranch() {
         .hasMessage("Cannot use branch (does not exist): test");
   }
 
+  @Test
+  public void testMergeToWAPBranch() {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitTable("id INT", "{\"id\": -1}");
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    spark.range(0, 5).coalesce(1).createOrReplaceTempView("source");
+    ImmutableList<Object[]> expectedRows =
+        ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));
+
+    try {
+      spark.conf().set(SparkSessionProperties.WAP_BRANCH, "wap");
+      sql(
+          "MERGE INTO %s t USING source s ON t.id = s.id "
+              + "WHEN MATCHED THEN UPDATE SET *"
+              + "WHEN NOT MATCHED THEN INSERT *",
+          tableName);
+      assertEquals(
+          "Should correctly merge to WAP branch",
+          expectedRows,
+          sql("SELECT * FROM %s.branch_wap ORDER BY id", tableName));

Review Comment:
   Same general comments as above. It would be nice to validate both main and the "wap" branch.





[GitHub] [iceberg] jackye1995 commented on pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#issuecomment-1465239889

   @rdblue @amogh-jahagirdar thanks for the suggestions. I have addressed all the nit comments, fixed the scan issue, and updated the tests to verify the behavior of multiple writes. Could you take another look?




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1136507400


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java:
##########
@@ -333,6 +334,28 @@ public boolean caseSensitive() {
   }
 
   public String branch() {
+    if (wapEnabled()) {
+      String wapId = wapId();
+      String wapBranch =
+          confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
+
+      ValidationException.check(
+          wapId == null || wapBranch == null,
+          "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
+          wapId,
+          wapBranch);
+
+      if (wapBranch != null) {
+        ValidationException.check(
+            branch == null,
+            "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
+            branch,
+            wapBranch);

Review Comment:
   It seems like a reasonable starting point to me.





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1136505942


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java:
##########
@@ -333,6 +334,28 @@ public boolean caseSensitive() {
   }
 
   public String branch() {
+    if (wapEnabled()) {
+      String wapId = wapId();
+      String wapBranch =
+          confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();

Review Comment:
   nit: What about a separate method like we have for `wapId()`?





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133035339


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionProperties.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+public class SparkSessionProperties {

Review Comment:
   It seems we have been hard-coding `spark.wap.id`; maybe it's better to have a class that holds these commonly used session properties?





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133138763


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionProperties.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+public class SparkSessionProperties {

Review Comment:
   Can we use existing `SparkSQLProperties`?





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133149171


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,50 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWAPBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    try {
+      spark.conf().set(SparkSessionProperties.WAP_BRANCH, "wap");

Review Comment:
   Good point. I copied this from other tests; it seems we can also update those tests to use the helper.





[GitHub] [iceberg] jackye1995 closed pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 closed pull request #7050: Spark 3.3: support write to WAP branch
URL: https://github.com/apache/iceberg/pull/7050




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133037086


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWAPBranch.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkSessionProperties;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionedWritesToWAPBranch extends PartitionedWritesTestBase {

Review Comment:
   I did not add `TestUnpartitionedWritesToWAPBranch`; adding more tests seems unnecessary because it would not exercise any additional case in the code path.





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1134380400


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,50 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWAPBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    try {
+      spark.conf().set(SparkSessionProperties.WAP_BRANCH, "wap");

Review Comment:
   I see. I based this test on `testDeleteThatRequiresGroupingBeforeWrite`, which is the only one not using the helper, because it needs to set the shuffle partitions back to the original value instead of just unsetting them.





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1134380849


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,50 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWAPBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    try {
+      spark.conf().set(SparkSessionProperties.WAP_BRANCH, "wap");
+      sql("DELETE FROM %s t WHERE id=0", tableName);
+      Assert.assertEquals(
+          "Should have expected num of rows", 2L, spark.table(tableName + ".branch_wap").count());

Review Comment:
   added





[GitHub] [iceberg] jackye1995 commented on pull request #7050: Spark 3.3: support write to WAP branch

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#issuecomment-1465272821

   > Should we also validate that tableName + ".branch_main" has not been modified?
   
   I overlooked this one; I will add some lines to verify it in a few hours.




[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133141321


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,50 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWAPBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    try {
+      spark.conf().set(SparkSessionProperties.WAP_BRANCH, "wap");
+      sql("DELETE FROM %s t WHERE id=0", tableName);
+      Assert.assertEquals(
+          "Should have expected num of rows", 2L, spark.table(tableName + ".branch_wap").count());

Review Comment:
   Is this branch read necessary or is it the default in WAP? I think it needs to be the default so that dynamic pruning works. That said, it does make sense to allow reading specific branches, and to read a specific branch here to validate behavior.
   
   Should we also validate that `tableName + ".branch_main"` has not been modified?





[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133141092


##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java:
##########
@@ -1066,6 +1068,50 @@ public void testDeleteWithMultipleSpecs() {
         sql("SELECT * FROM %s ORDER BY id", selectTarget()));
   }
 
+  @Test
+  public void testDeleteToWAPBranch() throws NoSuchTableException {
+    Assume.assumeTrue("WAP branch only works for table identifier without branch", branch == null);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    append(new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+
+    try {
+      spark.conf().set(SparkSessionProperties.WAP_BRANCH, "wap");

Review Comment:
   Don't we have a helper to run a lambda with this try/finally to set a property?
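
For readers unfamiliar with the pattern being asked about, here is a minimal sketch of such a helper. It is not the project's actual test utility: `ConfScopeSketch`, `withConf`, and the `example.wap.branch` key are hypothetical names, and a plain `Map` stands in for the Spark session conf. The helper sets a property, runs the lambda, and restores the previous state in `finally`, which also covers the case where a value must be put back rather than just unset.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map stands in for the Spark session conf.
final class ConfScopeSketch {
  private ConfScopeSketch() {}

  // Sets key=value, runs the action, then restores whatever was there before,
  // even if the action throws.
  static void withConf(Map<String, String> conf, String key, String value, Runnable action) {
    boolean hadKey = conf.containsKey(key);
    String previous = conf.get(key);
    conf.put(key, value);
    try {
      action.run();
    } finally {
      if (hadKey) {
        conf.put(key, previous); // restore the original value
      } else {
        conf.remove(key); // the key was not set before, so unset it
      }
    }
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    // "example.wap.branch" is a placeholder key, not a real Spark property name.
    withConf(conf, "example.wap.branch", "wap",
        () -> System.out.println("inside: " + conf.get("example.wap.branch")));
    System.out.println("after: " + conf.containsKey("example.wap.branch"));
  }
}
```

With a helper of this shape, the test body would no longer need its own try/finally around the WAP branch setting.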





[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133141642


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -937,6 +938,28 @@ public static org.apache.spark.sql.catalyst.TableIdentifier toV1TableIdentifier(
     return org.apache.spark.sql.catalyst.TableIdentifier.apply(table, database);
   }
 
+  public static String determineWriteBranch(

Review Comment:
   I think it makes more sense to embed this in `SparkWriteConf.branch()`. `wapId` and `wapEnabled` are constant so it should be fine.





[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1134707787


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java:
##########
@@ -333,6 +334,28 @@ public boolean caseSensitive() {
   }
 
   public String branch() {
+    if (wapEnabled()) {
+      String wapId = wapId();
+      String wapBranch =
+          confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
+
+      ValidationException.check(
+          wapId == null || wapBranch == null,
+          "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
+          wapId,
+          wapBranch);
+
+      if (wapBranch != null) {
+        ValidationException.check(
+            branch == null,
+            "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
+            branch,
+            wapBranch);

Review Comment:
   I don't think this behavior is a blocker because it is strict, but I would expect to be able to write to another branch with the WAP branch set. I'm curious what other people think the long-term behavior should be.
   
   I think this behavior does help ensure that there are no side-effects, which is good if you want people to trust the pattern. But that's undermined by enabling/disabling WAP on a per-table basis.
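
To make the strict behavior under discussion concrete, the checks in the quoted diff can be sketched as a standalone function. This is an illustration under assumptions, not the code in `SparkWriteConf`: `WriteBranchSketch` and `resolveWriteBranch` are hypothetical names, `IllegalArgumentException` stands in for `ValidationException`, and the two return statements are assumed because the quoted hunk ends before them.

```java
// Hypothetical sketch of the write-branch resolution rules quoted above.
final class WriteBranchSketch {
  private WriteBranchSketch() {}

  static String resolveWriteBranch(
      boolean wapEnabled, String wapId, String wapBranch, String branch) {
    if (wapEnabled) {
      // A WAP ID and a WAP branch are mutually exclusive.
      if (wapId != null && wapBranch != null) {
        throw new IllegalArgumentException(
            String.format(
                "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
                wapId, wapBranch));
      }

      if (wapBranch != null) {
        // Strict rule: an explicit branch cannot be combined with a WAP branch.
        if (branch != null) {
          throw new IllegalArgumentException(
              String.format(
                  "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
                  branch, wapBranch));
        }
        // Assumption: the WAP branch becomes the write target (not shown in the quoted hunk).
        return wapBranch;
      }
    }
    // Assumption: otherwise the explicitly requested branch (possibly null) is used.
    return branch;
  }

  public static void main(String[] args) {
    System.out.println(resolveWriteBranch(true, null, "wap", null));
    System.out.println(resolveWriteBranch(false, null, "wap", "audit"));
  }
}
```

Under these assumptions, a table with WAP disabled ignores the session's WAP branch and writes to the explicit branch, which is the per-table side effect mentioned above.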





[GitHub] [iceberg] rdblue commented on pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#issuecomment-1467111378

   Thanks @jackye1995! Looks great.




[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1136504884


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java:
##########
@@ -98,7 +98,22 @@ public String branch() {
             + "got [%s] in identifier and [%s] in options",
         branch,
         optionBranch);
-    return branch != null ? branch : optionBranch;
+    String inputBranch = branch != null ? branch : optionBranch;
+    if (inputBranch != null) {
+      return inputBranch;
+    }
+
+    boolean wapEnabled =

Review Comment:
   nit: I'd prefer a separate method called `wapEnabled()` like we have in SparkWriteConf. Then we could use the constant for the default value and it would simplify this method.
   
   ```
   public boolean wapEnabled() {
     return confParser
         .booleanConf()
         .tableProperty(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED)
         .defaultValue(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT)
         .parse();
   }
   ```
   





[GitHub] [iceberg] rdblue commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133141686


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionProperties.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+public class SparkSessionProperties {

Review Comment:
   +1 for `SparkSQLProperties`.


