Posted to commits@spark.apache.org by we...@apache.org on 2021/01/27 04:13:46 UTC
[spark] branch branch-3.1 updated: [SPARK-34236][SQL] Fix v2 Overwrite w/ null static partition raise Cannot translate expression to source filter: null
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new c9b0dd5 [SPARK-34236][SQL] Fix v2 Overwrite w/ null static partition raise Cannot translate expression to source filter: null
c9b0dd5 is described below
commit c9b0dd54d0efe1d045d738c917bfcdd936ebe19d
Author: Kent Yao <ya...@apache.org>
AuthorDate: Wed Jan 27 12:05:50 2021 +0800
[SPARK-34236][SQL] Fix v2 Overwrite w/ null static partition raise Cannot translate expression to source filter: null
### What changes were proposed in this pull request?
For v2 static partition overwrites, we use `EqualTo` to generate the `deleteExpr`.
This is not right for null partition values, and causes the problem below because `ConstantFolding` converts the predicate to `lit(null)`:
```scala
SPARK-34223: static partition with null raise NPE *** FAILED *** (19 milliseconds)
[info] org.apache.spark.sql.AnalysisException: Cannot translate expression to source filter: null
[info] at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.$anonfun$applyOrElse$1(V2Writes.scala:50)
[info] at scala.collection.immutable.List.flatMap(List.scala:366)
[info] at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:47)
[info] at org.apache.spark.sql.execution.datasources.v2.V2Writes$$anonfun$apply$1.applyOrElse(V2Writes.scala:39)
[info] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:317)
[info] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:73)
```
The right way is to use `EqualNullSafe` instead to delete the null partitions.
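For illustration only (not part of the patch), here is a minimal sketch of the two predicates for a hypothetical string partition column `c` whose static value is null, mirroring the one-line `Analyzer.scala` change below; the column name and `StringType` are assumed just for the example:
```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Cast, EqualNullSafe, EqualTo, Literal}
import org.apache.spark.sql.types.StringType

// EqualTo follows three-valued SQL semantics: comparing anything with null yields
// null, so the optimizer collapses the whole deleteExpr to lit(null), which
// V2Writes cannot translate into a source filter (the error above).
val broken = EqualTo(UnresolvedAttribute("c"), Cast(Literal(null), StringType))

// EqualNullSafe (SQL's `<=>`) returns true for null <=> null and false for
// null <=> x, so the predicate remains a real filter and can still be pushed
// down to delete the null partition.
val fixed = EqualNullSafe(UnresolvedAttribute("c"), Cast(Literal(null), StringType))
```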
### Why are the changes needed?
bugfix
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Moved an existing test from `InsertSuite` to `SQLInsertTestSuite`.
Closes #31339 from yaooqinn/SPARK-34236.
Authored-by: Kent Yao <ya...@apache.org>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 91ca21d7006169d95940506b8de154b96b4fae20)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
.../org/apache/spark/sql/connector/InMemoryTable.scala | 15 ++++++++++++++-
.../scala/org/apache/spark/sql/SQLInsertTestSuite.scala | 8 ++++++++
.../scala/org/apache/spark/sql/sources/InsertSuite.scala | 8 --------
4 files changed, 23 insertions(+), 10 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4274759..fb95323a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1295,7 +1295,7 @@ class Analyzer(override val catalogManager: CatalogManager)
// ResolveOutputRelation runs, using the query's column names that will match the
// table names at that point. because resolution happens after a future rule, create
// an UnresolvedAttribute.
- EqualTo(UnresolvedAttribute(attr.name), Cast(Literal(value), attr.dataType))
+ EqualNullSafe(UnresolvedAttribute(attr.name), Cast(Literal(value), attr.dataType))
case None =>
throw QueryCompilationErrors.unknownStaticPartitionColError(name)
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index aac3077..a7010a9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransfor
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
-import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
+import org.apache.spark.sql.sources.{And, EqualNullSafe, EqualTo, Filter, IsNotNull, IsNull}
import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.unsafe.types.UTF8String
@@ -366,6 +366,17 @@ object InMemoryTable {
filters.flatMap(splitAnd).forall {
case EqualTo(attr, value) =>
value == extractValue(attr, partitionNames, partValues)
+ case EqualNullSafe(attr, value) =>
+ val attrVal = extractValue(attr, partitionNames, partValues)
+ if (attrVal == null && value == null) {
+ true
+ } else if (attrVal == null || value == null) {
+ false
+ } else {
+ value == attrVal
+ }
+ case IsNull(attr) =>
+ null == extractValue(attr, partitionNames, partValues)
case IsNotNull(attr) =>
null != extractValue(attr, partitionNames, partValues)
case f =>
@@ -377,6 +388,8 @@ object InMemoryTable {
def supportsFilters(filters: Array[Filter]): Boolean = {
filters.flatMap(splitAnd).forall {
case _: EqualTo => true
+ case _: EqualNullSafe => true
+ case _: IsNull => true
case _: IsNotNull => true
case _ => false
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 12394a9..c7446c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -200,6 +200,14 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
e1.getMessage.contains(v2Msg))
}
}
+
+ test("SPARK-34223: static partition with null raise NPE") {
+ withTable("t") {
+ sql(s"CREATE TABLE t(i STRING, c string) USING PARQUET PARTITIONED BY (c)")
+ sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
+ checkAnswer(spark.table("t"), Row("1", null))
+ }
+ }
}
class FileSourceSQLInsertTestSuite extends SQLInsertTestSuite with SharedSparkSession {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 6082f06..aaf8765 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -954,14 +954,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
assert(msg.contains("cannot resolve '`c3`' given input columns"))
}
}
-
- test("SPARK-34223: static partition with null raise NPE") {
- withTable("t") {
- sql(s"CREATE TABLE t(i STRING, c string) USING PARQUET PARTITIONED BY (c)")
- sql("INSERT OVERWRITE t PARTITION (c=null) VALUES ('1')")
- checkAnswer(spark.table("t"), Row("1", null))
- }
- }
}
class FileExistingTestFileSystem extends RawLocalFileSystem {