You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/11/10 22:01:03 UTC

[spark] branch branch-2.4 updated: [SPARK-28939][SQL][2.4] Propagate SQLConf for plans executed by toRdd

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2cc56e0  [SPARK-28939][SQL][2.4] Propagate SQLConf for plans executed by toRdd
2cc56e0 is described below

commit 2cc56e03398aea20457caf8f26284f64bb3a21d1
Author: Marco Gaido <ma...@gmail.com>
AuthorDate: Sun Nov 10 14:00:09 2019 -0800

    [SPARK-28939][SQL][2.4] Propagate SQLConf for plans executed by toRdd
    
    ### What changes were proposed in this pull request?
    This PR proposes a custom `RDD` that makes it possible to propagate `SQLConf` also in cases not tracked by SQL execution, as happens when a `Dataset` is converted to an RDD using either `.rdd` or `.queryExecution.toRdd` and actions are then invoked on the returned RDD.
    
    In this way, SQL configs are effective also in these cases, while earlier they were ignored.
    
    ### Why are the changes needed?
    
    Without this patch, whenever `.rdd` or `.queryExecution.toRdd` is used, all the SQL configs that have been set are ignored. A reproducer is:
    ```
          withSQLConf(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> "false") {
            val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*)
            df.createOrReplaceTempView("spark64kb")
            val data = spark.sql("select * from spark64kb limit 10")
            // Subexpression elimination is used here, even though it should have been disabled
            data.describe()
          }
    ```
    
    ### Does this PR introduce any user-facing change?
    When a user calls `.queryExecution.toRdd`, a `SQLExecutionRDD` is returned that wraps the `RDD` produced by executing the plan. When `.rdd` is used, an additional `SQLExecutionRDD` is present in the RDD lineage.
    
    ### How was this patch tested?
    Added a unit test to `ExecutorSideSQLConfSuite`.
    
    Closes #25734 from mgaido91/SPARK-28939_2.4.
    
    Authored-by: Marco Gaido <ma...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 docs/sql-migration-guide-upgrade.md                |  7 +++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 20 +++++++-
 .../spark/sql/execution/QueryExecution.scala       |  9 +++-
 .../spark/sql/execution/SQLExecutionRDD.scala      | 58 ++++++++++++++++++++++
 .../sql/internal/ExecutorSideSQLConfSuite.scala    | 58 +++++++++++++++++++++-
 5 files changed, 147 insertions(+), 5 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index b703cb5..a6fc42a 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -7,6 +7,13 @@ displayTitle: Spark SQL Upgrading Guide
 * Table of contents
 {:toc}
 
+## Upgrading from Spark SQL 2.4 to 2.4.5
+
+ - Starting from 2.4.5, SQL configurations are effective also when a Dataset is converted to an RDD and its
+   plan is executed due to action on the derived RDD. The previous behavior can be restored setting
+   `spark.sql.legacy.rdd.applyConf` to `false`: in this case, SQL configurations are ignored for operations
+   performed on a RDD derived from a Dataset.
+
 ## Upgrading from Spark SQL 2.4 to 2.4.1
 
   - The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f281461..c7167f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -113,7 +113,9 @@ object SQLConf {
    * Returns the active config object within the current scope. If there is an active SparkSession,
    * the proper SQLConf associated with the thread's active session is used. If it's called from
    * tasks in the executor side, a SQLConf will be created from job local properties, which are set
-   * and propagated from the driver side.
+   * and propagated from the driver side, unless a `SQLConf` has been set in the scope by
+   * `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
+   * from DataFrames.
    *
    * The way this works is a little bit convoluted, due to the fact that config was added initially
    * only for physical plans (and as a result not in sql/catalyst module).
@@ -127,7 +129,12 @@ object SQLConf {
    */
   def get: SQLConf = {
     if (TaskContext.get != null) {
-      new ReadOnlySQLConf(TaskContext.get())
+      val conf = existingConf.get()
+      if (conf != null) {
+        conf
+      } else {
+        new ReadOnlySQLConf(TaskContext.get())
+      }
     } else {
       val isSchedulerEventLoopThread = SparkContext.getActive
         .flatMap { sc => Option(sc.dagScheduler) }
@@ -1292,6 +1299,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val USE_CONF_ON_RDD_OPERATION =
+    buildConf("spark.sql.legacy.rdd.applyConf")
+      .internal()
+      .doc("When false, SQL configurations are disregarded when operations on a RDD derived from" +
+        " a dataframe are executed. This is the (buggy) behavior up to 2.4.4. This config is " +
+        "deprecated and it will be removed in 3.0.0.")
+      .booleanConf
+      .createWithDefault(true)
+
   val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
     .internal()
     .doc("When true, the apply function of the rule verifies whether the right node of the" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 64f49e2..37353b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
 import org.apache.spark.util.Utils
 
@@ -77,7 +78,13 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
 
   /** Internal version of the RDD. Avoids copies and has no schema */
-  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+  lazy val toRdd: RDD[InternalRow] = {
+    if (sparkSession.sessionState.conf.getConf(SQLConf.USE_CONF_ON_RDD_OPERATION)) {
+      new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)
+    } else {
+      executedPlan.execute()
+    }
+  }
 
   /**
    * Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala
new file mode 100644
index 0000000..307e64b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * It is just a wrapper over `sqlRDD`, which sets and makes effective all the configs from the
+ * captured `SQLConf`.
+ * Please notice that this means we may miss configurations set after the creation of this RDD and
+ * before its execution.
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLExecutionRDD(
+    var sqlRDD: RDD[InternalRow], conf: SQLConf) extends RDD[InternalRow](sqlRDD) {
+  private val sqlConfigs = conf.getAllConfs
+  private lazy val sqlConfExecutorSide = {
+    val newConf = new SQLConf()
+    sqlConfigs.foreach { case (k, v) => newConf.setConfString(k, v) }
+    newConf
+  }
+
+  override val partitioner = firstParent[InternalRow].partitioner
+
+  override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
+    // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
+    // this RDD.
+    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
+      SQLConf.withExistingConf(sqlConfExecutorSide) {
+        firstParent[InternalRow].iterator(split, context)
+      }
+    } else {
+      firstParent[InternalRow].iterator(split, context)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 5b4736e..ae7206b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -17,8 +17,13 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.debug.codegenStringSeq
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SQLTestUtils
@@ -98,4 +103,53 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-28939: propagate SQLConf also in conversions to RDD") {
+    withSQLConf(SQLConf.USE_CONF_ON_RDD_OPERATION.key -> "true") {
+      val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y")
+      val physicalPlan = SQLConfAssertPlan(confs)
+      val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan)
+      withSQLConf(confs: _*) {
+        // Force RDD evaluation to trigger asserts
+        dummyQueryExecution.toRdd.collect()
+      }
+      val dummyQueryExecution1 = FakeQueryExecution(spark, physicalPlan)
+      // Without setting the configs assertions fail
+      val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
+      assert(e.getCause.isInstanceOf[NoSuchElementException])
+    }
+    withSQLConf(SQLConf.USE_CONF_ON_RDD_OPERATION.key -> "false") {
+      val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y")
+      val physicalPlan = SQLConfAssertPlan(confs)
+      val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan)
+      withSQLConf(confs: _*) {
+        // Force RDD evaluation to trigger asserts
+        val e = intercept[SparkException](dummyQueryExecution.toRdd.collect())
+        assert(e.getCause.isInstanceOf[NoSuchElementException])
+      }
+    }
+  }
+}
+
+case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = {
+    sqlContext
+      .sparkContext
+      .parallelize(0 until 2, 2)
+      .mapPartitions { it =>
+        val confs = SQLConf.get
+        confToCheck.foreach { case (key, expectedValue) =>
+          assert(confs.getConfString(key) == expectedValue)
+        }
+        it.map(i => InternalRow.fromSeq(Seq(i)))
+      }
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+case class FakeQueryExecution(spark: SparkSession, physicalPlan: SparkPlan)
+    extends QueryExecution(spark, LocalRelation()) {
+  override lazy val sparkPlan: SparkPlan = physicalPlan
+  override lazy val executedPlan: SparkPlan = physicalPlan
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org