You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by am...@apache.org on 2023/05/20 19:05:52 UTC

[iceberg] branch master updated: Spark: Backport fix NPE when create branch and tag on table without snapshot (#7659)

This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 950daa5702 Spark: Backport fix NPE when create branch and tag on table without snapshot (#7659)
950daa5702 is described below

commit 950daa5702bf9ac7490b273c8f7d3132817c2491
Author: Hongyue/Steve Zhang <st...@gmail.com>
AuthorDate: Sat May 20 12:05:47 2023 -0700

    Spark: Backport fix NPE when create branch and tag on table without snapshot (#7659)
    
    Co-authored-by: Steve Zhang <ho...@apple.com>
---
 .../datasources/v2/CreateOrReplaceBranchExec.scala         | 14 +++++++++++---
 .../org/apache/iceberg/spark/extensions/TestBranchDDL.java | 10 ++++++++++
 .../datasources/v2/CreateOrReplaceBranchExec.scala         | 14 +++++++++++---
 .../org/apache/iceberg/spark/extensions/TestBranchDDL.java | 10 ++++++++++
 .../datasources/v2/CreateOrReplaceBranchExec.scala         | 14 +++++++++++---
 .../execution/datasources/v2/CreateOrReplaceTagExec.scala  | 14 +++++++++++---
 .../org/apache/iceberg/spark/extensions/TestBranchDDL.java | 10 ++++++++++
 .../org/apache/iceberg/spark/extensions/TestTagDDL.java    | 10 ++++++++++
 8 files changed, 84 insertions(+), 12 deletions(-)

diff --git a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
index 8110112445..9b378cf84e 100644
--- a/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
+++ b/spark/v3.1/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
 import org.apache.iceberg.spark.source.SparkTable
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -41,10 +42,17 @@ case class CreateOrReplaceBranchExec(
   override protected def run(): Seq[InternalRow] = {
     catalog.loadTable(ident) match {
       case iceberg: SparkTable =>
-        val snapshotId = branchOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId())
+        val snapshotId: java.lang.Long = branchOptions.snapshotId
+          .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId()))
+          .map(java.lang.Long.valueOf)
+          .orNull
+
+        Preconditions.checkArgument(snapshotId != null,
+          "Cannot complete create or replace branch operation on %s, main has no snapshot", ident)
+
         val manageSnapshots = iceberg.table().manageSnapshots()
         if (!replace) {
-          val ref = iceberg.table().refs().get(branch);
+          val ref = iceberg.table().refs().get(branch)
           if (ref != null && ifNotExists) {
             return Nil
           }
@@ -76,6 +84,6 @@ case class CreateOrReplaceBranchExec(
   }
 
   override def simpleString(maxFields: Int): String = {
-    s"CreateOrReplace branch: ${branch} for table: ${ident.quoted}"
+    s"CreateOrReplace branch: $branch for table: ${ident.quoted}"
   }
 }
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
index 6c41e023be..76e92f624f 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
@@ -31,6 +31,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -95,6 +96,15 @@ public class TestBranchDDL extends SparkExtensionsTestBase {
     }
   }
 
+  @Test
+  public void testCreateBranchOnEmptyTable() {
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot complete create or replace branch operation on %s, main has no snapshot",
+            tableName);
+  }
+
   @Test
   public void testCreateBranchUseDefaultConfig() throws NoSuchTableException {
     Table table = insertRows();
diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
index 1c1f94e153..8552ab132f 100644
--- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
+++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
 import org.apache.iceberg.spark.source.SparkTable
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -41,10 +42,17 @@ case class CreateOrReplaceBranchExec(
   override protected def run(): Seq[InternalRow] = {
     catalog.loadTable(ident) match {
       case iceberg: SparkTable =>
-        val snapshotId = branchOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId())
+        val snapshotId: java.lang.Long = branchOptions.snapshotId
+          .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId()))
+          .map(java.lang.Long.valueOf)
+          .orNull
+
+        Preconditions.checkArgument(snapshotId != null,
+          "Cannot complete create or replace branch operation on %s, main has no snapshot", ident)
+
         val manageSnapshots = iceberg.table().manageSnapshots()
         if (!replace) {
-          val ref = iceberg.table().refs().get(branch);
+          val ref = iceberg.table().refs().get(branch)
           if (ref != null && ifNotExists) {
             return Nil
           }
@@ -76,6 +84,6 @@ case class CreateOrReplaceBranchExec(
   }
 
   override def simpleString(maxFields: Int): String = {
-    s"CreateOrReplace branch: ${branch} for table: ${ident.quoted}"
+    s"CreateOrReplace branch: $branch for table: ${ident.quoted}"
   }
 }
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
index c6d92243bf..55a2a1c142 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
@@ -31,6 +31,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -96,6 +97,15 @@ public class TestBranchDDL extends SparkExtensionsTestBase {
     }
   }
 
+  @Test
+  public void testCreateBranchOnEmptyTable() {
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot complete create or replace branch operation on %s, main has no snapshot",
+            tableName);
+  }
+
   @Test
   public void testCreateBranchUseDefaultConfig() throws NoSuchTableException {
     Table table = insertRows();
diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
index 08230afb5a..6457875b15 100644
--- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
+++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceBranchExec.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
 import org.apache.iceberg.spark.source.SparkTable
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -41,10 +42,17 @@ case class CreateOrReplaceBranchExec(
   override protected def run(): Seq[InternalRow] = {
     catalog.loadTable(ident) match {
       case iceberg: SparkTable =>
-        val snapshotId = branchOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId())
+        val snapshotId: java.lang.Long = branchOptions.snapshotId
+          .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId()))
+          .map(java.lang.Long.valueOf)
+          .orNull
+
+        Preconditions.checkArgument(snapshotId != null,
+          "Cannot complete create or replace branch operation on %s, main has no snapshot", ident)
+
         val manageSnapshots = iceberg.table().manageSnapshots()
         if (!replace) {
-          val ref = iceberg.table().refs().get(branch);
+          val ref = iceberg.table().refs().get(branch)
           if (ref != null && ifNotExists) {
             return Nil
           }
@@ -76,6 +84,6 @@ case class CreateOrReplaceBranchExec(
   }
 
   override def simpleString(maxFields: Int): String = {
-    s"CreateOrReplace branch: ${branch} for table: ${ident.quoted}"
+    s"CreateOrReplace branch: $branch for table: ${ident.quoted}"
   }
 }
diff --git a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
index d41f9f03ff..7ca193d1b1 100644
--- a/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
+++ b/spark/v3.3/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateOrReplaceTagExec.scala
@@ -19,6 +19,7 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions
 import org.apache.iceberg.spark.source.SparkTable
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -40,10 +41,17 @@ case class CreateOrReplaceTagExec(
   override protected def run(): Seq[InternalRow] = {
     catalog.loadTable(ident) match {
       case iceberg: SparkTable =>
-        val snapshotId = tagOptions.snapshotId.getOrElse(iceberg.table.currentSnapshot().snapshotId())
+        val snapshotId: java.lang.Long = tagOptions.snapshotId
+          .orElse(Option(iceberg.table.currentSnapshot()).map(_.snapshotId()))
+          .map(java.lang.Long.valueOf)
+          .orNull
+
+        Preconditions.checkArgument(snapshotId != null,
+          "Cannot complete create or replace tag operation on %s, main has no snapshot", ident)
+
         val manageSnapshot = iceberg.table.manageSnapshots()
         if (!replace) {
-          val ref = iceberg.table().refs().get(tag);
+          val ref = iceberg.table().refs().get(tag)
           if (ref != null && ifNotExists) {
             return Nil
           }
@@ -67,6 +75,6 @@ case class CreateOrReplaceTagExec(
   }
 
   override def simpleString(maxFields: Int): String = {
-    s"Create tag: ${tag} for table: ${ident.quoted}"
+    s"Create tag: $tag for table: ${ident.quoted}"
   }
 }
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
index cc60be55ba..5dd8c75186 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestBranchDDL.java
@@ -31,6 +31,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -89,6 +90,15 @@ public class TestBranchDDL extends SparkExtensionsTestBase {
         () -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, branchName));
   }
 
+  @Test
+  public void testCreateBranchOnEmptyTable() {
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE BRANCH %s", tableName, "b1"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot complete create or replace branch operation on %s, main has no snapshot",
+            tableName);
+  }
+
   @Test
   public void testCreateBranchUseDefaultConfig() throws NoSuchTableException {
     Table table = insertRows();
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
index 25efaaf766..ec3148de6c 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestTagDDL.java
@@ -33,6 +33,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergParseException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -119,6 +120,15 @@ public class TestTagDDL extends SparkExtensionsTestBase {
                 tableName, tagName, firstSnapshotId, maxRefAge));
   }
 
+  @Test
+  public void testCreateTagOnEmptyTable() {
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE %s CREATE TAG %s", tableName, "abc"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining(
+            "Cannot complete create or replace tag operation on %s, main has no snapshot",
+            tableName);
+  }
+
   @Test
   public void testCreateTagUseDefaultConfig() throws NoSuchTableException {
     Table table = insertRows();