Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/26 02:52:33 UTC

spark git commit: [SPARK-14902][SQL] Expose RuntimeConfig in SparkSession

Repository: spark
Updated Branches:
  refs/heads/master f36c9c837 -> cfa64882f


[SPARK-14902][SQL] Expose RuntimeConfig in SparkSession

## What changes were proposed in this pull request?

`RuntimeConfig` is the new user-facing configuration API added for 2.0 in #11378; until now, however, it has been dead code. This patch backs `RuntimeConfig` with the configuration state held in `SessionState` and exposes it through `SparkSession`.
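
For context, a minimal usage sketch of the surface this exposes. Method names are taken from the diff below; the object, method, and unset key names are made up for illustration, and obtaining a `SparkSession` instance is out of scope since its constructor is private.

```scala
import org.apache.spark.sql.SparkSession

object RuntimeConfSketch {
  // `session` is assumed to be an already-constructed SparkSession.
  def demo(session: SparkSession): Unit = {
    // SQL settings go through the RuntimeConfig, which is backed by the
    // SQLConf held in the session's SessionState.
    session.conf.set("spark.sql.shuffle.partitions", "5")
    assert(session.conf.get("spark.sql.shuffle.partitions") == "5")

    // Keys that were never set (and have no registered default) come back as None.
    assert(session.conf.getOption("made.up.key.never.set").isEmpty)
  }
}
```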

## How was this patch tested?

New test in `SQLContextSuite`.

Author: Andrew Or <an...@databricks.com>

Closes #12669 from andrewor14/use-runtime-conf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfa64882
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfa64882
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfa64882

Branch: refs/heads/master
Commit: cfa64882fc0728d7becf55b8a424926e4ca93887
Parents: f36c9c8
Author: Andrew Or <an...@databricks.com>
Authored: Mon Apr 25 17:52:25 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Apr 25 17:52:25 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  3 +-
 .../org/apache/spark/sql/SparkSession.scala     | 37 +++++++++++++------
 .../sql/execution/command/AnalyzeTable.scala    |  2 +-
 .../spark/sql/execution/command/tables.scala    |  2 +-
 .../sql/execution/datasources/DataSource.scala  |  8 ++---
 .../InsertIntoHadoopFsRelation.scala            |  3 +-
 .../datasources/csv/DefaultSource.scala         |  2 +-
 .../datasources/fileSourceInterfaces.scala      |  2 +-
 .../datasources/json/JSONRelation.scala         |  4 +--
 .../datasources/parquet/ParquetRelation.scala   |  4 +--
 .../datasources/text/DefaultSource.scala        |  2 +-
 .../execution/streaming/FileStreamSink.scala    |  2 +-
 .../execution/streaming/FileStreamSource.scala  |  2 +-
 .../execution/streaming/HDFSMetadataLog.scala   |  2 +-
 .../execution/streaming/StatefulAggregate.scala |  4 +--
 .../execution/streaming/StreamFileCatalog.scala |  2 +-
 .../streaming/state/StateStoreRDD.scala         |  7 ++--
 .../sql/execution/streaming/state/package.scala |  7 ++--
 .../spark/sql/internal/RuntimeConfigImpl.scala  | 38 ++++++++++----------
 .../spark/sql/internal/SessionState.scala       |  3 ++
 .../org/apache/spark/sql/SQLContextSuite.scala  | 23 ++++++++++++
 .../datasources/parquet/ParquetIOSuite.scala    |  8 ++---
 .../streaming/FileStreamSinkLogSuite.scala      |  2 +-
 .../streaming/HDFSMetadataLogSuite.scala        |  2 +-
 .../apache/spark/sql/test/SQLTestUtils.scala    |  2 +-
 .../apache/spark/sql/test/TestSQLContext.scala  |  4 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../spark/sql/hive/HiveSessionState.scala       |  1 +
 .../apache/spark/sql/hive/orc/OrcRelation.scala |  6 ++--
 .../apache/spark/sql/hive/test/TestHive.scala   |  4 +--
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  2 +-
 .../sql/hive/execution/HiveCommandSuite.scala   |  6 ++--
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  2 +-
 .../sql/hive/orc/OrcHadoopFsRelationSuite.scala |  2 +-
 .../spark/sql/hive/orc/OrcQuerySuite.scala      |  2 +-
 .../spark/sql/sources/SimpleTextRelation.scala  |  2 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |  4 +--
 38 files changed, 131 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 0745ef4..99d92b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -297,7 +297,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
       // If offsets have already been created, we are trying to resume a query.
       val checkpointPath = new Path(checkpointLocation, "offsets")
-      val fs = checkpointPath.getFileSystem(df.sqlContext.sparkContext.hadoopConfiguration)
+      val fs = checkpointPath.getFileSystem(df.sqlContext.sessionState.hadoopConf)
       if (fs.exists(checkpointPath)) {
         throw new AnalysisException(
           s"Unable to resume query written to memory sink. Delete $checkpointPath to start over.")

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 4c9977c..dde1396 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -103,7 +103,8 @@ class SQLContext private[sql](
 
   protected[sql] def sessionState: SessionState = sparkSession.sessionState
   protected[sql] def sharedState: SharedState = sparkSession.sharedState
-  protected[sql] def conf: SQLConf = sparkSession.conf
+  protected[sql] def conf: SQLConf = sessionState.conf
+  protected[sql] def runtimeConf: RuntimeConfig = sparkSession.conf
   protected[sql] def cacheManager: CacheManager = sparkSession.cacheManager
   protected[sql] def listener: SQLListener = sparkSession.listener
   protected[sql] def externalCatalog: ExternalCatalog = sparkSession.externalCatalog

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 3561765..00256bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, LogicalRelation}
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SharedState}
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
@@ -76,8 +76,8 @@ class SparkSession private(
   }
 
   /**
-   * State isolated across sessions, including SQL configurations, temporary tables,
-   * registered functions, and everything else that accepts a [[SQLConf]].
+   * State isolated across sessions, including SQL configurations, temporary tables, registered
+   * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
    */
   @transient
   protected[sql] lazy val sessionState: SessionState = {
@@ -103,7 +103,6 @@ class SparkSession private(
     _wrapped = sqlContext
   }
 
-  protected[sql] def conf: SQLConf = sessionState.conf
   protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
   protected[sql] def listener: SQLListener = sharedState.listener
   protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
@@ -191,6 +190,22 @@ class SparkSession private(
    |  Methods for accessing or mutating configurations  |
    * -------------------------------------------------- */
 
+  @transient private lazy val _conf: RuntimeConfig = {
+    new RuntimeConfigImpl(sessionState.conf, sessionState.hadoopConf)
+  }
+
+  /**
+   * Runtime configuration interface for Spark.
+   *
+   * This is the interface through which the user can get and set all Spark and Hadoop
+   * configurations that are relevant to Spark SQL. When getting the value of a config,
+   * this defaults to the value set in the underlying [[SparkContext]], if any.
+   *
+   * @group config
+   * @since 2.0.0
+   */
+  def conf: RuntimeConfig = _conf
+
   /**
    * Set Spark SQL configuration properties.
    *
@@ -213,7 +228,7 @@ class SparkSession private(
    * @group config
    * @since 2.0.0
    */
-  def getConf(key: String): String = conf.getConfString(key)
+  def getConf(key: String): String = sessionState.conf.getConfString(key)
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -222,7 +237,9 @@ class SparkSession private(
    * @group config
    * @since 2.0.0
    */
-  def getConf(key: String, defaultValue: String): String = conf.getConfString(key, defaultValue)
+  def getConf(key: String, defaultValue: String): String = {
+    sessionState.conf.getConfString(key, defaultValue)
+  }
 
   /**
    * Return all the configuration properties that have been set (i.e. not the default).
@@ -231,7 +248,7 @@ class SparkSession private(
    * @group config
    * @since 2.0.0
    */
-  def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
+  def getAllConfs: immutable.Map[String, String] = sessionState.conf.getAllConfs
 
   /**
    * Set the given Spark SQL configuration property.
@@ -244,7 +261,7 @@ class SparkSession private(
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
    * yet, return `defaultValue` in [[ConfigEntry]].
    */
-  protected[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry)
+  protected[sql] def getConf[T](entry: ConfigEntry[T]): T = sessionState.conf.getConf(entry)
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -252,7 +269,7 @@ class SparkSession private(
    * desired one.
    */
   protected[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = {
-    conf.getConf(entry, defaultValue)
+    sessionState.conf.getConf(entry, defaultValue)
   }
 
 
@@ -601,7 +618,7 @@ class SparkSession private(
    */
   @Experimental
   def createExternalTable(tableName: String, path: String): DataFrame = {
-    val dataSourceName = conf.defaultDataSourceName
+    val dataSourceName = sessionState.conf.defaultDataSourceName
     createExternalTable(tableName, path, dataSourceName)
   }
 

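As an aside, a hedged sketch of the Hadoop side of the interface documented above: because `SessionState` keeps its own copy of `SparkContext.hadoopConfiguration` (see the `SessionState.scala` hunk below), writes through `setHadoop` stay session-local, which is what the new `SQLContextSuite` test exercises. The object and key names here are made up, and `session` and `sc` are assumed to already exist and belong together.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object SessionHadoopConfSketch {
  def demo(session: SparkSession, sc: SparkContext): Unit = {
    // Writes land in the session-level copy of the Hadoop configuration...
    session.conf.setHadoop("made.up.hadoop.key", "session-local")
    assert(session.conf.getHadoop("made.up.hadoop.key") == "session-local")

    // ...and never leak back into the SparkContext-wide configuration.
    assert(sc.hadoopConfiguration.get("made.up.hadoop.key") == null)
  }
}
```
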
http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
index 7fa246b..e6c5351 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTable.scala
@@ -77,7 +77,7 @@ case class AnalyzeTable(tableName: String) extends RunnableCommand {
           catalogTable.storage.locationUri.map { p =>
             val path = new Path(p)
             try {
-              val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+              val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
               calculateTableSize(fs, path)
             } catch {
               case NonFatal(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index eae8fe8..5cac9d8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -210,7 +210,7 @@ case class LoadData(
           // Follow Hive's behavior:
           // If no schema or authority is provided with non-local inpath,
           // we will use hadoop configuration "fs.default.name".
-          val defaultFSConf = sqlContext.sparkContext.hadoopConfiguration.get("fs.default.name")
+          val defaultFSConf = sqlContext.sessionState.hadoopConf.get("fs.default.name")
           val defaultFS = if (defaultFSConf == null) {
             new URI("")
           } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 07bc8ae..4e7214c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -131,7 +131,7 @@ case class DataSource(
     val allPaths = caseInsensitiveOptions.get("path")
     val globbedPaths = allPaths.toSeq.flatMap { path =>
       val hdfsPath = new Path(path)
-      val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
       SparkHadoopUtil.get.globPathIfNecessary(qualified)
     }.toArray
@@ -225,7 +225,7 @@ case class DataSource(
       case Seq(singlePath) =>
         try {
           val hdfsPath = new Path(singlePath)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
           val metadataPath = new Path(hdfsPath, FileStreamSink.metadataDir)
           val res = fs.exists(metadataPath)
           res
@@ -284,7 +284,7 @@ case class DataSource(
         val allPaths = caseInsensitiveOptions.get("path") ++ paths
         val globbedPaths = allPaths.flatMap { path =>
           val hdfsPath = new Path(path)
-          val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val fs = hdfsPath.getFileSystem(sqlContext.sessionState.hadoopConf)
           val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
           val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
 
@@ -374,7 +374,7 @@ case class DataSource(
           val path = new Path(caseInsensitiveOptions.getOrElse("path", {
             throw new IllegalArgumentException("'path' is not specified")
           }))
-          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
           path.makeQualified(fs.getUri, fs.getWorkingDirectory)
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
index 95629a9..a636ca2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelation.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import java.io.IOException
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
@@ -77,7 +78,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
           s"cannot save to file.")
     }
 
-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
     val fs = outputPath.getFileSystem(hadoopConf)
     val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index 61ec7ed..7d407a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -103,7 +103,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
     val csvOptions = new CSVOptions(options)
     val headers = requiredSchema.fields.map(_.name)
 
-    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
     val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
 
     (file: PartitionedFile) => {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 4063c6e..731b004 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -271,7 +271,7 @@ class HDFSFileCatalog(
     val partitionSchema: Option[StructType])
   extends FileCatalog with Logging {
 
-  private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+  private val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
 
   var leafFiles = mutable.LinkedHashMap.empty[Path, FileStatus]
   var leafDirToChildrenFiles = mutable.Map.empty[Path, Array[FileStatus]]

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 7773ff5..580a0e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -98,7 +98,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
     val broadcastedConf =
       sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
 
@@ -126,7 +126,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
   }
 
   private def createBaseRdd(sqlContext: SQLContext, inputPaths: Seq[FileStatus]): RDD[String] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+    val job = Job.getInstance(sqlContext.sessionState.hadoopConf)
     val conf = job.getConfiguration
 
     val paths = inputPaths.map(_.getPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index bbbbc5e..28c6664 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -263,7 +263,7 @@ private[sql] class DefaultSource
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val parquetConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val parquetConf = new Configuration(sqlContext.sessionState.hadoopConf)
     parquetConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[CatalystReadSupport].getName)
     parquetConf.set(
       CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
@@ -648,7 +648,7 @@ private[sql] object ParquetRelation extends Logging {
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
     val writeLegacyParquetFormat = sqlContext.conf.writeLegacyParquetFormat
-    val serializedConf = new SerializableConfiguration(sqlContext.sparkContext.hadoopConfiguration)
+    val serializedConf = new SerializableConfiguration(sqlContext.sessionState.hadoopConf)
 
     // !! HACK ALERT !!
     //

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
index fa0df61..f7ac1ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala
@@ -90,7 +90,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
-    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
     val broadcastedConf =
       sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index 4f722a5..a861088 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -45,7 +45,7 @@ class FileStreamSink(
   private val basePath = new Path(path)
   private val logPath = new Path(basePath, FileStreamSink.metadataDir)
   private val fileLog = new FileStreamSinkLog(sqlContext, logPath.toUri.toString)
-  private val fs = basePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+  private val fs = basePath.getFileSystem(sqlContext.sessionState.hadoopConf)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
     if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 51c3aee..aeb64c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -38,7 +38,7 @@ class FileStreamSource(
     override val schema: StructType,
     dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
 
-  private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+  private val fs = new Path(path).getFileSystem(sqlContext.sessionState.hadoopConf)
   private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index b52f7a2..dd6760d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -212,7 +212,7 @@ class HDFSMetadataLog[T: ClassTag](sqlContext: SQLContext, path: String)
   }
 
   private def createFileManager(): FileManager = {
-    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
+    val hadoopConf = new Configuration(sqlContext.sessionState.hadoopConf)
     try {
       new FileContextManager(metadataPath, hadoopConf)
     } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index de4305f..d5e4dd8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -63,7 +63,7 @@ case class StateStoreRestoreExec(
       storeVersion = getStateId.batchId,
       keyExpressions.toStructType,
       child.output.toStructType,
-      new StateStoreConf(sqlContext.conf),
+      sqlContext.sessionState,
       Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
         val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output)
         iter.flatMap { row =>
@@ -92,7 +92,7 @@ case class StateStoreSaveExec(
       storeVersion = getStateId.batchId,
       keyExpressions.toStructType,
       child.output.toStructType,
-      new StateStoreConf(sqlContext.conf),
+      sqlContext.sessionState,
       Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) =>
         new Iterator[InternalRow] {
           private[this] val baseIterator = iter

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
index 95b5129..a08a4bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamFileCatalog.scala
@@ -30,7 +30,7 @@ class StreamFileCatalog(sqlContext: SQLContext, path: Path) extends FileCatalog
   val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")
   val metadataLog = new FileStreamSinkLog(sqlContext, metadataDirectory.toUri.toString)
-  val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+  val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
 
   override def paths: Seq[Path] = path :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index d708486..635bb86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
@@ -37,13 +38,15 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
     storeVersion: Long,
     keySchema: StructType,
     valueSchema: StructType,
-    storeConf: StateStoreConf,
+    sessionState: SessionState,
     @transient private val storeCoordinator: Option[StateStoreCoordinatorRef])
   extends RDD[U](dataRDD) {
 
+  private val storeConf = new StateStoreConf(sessionState.conf)
+
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
   private val confBroadcast = dataRDD.context.broadcast(
-    new SerializableConfiguration(dataRDD.context.hadoopConfiguration))
+    new SerializableConfiguration(sessionState.hadoopConf))
 
   override protected def getPartitions: Array[Partition] = dataRDD.partitions
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
index 9b6d091..4914a9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
@@ -21,6 +21,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 
 package object state {
@@ -43,7 +44,7 @@ package object state {
         storeVersion,
         keySchema,
         valueSchema,
-        new StateStoreConf(sqlContext.conf),
+        sqlContext.sessionState,
         Some(sqlContext.streams.stateStoreCoordinator))(
         storeUpdateFunction)
     }
@@ -55,7 +56,7 @@ package object state {
         storeVersion: Long,
         keySchema: StructType,
         valueSchema: StructType,
-        storeConf: StateStoreConf,
+        sessionState: SessionState,
         storeCoordinator: Option[StateStoreCoordinatorRef])(
         storeUpdateFunction: (StateStore, Iterator[T]) => Iterator[U]): StateStoreRDD[T, U] = {
       val cleanedF = dataRDD.sparkContext.clean(storeUpdateFunction)
@@ -67,7 +68,7 @@ package object state {
         storeVersion,
         keySchema,
         valueSchema,
-        storeConf,
+        sessionState,
         storeCoordinator)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
index 058df1e..1373235 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/RuntimeConfigImpl.scala
@@ -17,20 +17,21 @@
 
 package org.apache.spark.sql.internal
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.sql.RuntimeConfig
 
+
 /**
  * Implementation for [[RuntimeConfig]].
  */
-class RuntimeConfigImpl extends RuntimeConfig {
-
-  private val conf = new SQLConf
-
-  private val hadoopConf = java.util.Collections.synchronizedMap(
-    new java.util.HashMap[String, String]())
+class RuntimeConfigImpl(
+    sqlConf: SQLConf = new SQLConf,
+    hadoopConf: Configuration = new Configuration)
+  extends RuntimeConfig {
 
   override def set(key: String, value: String): RuntimeConfig = {
-    conf.setConfString(key, value)
+    sqlConf.setConfString(key, value)
     this
   }
 
@@ -39,7 +40,7 @@ class RuntimeConfigImpl extends RuntimeConfig {
   override def set(key: String, value: Long): RuntimeConfig = set(key, value.toString)
 
   @throws[NoSuchElementException]("if the key is not set")
-  override def get(key: String): String = conf.getConfString(key)
+  override def get(key: String): String = sqlConf.getConfString(key)
 
   override def getOption(key: String): Option[String] = {
     try Option(get(key)) catch {
@@ -47,27 +48,26 @@ class RuntimeConfigImpl extends RuntimeConfig {
     }
   }
 
-  override def unset(key: String): Unit = conf.unsetConf(key)
+  override def unset(key: String): Unit = sqlConf.unsetConf(key)
 
-  override def setHadoop(key: String, value: String): RuntimeConfig = {
-    hadoopConf.put(key, value)
+  override def setHadoop(key: String, value: String): RuntimeConfig = hadoopConf.synchronized {
+    hadoopConf.set(key, value)
     this
   }
 
   @throws[NoSuchElementException]("if the key is not set")
   override def getHadoop(key: String): String = hadoopConf.synchronized {
-    if (hadoopConf.containsKey(key)) {
-      hadoopConf.get(key)
-    } else {
+    Option(hadoopConf.get(key)).getOrElse {
       throw new NoSuchElementException(key)
     }
   }
 
-  override def getHadoopOption(key: String): Option[String] = {
-    try Option(getHadoop(key)) catch {
-      case _: NoSuchElementException => None
-    }
+  override def getHadoopOption(key: String): Option[String] = hadoopConf.synchronized {
+    Option(hadoopConf.get(key))
+  }
+
+  override def unsetHadoop(key: String): Unit = hadoopConf.synchronized {
+    hadoopConf.unset(key)
   }
 
-  override def unsetHadoop(key: String): Unit = hadoopConf.remove(key)
 }
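
For reference, a minimal sketch of how the reworked class above might be exercised on its own. `RuntimeConfigImpl` and `SQLConf` sit in internal/package-private scope, so this assumes code living inside the `org.apache.spark.sql` packages (as the new test does); the object name and keys are made up for illustration.

```scala
package org.apache.spark.sql.internal

import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.RuntimeConfig

object RuntimeConfigImplSketch {
  def main(args: Array[String]): Unit = {
    // Both constructor parameters default to fresh instances; SparkSession wires in
    // SessionState's SQLConf and Hadoop Configuration so that session.conf stays in
    // sync with the settings used at planning and execution time.
    val conf: RuntimeConfig = new RuntimeConfigImpl(new SQLConf, new Configuration)

    conf.set("spark.sql.made.up.key", "v")               // stored in the SQLConf
    assert(conf.getOption("spark.sql.made.up.key") == Some("v"))
    conf.unset("spark.sql.made.up.key")
    assert(conf.getOption("spark.sql.made.up.key").isEmpty)

    conf.setHadoop("made.up.hadoop.key", "w")            // stored in the Hadoop Configuration
    assert(conf.getHadoopOption("made.up.hadoop.key") == Some("w"))
  }
}
```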

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index f683bbb..04ad729 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -21,6 +21,8 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
@@ -46,6 +48,7 @@ private[sql] class SessionState(ctx: SQLContext) {
    * SQL-specific key-value configurations.
    */
   lazy val conf: SQLConf = new SQLConf
+  lazy val hadoopConf: Configuration = new Configuration(ctx.sparkContext.hadoopConfiguration)
 
   // Automatically extract `spark.sql.*` entries and put it in our SQLConf
   setConf(SQLContext.getSQLProperties(ctx.sparkContext.getConf))

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 2f62ad4..d49cc10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -85,4 +85,27 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love")
   }
 
+  test("Hadoop conf interaction between SQLContext and SparkContext") {
+    val mySpecialKey = "mai.special.key"
+    val mySpecialValue = "msv"
+    try {
+      sc.hadoopConfiguration.set(mySpecialKey, mySpecialValue)
+      val sqlContext = SQLContext.getOrCreate(sc)
+      val sessionState = sqlContext.sessionState
+      assert(sessionState.hadoopConf.get(mySpecialKey) === mySpecialValue)
+      assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === mySpecialValue)
+      // mutating hadoop conf in SQL doesn't mutate the underlying one
+      sessionState.hadoopConf.set(mySpecialKey, "no no no")
+      assert(sessionState.hadoopConf.get(mySpecialKey) === "no no no")
+      assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "no no no")
+      assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue)
+      sqlContext.runtimeConf.setHadoop(mySpecialKey, "yes yes yes")
+      assert(sessionState.hadoopConf.get(mySpecialKey) === "yes yes yes")
+      assert(sqlContext.runtimeConf.getHadoop(mySpecialKey) === "yes yes yes")
+      assert(sc.hadoopConfiguration.get(mySpecialKey) === mySpecialValue)
+    } finally {
+      sc.hadoopConfiguration.unset(mySpecialKey)
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 581095d..0aab36a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -113,7 +113,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     withTempPath { location =>
       val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
+      val conf = new Configuration(sqlContext.sessionState.hadoopConf)
       writeMetadata(parquetSchema, path, conf)
       readParquetFile(path.toString)(df => {
         val sparkTypes = df.schema.map(_.dataType)
@@ -250,7 +250,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     withTempPath { location =>
       val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
+      val conf = new Configuration(sqlContext.sessionState.hadoopConf)
       writeMetadata(parquetSchema, path, conf)
       val errorMessage = intercept[Throwable] {
         sqlContext.read.parquet(path.toString).printSchema()
@@ -271,7 +271,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     withTempPath { location =>
       val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
+      val conf = new Configuration(sqlContext.sessionState.hadoopConf)
       writeMetadata(parquetSchema, path, conf)
       val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
       assert(sparkTypes === expectedSparkTypes)
@@ -431,7 +431,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     withTempPath { location =>
       val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString)
       val path = new Path(location.getCanonicalPath)
-      val conf = sparkContext.hadoopConfiguration
+      val conf = new Configuration(sqlContext.sessionState.hadoopConf)
       writeMetadata(parquetSchema, path, conf, extraMetadata)
 
       readParquetFile(path.toString) { df =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index 70c2a82..a164f4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -217,7 +217,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSQLContext {
       SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key -> "3",
       SQLConf.FILE_SINK_LOG_CLEANUP_DELAY.key -> "0") {
       withFileStreamSinkLog { sinkLog =>
-        val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val fs = sinkLog.metadataPath.getFileSystem(sqlContext.sessionState.hadoopConf)
 
         def listBatchFiles(): Set[String] = {
           fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 1328142..22e011c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -82,7 +82,7 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   testQuietly("HDFSMetadataLog: fallback from FileContext to FileSystem") {
-    sqlContext.sparkContext.hadoopConfiguration.set(
+    sqlContext.sessionState.hadoopConf.set(
       s"fs.$scheme.impl",
       classOf[FakeFileSystem].getName)
     withTempDir { temp =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 5691105..fcfac35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -88,7 +88,7 @@ private[sql] trait SQLTestUtils
    * The Hadoop configuration used by the active [[SQLContext]].
    */
   protected def hadoopConfiguration: Configuration = {
-    sparkContext.hadoopConfiguration
+    sqlContext.sessionState.hadoopConf
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 431ac8e..d270775 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.sql.test
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext}
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SessionState, SQLConf}
 
 /**
  * A special [[SQLContext]] prepared for testing.

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 13f29e0..edb87b9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -544,7 +544,7 @@ private[hive] class MetaStoreFileCatalog(
   extends HDFSFileCatalog(ctx, Map.empty, paths, Some(partitionSpecFromHive.partitionColumns)) {
 
   override def getStatus(path: Path): Array[FileStatus] = {
-    val fs = path.getFileSystem(ctx.sparkContext.hadoopConfiguration)
+    val fs = path.getFileSystem(ctx.sessionState.hadoopConf)
     fs.listStatus(path)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 6457a90..bf0288c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -223,6 +223,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
     conf.getConf(HiveUtils.HIVE_THRIFT_SERVER_ASYNC)
   }
 
+  // TODO: why do we get this from SparkConf but not SQLConf?
   def hiveThriftServerSingleSession: Boolean = {
     ctx.sparkContext.conf.getBoolean(
       "spark.sql.hive.thriftServer.singleSession", defaultValue = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 4250a87..1095f5f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -57,7 +57,7 @@ private[sql] class DefaultSource
       files: Seq[FileStatus]): Option[StructType] = {
     OrcFileOperator.readSchema(
       files.map(_.getPath.toUri.toString),
-      Some(sqlContext.sparkContext.hadoopConfiguration)
+      Some(new Configuration(sqlContext.sessionState.hadoopConf))
     )
   }
 
@@ -115,7 +115,7 @@ private[sql] class DefaultSource
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
-    val orcConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val orcConf = new Configuration(sqlContext.sessionState.hadoopConf)
 
     if (sqlContext.conf.orcFilterPushDown) {
       // Sets pushed predicates
@@ -278,7 +278,7 @@ private[orc] case class OrcTableScan(
   with HiveInspectors {
 
   def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+    val job = Job.getInstance(new Configuration(sqlContext.sessionState.hadoopConf))
     val conf = job.getConfiguration
 
     // Tries to push down filters if ORC filter push-down is enabled

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index bf099e0..04b2494 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.{SparkSession, SQLContext}
+import org.apache.spark.sql.{RuntimeConfig, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.{CacheTableCommand, HiveNativeCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{RuntimeConfigImpl, SQLConf}
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 // SPARK-3729: Test key required to check for initialization errors with config.

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 68244cd..5965cdc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -374,7 +374,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val expectedPath =
         sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
-      val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+      val fs = filesystemPath.getFileSystem(sqlContext.sessionState.hadoopConf)
       if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
 
       // It is a managed table when we do not specify the location.

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
index 014c100..8b4e4dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala
@@ -239,12 +239,12 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       }
 
       // Unset default URI Scheme and Authority: throw exception
-      val originalFsName = hiveContext.sparkContext.hadoopConfiguration.get("fs.default.name")
-      hiveContext.sparkContext.hadoopConfiguration.unset("fs.default.name")
+      val originalFsName = hiveContext.sessionState.hadoopConf.get("fs.default.name")
+      hiveContext.sessionState.hadoopConf.unset("fs.default.name")
       intercept[AnalysisException] {
         sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
       }
-      hiveContext.sparkContext.hadoopConfiguration.set("fs.default.name", originalFsName)
+      hiveContext.sessionState.hadoopConf.set("fs.default.name", originalFsName)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 206d911..fd19fcb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -36,7 +36,7 @@ class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     val expectedTablePath =
       hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdentifier)
     val filesystemPath = new Path(expectedTablePath)
-    val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+    val fs = filesystemPath.getFileSystem(hiveContext.sessionState.hadoopConf)
     fs.exists(filesystemPath)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
index 2345c1c..f11c055 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcHadoopFsRelationSuite.scala
@@ -94,7 +94,7 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
         .orc(path)
 
       // Check if this is compressed as ZLIB.
-      val conf = sparkContext.hadoopConfiguration
+      val conf = sqlContext.sessionState.hadoopConf
       val fs = FileSystem.getLocal(conf)
       val maybeOrcFile = new File(path).listFiles().find(_.getName.endsWith(".zlib.orc"))
       assert(maybeOrcFile.isDefined)

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 4fb78ac..0a0cdf6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -181,7 +181,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
   // Following codec is supported in hive-0.13.1, ignore it now
   ignore("Other compression options for writing to an ORC file - 0.13.1 and above") {
     val data = (1 to 100).map(i => (i, s"val_$i"))
-    val conf = sparkContext.hadoopConfiguration
+    val conf = sqlContext.sessionState.hadoopConf
 
     conf.set(ConfVars.HIVE_ORC_DEFAULT_COMPRESS.varname, "SNAPPY")
     withOrcFile(data) { file =>

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index dad4f87..eced8ed 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -74,7 +74,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
       inputAttributes.find(_.name == field.name)
     }
 
-    val conf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
     val broadcastedConf =
       sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cfa64882/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 5378336..3b16468 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -209,7 +209,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       testDF.write.mode(SaveMode.Ignore).format(dataSourceName).save(file.getCanonicalPath)
 
       val path = new Path(file.getCanonicalPath)
-      val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+      val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
       assert(fs.listStatus(path).isEmpty)
     }
   }
@@ -510,7 +510,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
         s"${file.getCanonicalFile}/p1=2/p2=bar"
       ).map { p =>
         val path = new Path(p)
-        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        val fs = path.getFileSystem(sqlContext.sessionState.hadoopConf)
         path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org