Posted to commits@spark.apache.org by we...@apache.org on 2018/05/20 08:14:17 UTC
spark git commit: Revert "[SPARK-24250][SQL] support accessing SQLConf inside tasks"
Repository: spark
Updated Branches:
refs/heads/master dd37529a8 -> 000e25ae7
Revert "[SPARK-24250][SQL] support accessing SQLConf inside tasks"
This reverts commit dd37529a8dada6ed8a49b8ce50875268f6a20cba.
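For context: the reverted commit made SQLConf.get usable inside executor-side tasks by returning a ReadOnlySQLConf backed by the task's local properties. The deleted ExecutorSideSQLConfSuite (at the end of this diff) exercised roughly the pattern below, which this revert removes. A minimal sketch adapted from that test, assuming a running SparkSession named `spark`:

    import org.apache.spark.sql.internal.SQLConf

    // driver side: set a SQL conf; with the reverted change in place it was
    // propagated to executors via local properties when a query started
    SQLConf.get.setConfString("spark.sql.x", "a")

    import spark.implicits._
    // executor side: with the reverted change, SQLConf.get inside a task
    // returned a ReadOnlySQLConf holding the propagated value; after this
    // revert, SQLConf.get must only be called on the driver
    val allPropagated = spark.range(10).mapPartitions { _ =>
      Iterator(SQLConf.get.getConfString("spark.sql.x") == "a")
    }.collect().forall(identity)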
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/000e25ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/000e25ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/000e25ae
Branch: refs/heads/master
Commit: 000e25ae7950ff005d4bbe4fffed410e5947075c
Parents: dd37529
Author: Wenchen Fan <we...@databricks.com>
Authored: Sun May 20 16:13:42 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun May 20 16:13:42 2018 +0800
----------------------------------------------------------------------
.../org/apache/spark/TaskContextImpl.scala | 2 -
.../spark/sql/internal/ReadOnlySQLConf.scala | 66 --------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 21 ++++---
.../org/apache/spark/sql/SparkSession.scala | 21 +------
.../spark/sql/execution/SQLExecution.scala | 50 ++++-----------
.../sql/execution/basicPhysicalOperators.scala | 2 +-
.../datasources/json/JsonDataSource.scala | 16 ++---
.../exchange/BroadcastExchangeExec.scala | 2 +-
.../sql/internal/ExecutorSideSQLConfSuite.scala | 66 --------------------
9 files changed, 36 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 0791fe8..cccd3ea 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -178,6 +178,4 @@ private[spark] class TaskContextImpl(
private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException
- // TODO: shall we publish it and define it in `TaskContext`?
- private[spark] def getLocalProperties(): Properties = localProperties
}
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
deleted file mode 100644
index 19f6723..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal
-
-import java.util.{Map => JMap}
-
-import org.apache.spark.{TaskContext, TaskContextImpl}
-import org.apache.spark.internal.config.{ConfigEntry, ConfigProvider, ConfigReader}
-
-/**
- * A readonly SQLConf that will be created by tasks running at the executor side. It reads the
- * configs from the local properties which are propagated from driver to executors.
- */
-class ReadOnlySQLConf(context: TaskContext) extends SQLConf {
-
- @transient override val settings: JMap[String, String] = {
- context.asInstanceOf[TaskContextImpl].getLocalProperties().asInstanceOf[JMap[String, String]]
- }
-
- @transient override protected val reader: ConfigReader = {
- new ConfigReader(new TaskContextConfigProvider(context))
- }
-
- override protected def setConfWithCheck(key: String, value: String): Unit = {
- throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
- }
-
- override def unsetConf(key: String): Unit = {
- throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
- }
-
- override def unsetConf(entry: ConfigEntry[_]): Unit = {
- throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
- }
-
- override def clear(): Unit = {
- throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
- }
-
- override def clone(): SQLConf = {
- throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
- }
-
- override def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
- throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
- }
-}
-
-class TaskContextConfigProvider(context: TaskContext) extends ConfigProvider {
- override def get(key: String): Option[String] = Option(context.getLocalProperty(key))
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 643e4c6..53a5030 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -27,12 +27,13 @@ import scala.util.matching.Regex
import org.apache.hadoop.fs.Path
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkContext, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.util.Utils
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
@@ -106,13 +107,7 @@ object SQLConf {
* run tests in parallel. At the time this feature was implemented, this was a no-op since we
* run unit tests (that does not involve SparkSession) in serial order.
*/
- def get: SQLConf = {
- if (TaskContext.get != null) {
- new ReadOnlySQLConf(TaskContext.get())
- } else {
- confGetter.get()()
- }
- }
+ def get: SQLConf = confGetter.get()()
val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations")
.internal()
@@ -1297,11 +1292,17 @@ object SQLConf {
class SQLConf extends Serializable with Logging {
import SQLConf._
+ if (Utils.isTesting && SparkEnv.get != null) {
+ // assert that we're only accessing it on the driver.
+ assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER,
+ "SQLConf should only be created and accessed on the driver.")
+ }
+
/** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@transient protected[spark] val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())
- @transient protected val reader = new ConfigReader(settings)
+ @transient private val reader = new ConfigReader(settings)
/** ************************ Spark SQL Params/Hints ******************* */
@@ -1764,7 +1765,7 @@ class SQLConf extends Serializable with Logging {
settings.containsKey(key)
}
- protected def setConfWithCheck(key: String, value: String): Unit = {
+ private def setConfWithCheck(key: String, value: String): Unit = {
settings.put(key, value)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e2a1a57..c502e58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal
-import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
import org.apache.spark.annotation.{DeveloperApi, Experimental, InterfaceStability}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
@@ -898,7 +898,6 @@ object SparkSession extends Logging {
* @since 2.0.0
*/
def getOrCreate(): SparkSession = synchronized {
- assertOnDriver()
// Get the session from current thread's active session.
var session = activeThreadSession.get()
if ((session ne null) && !session.sparkContext.isStopped) {
@@ -1023,20 +1022,14 @@ object SparkSession extends Logging {
*
* @since 2.2.0
*/
- def getActiveSession: Option[SparkSession] = {
- assertOnDriver()
- Option(activeThreadSession.get)
- }
+ def getActiveSession: Option[SparkSession] = Option(activeThreadSession.get)
/**
* Returns the default SparkSession that is returned by the builder.
*
* @since 2.2.0
*/
- def getDefaultSession: Option[SparkSession] = {
- assertOnDriver()
- Option(defaultSession.get)
- }
+ def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)
/**
* Returns the currently active SparkSession, otherwise the default one. If there is no default
@@ -1069,14 +1062,6 @@ object SparkSession extends Logging {
}
}
- private def assertOnDriver(): Unit = {
- if (Utils.isTesting && TaskContext.get != null) {
- // we're accessing it during task execution, fail.
- throw new IllegalStateException(
- "SparkSession should only be created and accessed on the driver.")
- }
- }
-
/**
* Helper method to create an instance of `SessionState` based on `className` from conf.
* The result is either `SessionState` or a Hive based `SessionState`.
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 032525a..2c5102b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -68,18 +68,16 @@ object SQLExecution {
// sparkContext.getCallSite() would first try to pick up any call site that was previously
// set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
// streaming queries would give us call site like "run at <unknown>:0"
- val callSite = sc.getCallSite()
+ val callSite = sparkSession.sparkContext.getCallSite()
- withSQLConfPropagated(sparkSession) {
- sc.listenerBus.post(SparkListenerSQLExecutionStart(
- executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
- SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
- try {
- body
- } finally {
- sc.listenerBus.post(SparkListenerSQLExecutionEnd(
- executionId, System.currentTimeMillis()))
- }
+ sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
+ executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
+ try {
+ body
+ } finally {
+ sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
+ executionId, System.currentTimeMillis()))
}
} finally {
executionIdToQueryExecution.remove(executionId)
@@ -92,37 +90,13 @@ object SQLExecution {
* thread from the original one, this method can be used to connect the Spark jobs in this action
* with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`.
*/
- def withExecutionId[T](sparkSession: SparkSession, executionId: String)(body: => T): T = {
- val sc = sparkSession.sparkContext
+ def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T = {
val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- withSQLConfPropagated(sparkSession) {
- try {
- sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
- body
- } finally {
- sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
- }
- }
- }
-
- def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
- val sc = sparkSession.sparkContext
- // Set all the specified SQL configs to local properties, so that they can be available at
- // the executor side.
- val allConfigs = sparkSession.sessionState.conf.getAllConfs
- val originalLocalProps = allConfigs.collect {
- case (key, value) if key.startsWith("spark") =>
- val originalValue = sc.getLocalProperty(key)
- sc.setLocalProperty(key, value)
- (key, originalValue)
- }
-
try {
+ sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
body
} finally {
- for ((key, value) <- originalLocalProps) {
- sc.setLocalProperty(key, value)
- }
+ sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
}
}
}
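The removed withSQLConfPropagated helper above relied on SparkContext local properties, which Spark ships from the driver thread to every task launched from it. A minimal sketch of that underlying mechanism, independent of SQLConf (the key name here is illustrative only):

    import org.apache.spark.{SparkContext, TaskContext}

    def roundTrip(sc: SparkContext): Boolean = {
      // driver side: local properties set on this thread travel with its tasks
      sc.setLocalProperty("spark.example.flag", "on")

      // executor side: each task reads the propagated value from its TaskContext
      sc.parallelize(1 to 4, 2)
        .map(_ => TaskContext.get().getLocalProperty("spark.example.flag") == "on")
        .collect()
        .forall(identity)
    }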
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d54bfbf..1edfdc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -629,7 +629,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
Future {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
- SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
+ SQLExecution.withExecutionId(sparkContext, executionId) {
val beforeCollect = System.nanoTime()
// Note that we use .executeCollect() because we don't want to convert data to Scala types
val rows: Array[InternalRow] = child.executeCollect()
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 3b6df45..ba83df0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -34,7 +34,6 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
import org.apache.spark.sql.types.StructType
@@ -105,19 +104,22 @@ object TextInputJsonDataSource extends JsonDataSource {
CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow)
}.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow))
- SQLExecution.withSQLConfPropagated(json.sparkSession) {
- JsonInferSchema.infer(rdd, parsedOptions, rowParser)
- }
+ JsonInferSchema.infer(rdd, parsedOptions, rowParser)
}
private def createBaseDataset(
sparkSession: SparkSession,
inputPaths: Seq[FileStatus],
parsedOptions: JSONOptions): Dataset[String] = {
+ val paths = inputPaths.map(_.getPath.toString)
+ val textOptions = Map.empty[String, String] ++
+ parsedOptions.encoding.map("encoding" -> _) ++
+ parsedOptions.lineSeparator.map("lineSep" -> _)
+
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
- paths = inputPaths.map(_.getPath.toString),
+ paths = paths,
className = classOf[TextFileFormat].getName,
options = parsedOptions.parameters
).resolveRelation(checkFilesExist = false))
@@ -163,9 +165,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
.map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
.getOrElse(createParser(_: JsonFactory, _: PortableDataStream))
- SQLExecution.withSQLConfPropagated(sparkSession) {
- JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
- }
+ JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
}
private def createBaseRdd(
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 9e0ec94..daea6c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -69,7 +69,7 @@ case class BroadcastExchangeExec(
Future {
// This will run in another thread. Set the execution id so that we can connect these jobs
// with the correct execution.
- SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
+ SQLExecution.withExecutionId(sparkContext, executionId) {
try {
val beforeCollect = System.nanoTime()
// Use executeCollect/executeCollectIterator to avoid conversion to Scala types
http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
deleted file mode 100644
index 404d631..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.test.SQLTestUtils
-
-class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
- import testImplicits._
-
- protected var spark: SparkSession = null
-
- // Create a new [[SparkSession]] running in local-cluster mode.
- override def beforeAll(): Unit = {
- super.beforeAll()
- spark = SparkSession.builder()
- .master("local-cluster[2,1,1024]")
- .appName("testing")
- .getOrCreate()
- }
-
- override def afterAll(): Unit = {
- spark.stop()
- spark = null
- }
-
- test("ReadonlySQLConf is correctly created at the executor side") {
- SQLConf.get.setConfString("spark.sql.x", "a")
- try {
- val checks = spark.range(10).mapPartitions { it =>
- val conf = SQLConf.get
- Iterator(conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString("spark.sql.x") == "a")
- }.collect()
- assert(checks.forall(_ == true))
- } finally {
- SQLConf.get.unsetConf("spark.sql.x")
- }
- }
-
- test("case-sensitive config should work for json schema inference") {
- withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- withTempPath { path =>
- val pathString = path.getCanonicalPath
- spark.range(10).select('id.as("ID")).write.json(pathString)
- spark.range(10).write.mode("append").json(pathString)
- assert(spark.read.json(pathString).columns.toSet == Set("id", "ID"))
- }
- }
- }
-}