Posted to commits@spark.apache.org by we...@apache.org on 2023/03/10 12:59:00 UTC

[spark] branch master updated: [SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 93d5816b3f1 [SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2
93d5816b3f1 is described below

commit 93d5816b3f1460b405c9828ed5ae646adfa236aa
Author: Peter Toth <pe...@gmail.com>
AuthorDate: Fri Mar 10 20:58:38 2023 +0800

    [SPARK-42745][SQL] Improved AliasAwareOutputExpression works with DSv2
    
    ### What changes were proposed in this pull request?
    
    After https://github.com/apache/spark/pull/37525 (SPARK-40086 / SPARK-42049), the following simple query containing a subquery expression:
    ```
    select (select sum(id) from t1)
    ```
    fails with:
    ```
    09:48:57.645 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3)
    java.lang.NullPointerException
            at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch$lzycompute(BatchScanExec.scala:47)
            at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.batch(BatchScanExec.scala:47)
            at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.hashCode(BatchScanExec.scala:60)
            at scala.runtime.Statics.anyHash(Statics.java:122)
            ...
            at org.apache.spark.sql.catalyst.trees.TreeNode.hashCode(TreeNode.scala:249)
            at scala.runtime.Statics.anyHash(Statics.java:122)
            at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416)
            at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416)
            at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44)
            at scala.collection.mutable.HashTable.addEntry(HashTable.scala:149)
            at scala.collection.mutable.HashTable.addEntry$(HashTable.scala:148)
            at scala.collection.mutable.HashMap.addEntry(HashMap.scala:44)
            at scala.collection.mutable.HashTable.init(HashTable.scala:110)
            at scala.collection.mutable.HashTable.init$(HashTable.scala:89)
            at scala.collection.mutable.HashMap.init(HashMap.scala:44)
            at scala.collection.mutable.HashMap.readObject(HashMap.scala:195)
            ...
            at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
            at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
            at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:85)
            at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
            at org.apache.spark.scheduler.Task.run(Task.scala:139)
            at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
            at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1520)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:750)
    ```
    when DSv2 is enabled.
    
    This PR proposes to fix `BatchScanExec` so that its `equals()` and `hashCode()` don't throw an NPE under any circumstances.
    
    But if we dig deeper, we realize that the NPE has occurred since https://github.com/apache/spark/pull/37525, and the root cause of the problem is the change of `AliasAwareOutputExpression.aliasMap` from immutable to mutable. Deserializing the mutable map invokes the `hashCode()` of its keys, while that is not the case with immutable maps. In this case the key is a subquery expression whose plan contains the `BatchScanExec`.
    Please note that the mutability of `aliasMap` itself shouldn't be an issue, as it is a `private` field of `AliasAwareOutputExpression` (though adding a simple `.toMap` would also help to avoid the NPE).
    Based on the above findings, this PR also proposes making `aliasMap` transient, as it isn't needed on executors.
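    
    The deserialization behaviour described above can be sketched outside Spark. This is only an illustration under stated assumptions: `FragileKey`, `EagerHolder`, `TransientHolder`, `JavaRoundTrip` and `AliasMapDemo` are hypothetical names that do not exist in Spark, and `FragileKey` merely imitates a node whose `hashCode()` dereferences a transient field.
    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
    import scala.collection.mutable
    import scala.util.Try
    
    // Hypothetical stand-in for a plan node such as BatchScanExec: `scan` is transient,
    // so it is null after deserialization, and hashCode() dereferences it.
    class FragileKey(@transient val scan: String) extends Serializable {
      override def hashCode(): Int = scan.length // throws NPE once `scan` is null
      override def equals(other: Any): Boolean = other match {
        case that: FragileKey => scan == that.scan
        case _ => false
      }
    }
    
    // The mutable map is a regular field, so it is serialized and rebuilt on the reading side.
    class EagerHolder(key: FragileKey) extends Serializable {
      val aliases: mutable.HashMap[FragileKey, Int] = mutable.HashMap(key -> 1)
    }
    
    // The map is transient, so it is skipped during serialization and not rebuilt on read.
    class TransientHolder(key: FragileKey) extends Serializable {
      @transient lazy val aliases: mutable.HashMap[FragileKey, Int] = mutable.HashMap(key -> 1)
    }
    
    object JavaRoundTrip {
      // Serializes `value` with plain Java serialization and reads it back.
      def apply[T <: AnyRef](value: T): T = {
        val bytes = new ByteArrayOutputStream()
        val out = new ObjectOutputStream(bytes)
        out.writeObject(value)
        out.close()
        val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        try in.readObject().asInstanceOf[T] finally in.close()
      }
    }
    
    object AliasMapDemo {
      def main(args: Array[String]): Unit = {
        // Rebuilding the mutable map re-hashes the already deserialized key.
        println(Try(JavaRoundTrip(new EagerHolder(new FragileKey("scan")))))
        // The transient map is never written, so no key is hashed while reading.
        println(Try(JavaRoundTrip(new TransientHolder(new FragileKey("scan")))))
      }
    }
    ```
    In this sketch the first round trip is expected to fail with a `NullPointerException` raised from `FragileKey.hashCode()`, while the second one succeeds because the map is simply not part of the serialized form. That mirrors the reasoning for marking `aliasMap` as `@transient`.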
    
    A side question is whether adding any subquery expressions to `AliasAwareOutputExpression.aliasMap` makes any sense, because `AliasAwareOutputExpression.projectExpression()` mainly projects `child.outputPartitioning` and `child.outputOrdering`, which can't contain subquery expressions. But there are a few exceptions (`SortAggregateExec`, `TakeOrderedAndProjectExec`) where `AliasAwareQueryOutputOrdering.orderingExpressions` doesn't come from the `child` and actually leaving those expressions i [...]
    
    ### Why are the changes needed?
    To fix the regression introduced by https://github.com/apache/spark/pull/37525.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, the query works again.
    
    ### How was this patch tested?
    Added a new UT to `SubquerySuite`.
    
    Closes #40364 from peter-toth/SPARK-42745-improved-aliasawareoutputexpression-with-dsv2.
    
    Authored-by: Peter Toth <pe...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/plans/AliasAwareOutputExpression.scala  |  1 +
 .../sql/execution/datasources/v2/BatchScanExec.scala     |  5 +++--
 .../test/scala/org/apache/spark/sql/SubquerySuite.scala  | 16 ++++++++++++++++
 3 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
index cfe22994592..2cca7b844cc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/AliasAwareOutputExpression.scala
@@ -40,6 +40,7 @@ trait AliasAwareOutputExpression extends SQLConfHelper {
   // more than `aliasCandidateLimit` attributes for an expression. In those cases the old logic
   // handled only the last alias so we need to make sure that we give precedence to that.
   // If the `outputExpressions` contain simple attributes we need to add those too to the map.
+  @transient
   private lazy val aliasMap = {
     val aliases = mutable.Map[Expression, mutable.ArrayBuffer[Attribute]]()
     outputExpressions.reverse.foreach {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 52f15cf7b65..d43331d57c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -44,12 +44,13 @@ case class BatchScanExec(
     applyPartialClustering: Boolean = false,
     replicatePartitions: Boolean = false) extends DataSourceV2ScanExecBase {
 
-  @transient lazy val batch = scan.toBatch
+  @transient lazy val batch = if (scan == null) null else scan.toBatch
 
   // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
   override def equals(other: Any): Boolean = other match {
     case other: BatchScanExec =>
-      this.batch == other.batch && this.runtimeFilters == other.runtimeFilters &&
+      this.batch != null && this.batch == other.batch &&
+          this.runtimeFilters == other.runtimeFilters &&
           this.commonPartitionValues == other.commonPartitionValues &&
           this.replicatePartitions == other.replicatePartitions &&
           this.applyPartialClustering == other.applyPartialClustering
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 4d76013d659..7c15a05c586 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2679,4 +2679,20 @@ class SubquerySuite extends QueryTest
         Row(8, 6))
     }
   }
+
+  test("SPARK-42745: Improved AliasAwareOutputExpression works with DSv2") {
+    withSQLConf(
+      SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+      withTempPath { path =>
+        spark.range(0)
+          .write
+          .mode("overwrite")
+          .parquet(path.getCanonicalPath)
+        withTempView("t1") {
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+          checkAnswer(sql("select (select sum(id) from t1)"), Row(null))
+        }
+      }
+    }
+  }
 }

