Posted to issues@iceberg.apache.org by "amogh-jahagirdar (via GitHub)" <gi...@apache.org> on 2023/03/11 21:42:08 UTC

[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7050: Spark 3.3: support write to WAP branch

amogh-jahagirdar commented on code in PR #7050:
URL: https://github.com/apache/iceberg/pull/7050#discussion_r1133150649


##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWAPBranch.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkSessionProperties;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionedWritesToWAPBranch extends PartitionedWritesTestBase {
+
+  private static final String BRANCH = "test";
+
+  public TestPartitionedWritesToWAPBranch(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  @Override
+  public void createTables() {
+    spark.conf().set(SparkSessionProperties.WAP_BRANCH, BRANCH);
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY (truncate(id, 3)) OPTIONS (%s = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+  }
+
+  @After
+  @Override
+  public void removeTables() {
+    super.removeTables();
+    spark.conf().unset(SparkSessionProperties.WAP_BRANCH);
+    spark.conf().unset(SparkSessionProperties.WAP_ID);
+  }
+
+  @Override
+  protected String commitTarget() {
+    return tableName;
+  }
+
+  @Override
+  protected String selectTarget() {
+    return String.format("%s VERSION AS OF '%s'", tableName, BRANCH);
+  }
+
+  // commit target in WAP is just the table name
+  // should use table + branch name instead for read
+  @Test
+  @Override
+  public void testViewsReturnRecentResults() {
+    Assert.assertEquals(
+        "Should have 3 rows", 3L, scalarSql("SELECT count(*) FROM %s", selectTarget()));
+
+    Dataset<Row> query =
+        spark.sql("SELECT * FROM " + tableName + ".branch_" + BRANCH + " WHERE id = 1");
+    query.createOrReplaceTempView("tmp");
+
+    assertEquals(
+        "View should have expected rows", ImmutableList.of(row(1L, "a")), sql("SELECT * FROM tmp"));
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a')", commitTarget());

Review Comment:
   Very minor nit: if the second insert used a different value for the second column, it would be more obvious from reading the expectation alone that this is a separate record. Very minor, though; it would just make the test's intent clearer, IMO.



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWAPBranch.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkSessionProperties;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionedWritesToWAPBranch extends PartitionedWritesTestBase {
+
+  private static final String BRANCH = "test";
+
+  public TestPartitionedWritesToWAPBranch(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Before
+  @Override
+  public void createTables() {
+    spark.conf().set(SparkSessionProperties.WAP_BRANCH, BRANCH);
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg PARTITIONED BY (truncate(id, 3)) OPTIONS (%s = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", tableName);
+  }
+
+  @After
+  @Override
+  public void removeTables() {
+    super.removeTables();
+    spark.conf().unset(SparkSessionProperties.WAP_BRANCH);
+    spark.conf().unset(SparkSessionProperties.WAP_ID);
+  }
+
+  @Override
+  protected String commitTarget() {
+    return tableName;
+  }
+
+  @Override
+  protected String selectTarget() {
+    return String.format("%s VERSION AS OF '%s'", tableName, BRANCH);
+  }
+
+  // commit target in WAP is just the table name
+  // should use table + branch name instead for read

Review Comment:
   Yeah, we need to use the `branch` while planning the write to handle the dynamic pruning case; that is what came up in https://github.com/apache/iceberg/pull/6651/files. If we pass the branch through the `SparkTable`, would that take care of this? Whether it is a WAP branch or not doesn't matter; planning just needs to read from the branch.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java:
##########
@@ -937,6 +938,28 @@ public static org.apache.spark.sql.catalyst.TableIdentifier toV1TableIdentifier(
     return org.apache.spark.sql.catalyst.TableIdentifier.apply(table, database);
   }
 
+  public static String determineWriteBranch(

Review Comment:
   Yeah, I just took a look at the code: there is no real computation for `wapId` and `wapEnabled`, they are just constants, so I'm in favor of having this in `SparkWriteConf` as well.
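
To illustrate why this logic is light enough to live next to the other write settings, here is a minimal, dependency-free sketch of the kind of lookup involved. It is not the code from this PR: the class name `WriteBranchSketch`, the map-based signature, and the literal keys (assumed to match `SparkSessionProperties.WAP_BRANCH`, `SparkSessionProperties.WAP_ID`, and `TableProperties.WRITE_AUDIT_PUBLISH_ENABLED`) are assumptions for illustration, as is the rule that a WAP ID and a WAP branch cannot both be set.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not the method added in the PR.
final class WriteBranchSketch {
  // Assumed values of the Iceberg constants named in the lead-in.
  static final String WAP_BRANCH = "spark.wap.branch";
  static final String WAP_ID = "spark.wap.id";
  static final String WAP_ENABLED = "write.wap.enabled";

  private WriteBranchSketch() {}

  /**
   * Resolves the branch a write should target.
   *
   * @param sessionConf Spark session settings (stand-in for spark.conf())
   * @param tableProps Iceberg table properties
   * @return the WAP branch if WAP is enabled and a branch is configured, otherwise empty
   */
  static Optional<String> determineWriteBranch(
      Map<String, String> sessionConf, Map<String, String> tableProps) {
    boolean wapEnabled = Boolean.parseBoolean(tableProps.getOrDefault(WAP_ENABLED, "false"));
    if (!wapEnabled) {
      // WAP is off: write to the table's default branch.
      return Optional.empty();
    }

    String wapId = sessionConf.get(WAP_ID);
    String wapBranch = sessionConf.get(WAP_BRANCH);

    // Assumption: staging by ID and writing to a WAP branch are mutually exclusive.
    if (wapId != null && wapBranch != null) {
      throw new IllegalArgumentException(
          String.format(
              "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
              wapId, wapBranch));
    }

    return Optional.ofNullable(wapBranch);
  }
}
```

With a session map containing `spark.wap.branch=test` and a table with `write.wap.enabled=true`, the sketch returns `Optional.of("test")`. Because everything here is a plain property lookup, a conf class such as `SparkWriteConf` seems like a natural home for it.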



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

