Posted to commits@spark.apache.org by we...@apache.org on 2021/01/27 04:13:46 UTC

[spark] branch branch-3.1 updated: [SPARK-34236][SQL] Fix v2 Overwrite w/ null static partition raise Cannot translate expression to source filter: null

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c9b0dd5  [SPARK-34236][SQL] Fix v2 Overwrite w/ null static partition raise Cannot translate expression to source filter: null
c9b0dd5 is described below

commit c9b0dd54d0efe1d045d738c917bfcdd936ebe19d
Author: Kent Yao <ya...@apache.org>
AuthorDate: Wed Jan 27 12:05:50 2021 +0800

    [SPARK-34236][SQL] Fix v2 Overwrite w/ null static partition raise Cannot translate expression to source filter: null
    
    ### What changes were proposed in this pull request?
    
    For v2 static partition overwrites, we use `EqualTo` to generate the `deleteExpr`.
    
    This is not right for null partition values and causes failures like the one below, because `ConstantFolding` converts the condition to `lit(null)`:
    
    ```scala
    SPARK-34223: static partition with null raise NPE *** FAILED *** (19 milliseconds)
    [info]   org.apache.spark.sql.AnalysisException: Cannot translate expression to source filter: null
    [info]   at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.$anonfun$applyOrElse$1(V2Writes.scala:50)
    [info]   at scala.collection.immutable.List.flatMap(List.scala:366)
    [info]   at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:47)
    [info]   at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:39)
    [info]   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:317)
    [info]   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
    ```
    
    The right fix is to use `EqualNullSafe` instead, so that null partitions can be matched and deleted; a short illustration follows.
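    
    For context, a minimal sketch (assuming a spark-shell session bound to `spark`) of the differing null semantics of the two operators:
    
    ```scala
    // `=` (EqualTo) follows SQL three-valued logic: NULL = NULL evaluates to
    // NULL, so a PARTITION (c=null) spec yields a delete condition that
    // ConstantFolding collapses to lit(null), an expression no source filter
    // can represent.
    spark.sql("SELECT null = null").collect()    // Array([null])
    
    // `<=>` (EqualNullSafe) treats two NULLs as equal and never evaluates to
    // NULL, so the generated deleteExpr remains translatable to a source filter.
    spark.sql("SELECT null <=> null").collect()  // Array([true])
    ```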
    
    ### Why are the changes needed?
    
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Moved an existing test from `InsertSuite` to `SQLInsertTestSuite`.
    
    Closes #31339 from yaooqinn/SPARK-34236.
    
    Authored-by: Kent Yao <ya...@apache.org>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 91ca21d7006169d95940506b8de154b96b4fae20)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala |  2 +-
 .../org/apache/spark/sql/connector/InMemoryTable.scala    | 15 ++++++++++++++-
 .../scala/org/apache/spark/sql/SQLInsertTestSuite.scala   |  8 ++++++++
 .../scala/org/apache/spark/sql/sources/InsertSuite.scala  |  8 --------
 4 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4274759..fb95323a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1295,7 +1295,7 @@ class Analyzer(override val catalogManager: CatalogManager)
               // ResolveOutputRelation runs, using the query's column names that will match the
               // table names at that point. because resolution happens after a future rule, create
               // an UnresolvedAttribute.
-              EqualTo(UnresolvedAttribute(attr.name), Cast(Literal(value), attr.dataType))
+              EqualNullSafe(UnresolvedAttribute(attr.name), Cast(Literal(value), attr.dataType))
             case None =>
               throw QueryCompilationErrors.unknownStaticPartitionColError(name)
           }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index aac3077..a7010a9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransfor
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
-import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
+import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull}
 import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.unsafe.types.UTF8String
@@ -366,6 +366,17 @@ object InMemoryTable {
       filters.flatMap(splitAnd).forall {
         case EqualTo(attr, value) =>
           value == extractValue(attr, partitionNames, partValues)
+        case EqualNullSafe(attr, value) =>
+          val attrVal = extractValue(attr, partitionNames, partValues)
+          if (attrVal == null && value == null) {
+            true
+          } else if (attrVal == null || value == null) {
+            false
+          } else {
+            value == attrVal
+          }
+        case IsNull(attr) =>
+          null == extractValue(attr, partitionNames, partValues)
         case IsNotNull(attr) =>
           null != extractValue(attr, partitionNames, partValues)
         case f =>
@@ -377,6 +388,8 @@ object InMemoryTable {
   def supportsFilters(filters: Array[Filter]): Boolean = {
     filters.flatMap(splitAnd).forall {
       case _: EqualTo => true
+      case _: EqualNullSafe => true
+      case _: IsNull => true
       case _: IsNotNull => true
       case _ => false
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 12394a9..c7446c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -200,6 +200,14 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
         e1.getMessage.contains(v2Msg))
     }
   }
+
+  test("SPARK-34223: static partition with null raise NPE") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(i STRING, c string) USING PARQUET PARTITIONED BY (c)")
+      sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
+      checkAnswer(spark.table("t"), Row("1", null))
+    }
+  }
 }
 
 class FileSourceSQLInsertTestSuite extends SQLInsertTestSuite with SharedSparkSession {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 6082f06..aaf8765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -954,14 +954,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       assert(msg.contains("cannot resolve '`c3`' given input columns"))
     }
   }
-
-  test("SPARK-34223: static partition with null raise NPE") {
-    withTable("t") {
-      sql(s"CREATE TABLE t(i STRING, c string) USING PARQUET PARTITIONED BY (c)")
-      sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
-      checkAnswer(spark.table("t"), Row("1", null))
-    }
-  }
 }
 
 class FileExistingTestFileSystem extends RawLocalFileSystem {
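
For a quick end-to-end check of the fixed behavior, here is a hedged sketch that mirrors the moved test (the table name `t` is illustrative, and a SparkSession bound to `spark` is assumed):

```scala
// Before this patch, the INSERT OVERWRITE below failed against a v2 catalog
// with "Cannot translate expression to source filter: null"; with the delete
// condition built from EqualNullSafe, the null static partition is matched
// and overwritten as expected.
spark.sql("CREATE TABLE t(i STRING, c STRING) USING PARQUET PARTITIONED BY (c)")
spark.sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
spark.sql("SELECT * FROM t").show()  // expect a single row: i = '1', c = null
```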

