Posted to commits@spark.apache.org by we...@apache.org on 2018/05/20 08:14:17 UTC

spark git commit: Revert "[SPARK-24250][SQL] support accessing SQLConf inside tasks"

Repository: spark
Updated Branches:
  refs/heads/master dd37529a8 -> 000e25ae7


Revert "[SPARK-24250][SQL] support accessing SQLConf inside tasks"

This reverts commit dd37529a8dada6ed8a49b8ce50875268f6a20cba.
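
For context, here is a minimal, self-contained sketch (not part of the commit; object
name and config key are illustrative) of the mechanism the reverted patch built on:
SparkContext local properties set on the driver thread are shipped with every task of
jobs submitted from that thread, and SPARK-24250 wrapped that lookup in a read-only
SQLConf so that SQLConf.get also worked inside tasks. After this revert, only the raw
local-property lookup shown below remains available to executor-side code.

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.SparkSession

    object LocalPropertyPropagationSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("local-property-sketch")
          .getOrCreate()
        import spark.implicits._

        // Driver side: local properties set on the submitting thread are
        // propagated to every task of jobs launched from that thread.
        spark.sparkContext.setLocalProperty("spark.sql.x", "a")

        // Executor side: each task can read the propagated value back.
        val seen = spark.range(2).mapPartitions { _ =>
          Iterator(TaskContext.get().getLocalProperty("spark.sql.x"))
        }.collect()

        assert(seen.forall(_ == "a"))
        spark.stop()
      }
    }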


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/000e25ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/000e25ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/000e25ae

Branch: refs/heads/master
Commit: 000e25ae7950ff005d4bbe4fffed410e5947075c
Parents: dd37529
Author: Wenchen Fan <we...@databricks.com>
Authored: Sun May 20 16:13:42 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sun May 20 16:13:42 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/TaskContextImpl.scala      |  2 -
 .../spark/sql/internal/ReadOnlySQLConf.scala    | 66 --------------------
 .../org/apache/spark/sql/internal/SQLConf.scala | 21 ++++---
 .../org/apache/spark/sql/SparkSession.scala     | 21 +------
 .../spark/sql/execution/SQLExecution.scala      | 50 ++++-----------
 .../sql/execution/basicPhysicalOperators.scala  |  2 +-
 .../datasources/json/JsonDataSource.scala       | 16 ++---
 .../exchange/BroadcastExchangeExec.scala        |  2 +-
 .../sql/internal/ExecutorSideSQLConfSuite.scala | 66 --------------------
 9 files changed, 36 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 0791fe8..cccd3ea 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -178,6 +178,4 @@ private[spark] class TaskContextImpl(
 
   private[spark] def fetchFailed: Option[FetchFailedException] = _fetchFailedException
 
-  // TODO: shall we publish it and define it in `TaskContext`?
-  private[spark] def getLocalProperties(): Properties = localProperties
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
deleted file mode 100644
index 19f6723..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/ReadOnlySQLConf.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal
-
-import java.util.{Map => JMap}
-
-import org.apache.spark.{TaskContext, TaskContextImpl}
-import org.apache.spark.internal.config.{ConfigEntry, ConfigProvider, ConfigReader}
-
-/**
- * A readonly SQLConf that will be created by tasks running at the executor side. It reads the
- * configs from the local properties which are propagated from driver to executors.
- */
-class ReadOnlySQLConf(context: TaskContext) extends SQLConf {
-
-  @transient override val settings: JMap[String, String] = {
-    context.asInstanceOf[TaskContextImpl].getLocalProperties().asInstanceOf[JMap[String, String]]
-  }
-
-  @transient override protected val reader: ConfigReader = {
-    new ConfigReader(new TaskContextConfigProvider(context))
-  }
-
-  override protected def setConfWithCheck(key: String, value: String): Unit = {
-    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
-  }
-
-  override def unsetConf(key: String): Unit = {
-    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
-  }
-
-  override def unsetConf(entry: ConfigEntry[_]): Unit = {
-    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
-  }
-
-  override def clear(): Unit = {
-    throw new UnsupportedOperationException("Cannot mutate ReadOnlySQLConf.")
-  }
-
-  override def clone(): SQLConf = {
-    throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
-  }
-
-  override def copy(entries: (ConfigEntry[_], Any)*): SQLConf = {
-    throw new UnsupportedOperationException("Cannot clone/copy ReadOnlySQLConf.")
-  }
-}
-
-class TaskContextConfigProvider(context: TaskContext) extends ConfigProvider {
-  override def get(key: String): Option[String] = Option(context.getLocalProperty(key))
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 643e4c6..53a5030 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -27,12 +27,13 @@ import scala.util.matching.Regex
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
+import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the configuration options for Spark SQL.
@@ -106,13 +107,7 @@ object SQLConf {
    * run tests in parallel. At the time this feature was implemented, this was a no-op since we
    * run unit tests (that does not involve SparkSession) in serial order.
    */
-  def get: SQLConf = {
-    if (TaskContext.get != null) {
-      new ReadOnlySQLConf(TaskContext.get())
-    } else {
-      confGetter.get()()
-    }
-  }
+  def get: SQLConf = confGetter.get()()
 
   val OPTIMIZER_MAX_ITERATIONS = buildConf("spark.sql.optimizer.maxIterations")
     .internal()
@@ -1297,11 +1292,17 @@ object SQLConf {
 class SQLConf extends Serializable with Logging {
   import SQLConf._
 
+  if (Utils.isTesting && SparkEnv.get != null) {
+    // assert that we're only accessing it on the driver.
+    assert(SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER,
+      "SQLConf should only be created and accessed on the driver.")
+  }
+
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
   @transient protected[spark] val settings = java.util.Collections.synchronizedMap(
     new java.util.HashMap[String, String]())
 
-  @transient protected val reader = new ConfigReader(settings)
+  @transient private val reader = new ConfigReader(settings)
 
   /** ************************ Spark SQL Params/Hints ******************* */
 
@@ -1764,7 +1765,7 @@ class SQLConf extends Serializable with Logging {
     settings.containsKey(key)
   }
 
-  protected def setConfWithCheck(key: String, value: String): Unit = {
+  private def setConfWithCheck(key: String, value: String): Unit = {
     settings.put(key, value)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index e2a1a57..c502e58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
-import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext, TaskContext}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental, InterfaceStability}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
@@ -898,7 +898,6 @@ object SparkSession extends Logging {
      * @since 2.0.0
      */
     def getOrCreate(): SparkSession = synchronized {
-      assertOnDriver()
       // Get the session from current thread's active session.
       var session = activeThreadSession.get()
       if ((session ne null) && !session.sparkContext.isStopped) {
@@ -1023,20 +1022,14 @@ object SparkSession extends Logging {
    *
    * @since 2.2.0
    */
-  def getActiveSession: Option[SparkSession] = {
-    assertOnDriver()
-    Option(activeThreadSession.get)
-  }
+  def getActiveSession: Option[SparkSession] = Option(activeThreadSession.get)
 
   /**
    * Returns the default SparkSession that is returned by the builder.
    *
    * @since 2.2.0
    */
-  def getDefaultSession: Option[SparkSession] = {
-    assertOnDriver()
-    Option(defaultSession.get)
-  }
+  def getDefaultSession: Option[SparkSession] = Option(defaultSession.get)
 
   /**
    * Returns the currently active SparkSession, otherwise the default one. If there is no default
@@ -1069,14 +1062,6 @@ object SparkSession extends Logging {
     }
   }
 
-  private def assertOnDriver(): Unit = {
-    if (Utils.isTesting && TaskContext.get != null) {
-      // we're accessing it during task execution, fail.
-      throw new IllegalStateException(
-        "SparkSession should only be created and accessed on the driver.")
-    }
-  }
-
   /**
    * Helper method to create an instance of `SessionState` based on `className` from conf.
    * The result is either `SessionState` or a Hive based `SessionState`.

http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 032525a..2c5102b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -68,18 +68,16 @@ object SQLExecution {
       // sparkContext.getCallSite() would first try to pick up any call site that was previously
       // set, then fall back to Utils.getCallSite(); call Utils.getCallSite() directly on
       // streaming queries would give us call site like "run at <unknown>:0"
-      val callSite = sc.getCallSite()
+      val callSite = sparkSession.sparkContext.getCallSite()
 
-      withSQLConfPropagated(sparkSession) {
-        sc.listenerBus.post(SparkListenerSQLExecutionStart(
-          executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
-          SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
-        try {
-          body
-        } finally {
-          sc.listenerBus.post(SparkListenerSQLExecutionEnd(
-            executionId, System.currentTimeMillis()))
-        }
+      sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
+        executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
+        SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
+      try {
+        body
+      } finally {
+        sparkSession.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
+          executionId, System.currentTimeMillis()))
       }
     } finally {
       executionIdToQueryExecution.remove(executionId)
@@ -92,37 +90,13 @@ object SQLExecution {
    * thread from the original one, this method can be used to connect the Spark jobs in this action
    * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`.
    */
-  def withExecutionId[T](sparkSession: SparkSession, executionId: String)(body: => T): T = {
-    val sc = sparkSession.sparkContext
+  def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T = {
     val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    withSQLConfPropagated(sparkSession) {
-      try {
-        sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
-        body
-      } finally {
-        sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
-      }
-    }
-  }
-
-  def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
-    val sc = sparkSession.sparkContext
-    // Set all the specified SQL configs to local properties, so that they can be available at
-    // the executor side.
-    val allConfigs = sparkSession.sessionState.conf.getAllConfs
-    val originalLocalProps = allConfigs.collect {
-      case (key, value) if key.startsWith("spark") =>
-        val originalValue = sc.getLocalProperty(key)
-        sc.setLocalProperty(key, value)
-        (key, originalValue)
-    }
-
     try {
+      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
       body
     } finally {
-      for ((key, value) <- originalLocalProps) {
-        sc.setLocalProperty(key, value)
-      }
+      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d54bfbf..1edfdc8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -629,7 +629,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
     Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
-      SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
+      SQLExecution.withExecutionId(sparkContext, executionId) {
         val beforeCollect = System.nanoTime()
         // Note that we use .executeCollect() because we don't want to convert data to Scala types
         val rows: Array[InternalRow] = child.executeCollect()

http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index 3b6df45..ba83df0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -34,7 +34,6 @@ import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
-import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
 import org.apache.spark.sql.types.StructType
@@ -105,19 +104,22 @@ object TextInputJsonDataSource extends JsonDataSource {
       CreateJacksonParser.internalRow(enc, _: JsonFactory, _: InternalRow)
     }.getOrElse(CreateJacksonParser.internalRow(_: JsonFactory, _: InternalRow))
 
-    SQLExecution.withSQLConfPropagated(json.sparkSession) {
-      JsonInferSchema.infer(rdd, parsedOptions, rowParser)
-    }
+    JsonInferSchema.infer(rdd, parsedOptions, rowParser)
   }
 
   private def createBaseDataset(
       sparkSession: SparkSession,
       inputPaths: Seq[FileStatus],
       parsedOptions: JSONOptions): Dataset[String] = {
+    val paths = inputPaths.map(_.getPath.toString)
+    val textOptions = Map.empty[String, String] ++
+      parsedOptions.encoding.map("encoding" -> _) ++
+      parsedOptions.lineSeparator.map("lineSep" -> _)
+
     sparkSession.baseRelationToDataFrame(
       DataSource.apply(
         sparkSession,
-        paths = inputPaths.map(_.getPath.toString),
+        paths = paths,
         className = classOf[TextFileFormat].getName,
         options = parsedOptions.parameters
       ).resolveRelation(checkFilesExist = false))
@@ -163,9 +165,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
       .map(enc => createParser(enc, _: JsonFactory, _: PortableDataStream))
       .getOrElse(createParser(_: JsonFactory, _: PortableDataStream))
 
-    SQLExecution.withSQLConfPropagated(sparkSession) {
-      JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
-    }
+    JsonInferSchema.infer[PortableDataStream](sampled, parsedOptions, parser)
   }
 
   private def createBaseRdd(

http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 9e0ec94..daea6c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -69,7 +69,7 @@ case class BroadcastExchangeExec(
     Future {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
-      SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
+      SQLExecution.withExecutionId(sparkContext, executionId) {
         try {
           val beforeCollect = System.nanoTime()
           // Use executeCollect/executeCollectIterator to avoid conversion to Scala types

http://git-wip-us.apache.org/repos/asf/spark/blob/000e25ae/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
deleted file mode 100644
index 404d631..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.internal
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.test.SQLTestUtils
-
-class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
-  import testImplicits._
-
-  protected var spark: SparkSession = null
-
-  // Create a new [[SparkSession]] running in local-cluster mode.
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    spark = SparkSession.builder()
-      .master("local-cluster[2,1,1024]")
-      .appName("testing")
-      .getOrCreate()
-  }
-
-  override def afterAll(): Unit = {
-    spark.stop()
-    spark = null
-  }
-
-  test("ReadonlySQLConf is correctly created at the executor side") {
-    SQLConf.get.setConfString("spark.sql.x", "a")
-    try {
-      val checks = spark.range(10).mapPartitions { it =>
-        val conf = SQLConf.get
-        Iterator(conf.isInstanceOf[ReadOnlySQLConf] && conf.getConfString("spark.sql.x") == "a")
-      }.collect()
-      assert(checks.forall(_ == true))
-    } finally {
-      SQLConf.get.unsetConf("spark.sql.x")
-    }
-  }
-
-  test("case-sensitive config should work for json schema inference") {
-    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-      withTempPath { path =>
-        val pathString = path.getCanonicalPath
-        spark.range(10).select('id.as("ID")).write.json(pathString)
-        spark.range(10).write.mode("append").json(pathString)
-        assert(spark.read.json(pathString).columns.toSet == Set("id", "ID"))
-      }
-    }
-  }
-}

