Posted to commits@spark.apache.org by do...@apache.org on 2019/11/10 22:01:03 UTC
[spark] branch branch-2.4 updated: [SPARK-28939][SQL][2.4] Propagate SQLConf for plans executed by toRdd
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 2cc56e0 [SPARK-28939][SQL][2.4] Propagate SQLConf for plans executed by toRdd
2cc56e0 is described below
commit 2cc56e03398aea20457caf8f26284f64bb3a21d1
Author: Marco Gaido <ma...@gmail.com>
AuthorDate: Sun Nov 10 14:00:09 2019 -0800
[SPARK-28939][SQL][2.4] Propagate SQLConf for plans executed by toRdd
### What changes were proposed in this pull request?
The PR proposes to create a custom `RDD` which propagates the `SQLConf` also in cases not tracked by SQL execution, as happens when a `Dataset` is converted to an RDD (either via `.rdd` or via `.queryExecution.toRdd`) and actions are then invoked on the returned RDD.
In this way, SQL configs are effective in these cases too, whereas previously they were ignored.
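For context, a minimal sketch of the two conversion paths affected (the Dataset and names here are illustrative, not from the patch):
```
// Illustrative only: any Dataset works here.
val df = spark.range(10).selectExpr("id AS value")
// Both conversions leave the scope tracked by SQL execution, so before this
// patch the session's SQL configs were not visible to tasks run on the result:
val external = df.rdd                   // RDD[Row]
val internal = df.queryExecution.toRdd  // RDD[InternalRow] (internal API)
external.count()                        // action runs outside a tracked SQL execution
```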
### Why are the changes needed?
Without this patch, whenever `.rdd` or `.queryExecution.toRdd` is used, all the SQL configs that have been set are ignored. An example of a reproducer:
```
withSQLConf(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> "false") {
  val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*)
  df.createOrReplaceTempView("spark64kb")
  val data = spark.sql("select * from spark64kb limit 10")
  // Subexpression elimination is used here, even though it should have been disabled
  data.describe()
}
```
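As an illustrative check (not part of the patch), the executor-side view of a config can be observed by reading `SQLConf.get` inside a task, which mirrors the approach of the test added by this PR:
```
import org.apache.spark.sql.internal.SQLConf

// Illustrative sketch: print the executor-side value of a SQL config.
// Before this patch, a value set on the driver was not propagated on this path.
spark.range(2).queryExecution.toRdd.mapPartitions { it =>
  Iterator.single(
    SQLConf.get.getConfString("spark.sql.subexpressionElimination.enabled", "<unset>"))
}.collect().foreach(println)
```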
### Does this PR introduce any user-facing change?
When a user calls `.queryExecution.toRdd`, a `SQLExecutionRDD` is returned, wrapping the `RDD` produced by executing the plan. When `.rdd` is used, an additional `SQLExecutionRDD` is present in the RDD hierarchy.
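For example (illustrative), the wrapper is visible in the RDD lineage:
```
// Illustrative sketch: with the patch applied (and the legacy flag left at its
// default), SQLExecutionRDD should appear as the outermost RDD in the lineage.
val rdd = spark.range(5).queryExecution.toRdd
println(rdd.toDebugString)
```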
### How was this patch tested?
added UT
Closes #25734 from mgaido91/SPARK-28939_2.4.
Authored-by: Marco Gaido <ma...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
docs/sql-migration-guide-upgrade.md | 7 +++
.../org/apache/spark/sql/internal/SQLConf.scala | 20 +++++++-
.../spark/sql/execution/QueryExecution.scala | 9 +++-
.../spark/sql/execution/SQLExecutionRDD.scala | 58 ++++++++++++++++++++++
.../sql/internal/ExecutorSideSQLConfSuite.scala | 58 +++++++++++++++++++++-
5 files changed, 147 insertions(+), 5 deletions(-)
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index b703cb5..a6fc42a 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -7,6 +7,13 @@ displayTitle: Spark SQL Upgrading Guide
* Table of contents
{:toc}
+## Upgrading from Spark SQL 2.4 to 2.4.5
+
+ - Starting from 2.4.5, SQL configurations are effective also when a Dataset is converted to an RDD and its
+   plan is executed due to an action on the derived RDD. The previous behavior can be restored by setting
+   `spark.sql.legacy.rdd.applyConf` to `false`: in this case, SQL configurations are ignored for operations
+   performed on an RDD derived from a Dataset.
+
## Upgrading from Spark SQL 2.4 to 2.4.1
- The value of `spark.executor.heartbeatInterval`, when specified without units like "30" rather than "30s", was
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f281461..c7167f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -113,7 +113,9 @@ object SQLConf {
* Returns the active config object within the current scope. If there is an active SparkSession,
* the proper SQLConf associated with the thread's active session is used. If it's called from
* tasks in the executor side, a SQLConf will be created from job local properties, which are set
- * and propagated from the driver side.
+ * and propagated from the driver side, unless a `SQLConf` has been set in the scope by
+ * `withExistingConf` as done for propagating SQLConf for operations performed on RDDs created
+ * from DataFrames.
*
* The way this works is a little bit convoluted, due to the fact that config was added initially
* only for physical plans (and as a result not in sql/catalyst module).
@@ -127,7 +129,12 @@ object SQLConf {
   */
  def get: SQLConf = {
    if (TaskContext.get != null) {
-      new ReadOnlySQLConf(TaskContext.get())
+      val conf = existingConf.get()
+      if (conf != null) {
+        conf
+      } else {
+        new ReadOnlySQLConf(TaskContext.get())
+      }
    } else {
      val isSchedulerEventLoopThread = SparkContext.getActive
        .flatMap { sc => Option(sc.dagScheduler) }
@@ -1292,6 +1299,15 @@ object SQLConf {
    .booleanConf
    .createWithDefault(true)
+  val USE_CONF_ON_RDD_OPERATION =
+    buildConf("spark.sql.legacy.rdd.applyConf")
+      .internal()
+      .doc("When false, SQL configurations are disregarded when operations on an RDD derived " +
+        "from a DataFrame are executed. This is the (buggy) behavior up to 2.4.4. This config " +
+        "is deprecated and will be removed in 3.0.0.")
+      .booleanConf
+      .createWithDefault(true)
+
  val REPLACE_EXCEPT_WITH_FILTER = buildConf("spark.sql.optimizer.replaceExceptWithFilter")
    .internal()
    .doc("When true, the apply function of the rule verifies whether the right node of the" +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 64f49e2..37353b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
import org.apache.spark.util.Utils
@@ -77,7 +78,13 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
  lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
  /** Internal version of the RDD. Avoids copies and has no schema */
-  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+  lazy val toRdd: RDD[InternalRow] = {
+    if (sparkSession.sessionState.conf.getConf(SQLConf.USE_CONF_ON_RDD_OPERATION)) {
+      new SQLExecutionRDD(executedPlan.execute(), sparkSession.sessionState.conf)
+    } else {
+      executedPlan.execute()
+    }
+  }
/**
* Prepares a planned [[SparkPlan]] for execution by inserting shuffle operations and internal
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala
new file mode 100644
index 0000000..307e64b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * It is just a wrapper over `sqlRDD`, which sets and makes effective all the configs from the
+ * captured `SQLConf`.
+ * Please notice that this means we may miss configurations set after the creation of this RDD and
+ * before its execution.
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLExecutionRDD(
+    var sqlRDD: RDD[InternalRow], conf: SQLConf) extends RDD[InternalRow](sqlRDD) {
+  private val sqlConfigs = conf.getAllConfs
+  private lazy val sqlConfExecutorSide = {
+    val newConf = new SQLConf()
+    sqlConfigs.foreach { case (k, v) => newConf.setConfString(k, v) }
+    newConf
+  }
+
+  override val partitioner = firstParent[InternalRow].partitioner
+
+  override def getPartitions: Array[Partition] = firstParent[InternalRow].partitions
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    // If we are in the context of a tracked SQL operation, `SQLExecution.EXECUTION_ID_KEY` is set
+    // and we have nothing to do here. Otherwise, we use the `SQLConf` captured at the creation of
+    // this RDD.
+    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
+      SQLConf.withExistingConf(sqlConfExecutorSide) {
+        firstParent[InternalRow].iterator(split, context)
+      }
+    } else {
+      firstParent[InternalRow].iterator(split, context)
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index 5b4736e..ae7206b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -17,8 +17,13 @@
package org.apache.spark.sql.internal
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan}
import org.apache.spark.sql.execution.debug.codegenStringSeq
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SQLTestUtils
@@ -98,4 +103,53 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
      }
    }
  }
+
+  test("SPARK-28939: propagate SQLConf also in conversions to RDD") {
+    withSQLConf(SQLConf.USE_CONF_ON_RDD_OPERATION.key -> "true") {
+      val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y")
+      val physicalPlan = SQLConfAssertPlan(confs)
+      val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan)
+      withSQLConf(confs: _*) {
+        // Force RDD evaluation to trigger asserts
+        dummyQueryExecution.toRdd.collect()
+      }
+      val dummyQueryExecution1 = FakeQueryExecution(spark, physicalPlan)
+      // Without setting the configs assertions fail
+      val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
+      assert(e.getCause.isInstanceOf[NoSuchElementException])
+    }
+    withSQLConf(SQLConf.USE_CONF_ON_RDD_OPERATION.key -> "false") {
+      val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y")
+      val physicalPlan = SQLConfAssertPlan(confs)
+      val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan)
+      withSQLConf(confs: _*) {
+        // Force RDD evaluation to trigger asserts
+        val e = intercept[SparkException](dummyQueryExecution.toRdd.collect())
+        assert(e.getCause.isInstanceOf[NoSuchElementException])
+      }
+    }
+  }
+}
+
+case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = {
+    sqlContext
+      .sparkContext
+      .parallelize(0 until 2, 2)
+      .mapPartitions { it =>
+        val confs = SQLConf.get
+        confToCheck.foreach { case (key, expectedValue) =>
+          assert(confs.getConfString(key) == expectedValue)
+        }
+        it.map(i => InternalRow.fromSeq(Seq(i)))
+      }
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+case class FakeQueryExecution(spark: SparkSession, physicalPlan: SparkPlan)
+  extends QueryExecution(spark, LocalRelation()) {
+  override lazy val sparkPlan: SparkPlan = physicalPlan
+  override lazy val executedPlan: SparkPlan = physicalPlan
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org