Posted to commits@spark.apache.org by ma...@apache.org on 2015/01/13 22:31:58 UTC

spark git commit: [SPARK-5168] Make SQLConf a field rather than mixin in SQLContext

Repository: spark
Updated Branches:
  refs/heads/master 6463e0b9e -> 14e3f114e


[SPARK-5168] Make SQLConf a field rather than mixin in SQLContext

This change should be binary and source backward compatible since we didn't change any user-facing APIs.
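
For readers skimming the diff below, the shape of the change is roughly the following. This is a simplified, self-contained sketch with stand-in classes, not the actual Spark sources (the real SQLConf and SQLContext live in org.apache.spark.sql and carry much more state): configuration moves from a mixed-in trait to a plain field, and the context keeps thin delegating methods so its public surface is unchanged.

import scala.collection.mutable

// Stand-in for the real SQLConf: a small, synchronized key/value store.
class SQLConf {
  private val settings = mutable.HashMap.empty[String, String]

  def setConf(key: String, value: String): Unit = synchronized { settings(key) = value }
  def getConf(key: String): String = synchronized { settings(key) }
  def getConf(key: String, defaultValue: String): String =
    synchronized { settings.getOrElse(key, defaultValue) }
  def getAllConfs: Map[String, String] = synchronized { settings.toMap }

  // Typed accessors read through the generic map; a subclass overrides one of
  // these to change a single default (the pattern TestSQLContext and
  // HiveContext use in the hunks below).
  def numShufflePartitions: Int = getConf("spark.sql.shuffle.partitions", "200").toInt
  def dialect: String = getConf("spark.sql.dialect", "sql")
}

// Before (roughly): class SQLContext extends Logging with SQLConf { ... }
// After: SQLConf is a field, and SQLContext forwards the old public methods
// to it, so callers see the same API.
class SQLContext {
  // Lazy so that subclasses can swap in a customized SQLConf.
  lazy val conf: SQLConf = new SQLConf

  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
  def getConf(key: String): String = conf.getConf(key)
  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
  def getAllConfs: Map[String, String] = conf.getAllConfs
}

// A test (or Hive) context changes a default without touching SQLContext:
class TestSQLContext extends SQLContext {
  override lazy val conf: SQLConf = new SQLConf {
    override def numShufflePartitions: Int =
      getConf("spark.sql.shuffle.partitions", "5").toInt
  }
}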

Author: Reynold Xin <rx...@databricks.com>

Closes #3965 from rxin/SPARK-5168-sqlconf and squashes the following commits:

42eec09 [Reynold Xin] Fix default conf value.
0ef86cc [Reynold Xin] Fix constructor ordering.
4d7f910 [Reynold Xin] Properly override config.
ccc8e6a [Reynold Xin] [SPARK-5168] Make SQLConf a field rather than mixin in SQLContext


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14e3f114
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14e3f114
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14e3f114

Branch: refs/heads/master
Commit: 14e3f114efb906937b2d7b7ac04484b2814a3b48
Parents: 6463e0b
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Jan 13 13:30:35 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Jan 13 13:30:35 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/CacheManager.scala     |  4 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala | 40 ++++++++++++++++----
 .../spark/sql/api/java/JavaSQLContext.scala     |  6 +--
 .../columnar/InMemoryColumnarTableScan.scala    |  4 +-
 .../apache/spark/sql/execution/Exchange.scala   |  2 +-
 .../spark/sql/execution/ExistingRDD.scala       |  4 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |  2 +-
 .../spark/sql/execution/SparkStrategies.scala   | 18 ++++-----
 .../apache/spark/sql/execution/commands.scala   |  2 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  2 +-
 .../apache/spark/sql/json/JSONRelation.scala    |  4 +-
 .../spark/sql/parquet/ParquetRelation.scala     |  7 ++--
 .../apache/spark/sql/parquet/ParquetTest.scala  |  2 +-
 .../apache/spark/sql/parquet/newParquet.scala   |  4 +-
 .../apache/spark/sql/sources/interfaces.scala   |  2 +-
 .../apache/spark/sql/test/TestSQLContext.scala  |  5 ++-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  2 +-
 .../org/apache/spark/sql/SQLConfSuite.scala     | 10 ++---
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 10 ++---
 .../columnar/InMemoryColumnarQuerySuite.scala   |  3 +-
 .../columnar/PartitionBatchPruningSuite.scala   |  4 +-
 .../spark/sql/execution/PlannerSuite.scala      |  4 +-
 .../org/apache/spark/sql/json/JsonSuite.scala   |  2 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |  4 +-
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 14 +++----
 .../hive/execution/HiveCompatibilitySuite.scala |  4 +-
 .../org/apache/spark/sql/hive/HiveContext.scala | 11 +++---
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../org/apache/spark/sql/hive/TestHive.scala    |  6 ++-
 .../sql/hive/api/java/JavaHiveContext.scala     |  6 +--
 .../apache/spark/sql/hive/StatisticsSuite.scala | 20 +++++-----
 .../sql/hive/execution/HiveQuerySuite.scala     |  4 +-
 33 files changed, 124 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index 3c9439b..e715d94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -91,8 +91,8 @@ private[sql] trait CacheManager {
         CachedData(
           planToCache,
           InMemoryRelation(
-            useCompression,
-            columnBatchSize,
+            conf.useCompression,
+            conf.columnBatchSize,
             storageLevel,
             query.queryExecution.executedPlan,
             tableName))

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f5bf935..206d16f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -61,7 +61,7 @@ private[spark] object SQLConf {
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-private[sql] trait SQLConf {
+private[sql] class SQLConf {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e7021cc..d8efce0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
+import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
@@ -49,7 +52,6 @@ import org.apache.spark.sql.sources.{DataSourceStrategy, BaseRelation, DDLParser
 @AlphaComponent
 class SQLContext(@transient val sparkContext: SparkContext)
   extends org.apache.spark.Logging
-  with SQLConf
   with CacheManager
   with ExpressionConversions
   with UDFRegistration
@@ -57,6 +59,30 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   self =>
 
+  // Note that this is a lazy val so we can override the default value in subclasses.
+  private[sql] lazy val conf: SQLConf = new SQLConf
+
+  /** Set Spark SQL configuration properties. */
+  def setConf(props: Properties): Unit = conf.setConf(props)
+
+  /** Set the given Spark SQL configuration property. */
+  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+
+  /** Return the value of Spark SQL configuration property for the given key. */
+  def getConf(key: String): String = conf.getConf(key)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`.
+   */
+  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+
+  /**
+   * Return all the configuration properties that have been set (i.e. not the default).
+   * This creates a new copy of the config properties in the form of a Map.
+   */
+  def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
+
   @transient
   protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
 
@@ -212,7 +238,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
-    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
     val appliedSchema =
       Option(schema).getOrElse(
         JsonRDD.nullTypeToStringType(
@@ -226,7 +252,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
-    val columnNameOfCorruptJsonRecord = columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
     val appliedSchema =
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
@@ -299,10 +325,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group userf
    */
   def sql(sqlText: String): SchemaRDD = {
-    if (dialect == "sql") {
+    if (conf.dialect == "sql") {
       new SchemaRDD(this, parseSql(sqlText))
     } else {
-      sys.error(s"Unsupported SQL dialect: $dialect")
+      sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
     }
   }
 
@@ -323,9 +349,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     val sqlContext: SQLContext = self
 
-    def codegenEnabled = self.codegenEnabled
+    def codegenEnabled = self.conf.codegenEnabled
 
-    def numPartitions = self.numShufflePartitions
+    def numPartitions = self.conf.numShufflePartitions
 
     def strategies: Seq[Strategy] =
       extraStrategies ++ (

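Aside: the delegating setConf/getConf/getAllConfs methods added in the hunk above are what keep the change source-compatible. A hypothetical user program like the following (not part of this patch) compiles and behaves the same before and after the refactoring, because those methods remain on SQLContext itself; the configuration keys shown are the standard SQLConf keys.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object ConfCompatExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("conf-compat"))
    val sqlContext = new SQLContext(sc)

    // These calls hit SQLContext directly, exactly as they did when SQLConf
    // was mixed in; internally they now forward to the conf field.
    sqlContext.setConf("spark.sql.shuffle.partitions", "8")
    println(sqlContext.getConf("spark.sql.shuffle.partitions"))  // "8"
    println(sqlContext.getConf("spark.sql.dialect", "sql"))      // default when unset
    println(sqlContext.getAllConfs)                              // only explicitly-set keys

    sc.stop()
  }
}
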
http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 8884204..7f868cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -52,7 +52,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * @group userf
    */
   def sql(sqlText: String): JavaSchemaRDD = {
-    if (sqlContext.dialect == "sql") {
+    if (sqlContext.conf.dialect == "sql") {
       new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlText))
     } else {
       sys.error(s"Unsupported SQL dialect: $sqlContext.dialect")
@@ -164,7 +164,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    * It goes through the entire dataset once to determine the schema.
    */
   def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
-    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
     val appliedScalaSchema =
       JsonRDD.nullTypeToStringType(
         JsonRDD.inferSchema(json.rdd, 1.0, columnNameOfCorruptJsonRecord))
@@ -182,7 +182,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
    */
   @Experimental
   def jsonRDD(json: JavaRDD[String], schema: StructType): JavaSchemaRDD = {
-    val columnNameOfCorruptJsonRecord = sqlContext.columnNameOfCorruptRecord
+    val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
     val appliedScalaSchema =
       Option(asScalaDataType(schema)).getOrElse(
         JsonRDD.nullTypeToStringType(

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 1e43248..065fae3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -82,7 +82,7 @@ private[sql] case class InMemoryRelation(
     if (batchStats.value.isEmpty) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
-      Statistics(sizeInBytes = child.sqlContext.defaultSizeInBytes)
+      Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
       // Underlying columnar RDD has been materialized, required information has also been collected
       // via the `batchStats` accumulator, compute the final statistics, and update `_statistics`.
@@ -233,7 +233,7 @@ private[sql] case class InMemoryColumnarTableScan(
   val readPartitions = sparkContext.accumulator(0)
   val readBatches = sparkContext.accumulator(0)
 
-  private val inMemoryPartitionPruningEnabled = sqlContext.inMemoryPartitionPruning
+  private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
 
   override def execute() = {
     readPartitions.setValue(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index d7c811c..7c0b72a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -123,7 +123,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
  */
 private[sql] case class AddExchange(sqlContext: SQLContext) extends Rule[SparkPlan] {
   // TODO: Determine the number of partitions.
-  def numPartitions = sqlContext.numShufflePartitions
+  def numPartitions = sqlContext.conf.numShufflePartitions
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case operator: SparkPlan =>

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d2d8cb1..069e950 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -69,7 +69,7 @@ case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlContext: SQLCont
   @transient override lazy val statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
-    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+    sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
   )
 }
 
@@ -106,6 +106,6 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)(@transient sqlContext: SQ
   @transient override lazy val statistics = Statistics(
     // TODO: Instead of returning a default value here, find a way to return a meaningful size
     // estimate for RDDs. See PR 1238 for more discussions.
-    sizeInBytes = BigInt(sqlContext.defaultSizeInBytes)
+    sizeInBytes = BigInt(sqlContext.conf.defaultSizeInBytes)
   )
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 017c78d..6fecd1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -51,7 +51,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   // sqlContext will be null when we are being deserialized on the slaves.  In this instance
   // the value of codegenEnabled will be set by the desserializer after the constructor has run.
   val codegenEnabled: Boolean = if (sqlContext != null) {
-    sqlContext.codegenEnabled
+    sqlContext.conf.codegenEnabled
   } else {
     false
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d91b1fb..0652d2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -35,8 +35,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   object LeftSemiJoin extends Strategy with PredicateHelper {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-          right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+          right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
         val semiJoin = joins.BroadcastLeftSemiJoinHash(
           leftKeys, rightKeys, planLater(left), planLater(right))
         condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil
@@ -81,13 +81,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-           right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+           right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
         makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
-        if sqlContext.autoBroadcastJoinThreshold > 0 &&
-           left.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold =>
+        if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+           left.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
           makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft)
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right) =>
@@ -215,7 +215,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
         val prunePushedDownFilters =
-          if (sqlContext.parquetFilterPushDown) {
+          if (sqlContext.conf.parquetFilterPushDown) {
             (predicates: Seq[Expression]) => {
               // Note: filters cannot be pushed down to Parquet if they contain more complex
               // expressions than simple "Attribute cmp Literal" comparisons. Here we remove all
@@ -237,7 +237,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           ParquetTableScan(
             _,
             relation,
-            if (sqlContext.parquetFilterPushDown) filters else Nil)) :: Nil
+            if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
 
       case _ => Nil
     }
@@ -270,7 +270,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.
         execution.Sort(sortExprs, global = false, planLater(child)) :: Nil
-      case logical.Sort(sortExprs, global, child) if sqlContext.externalSortEnabled =>
+      case logical.Sort(sortExprs, global, child) if sqlContext.conf.externalSortEnabled =>
         execution.ExternalSort(sortExprs, global, planLater(child)):: Nil
       case logical.Sort(sortExprs, global, child) =>
         execution.Sort(sortExprs, global, planLater(child)):: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index df8e616..af6b07b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -94,7 +94,7 @@ case class SetCommand(
       logWarning(
         s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
           s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.numShufflePartitions}"))
+      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}"))
 
     // Queries a single property.
     case Some((key, None)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index fbe1d06..2dd22c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -43,7 +43,7 @@ case class BroadcastHashJoin(
   extends BinaryNode with HashJoin {
 
   val timeout = {
-    val timeoutValue = sqlContext.broadcastTimeout
+    val timeoutValue = sqlContext.conf.broadcastTimeout
     if (timeoutValue < 0) {
       Duration.Inf
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index a9a6696..f5c0222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -59,8 +59,8 @@ private[sql] case class JSONRelation(
       JsonRDD.inferSchema(
         baseRDD,
         samplingRatio,
-        sqlContext.columnNameOfCorruptRecord)))
+        sqlContext.conf.columnNameOfCorruptRecord)))
 
   override def buildScan() =
-    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.columnNameOfCorruptRecord)
+    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 2835dc3..cde5160 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -65,7 +65,7 @@ private[sql] case class ParquetRelation(
     ParquetTypesConverter.readSchemaFromFile(
       new Path(path.split(",").head),
       conf,
-      sqlContext.isParquetBinaryAsString)
+      sqlContext.conf.isParquetBinaryAsString)
 
   lazy val attributeMap = AttributeMap(output.map(o => o -> o))
 
@@ -80,7 +80,7 @@ private[sql] case class ParquetRelation(
   }
 
   // TODO: Use data from the footers.
-  override lazy val statistics = Statistics(sizeInBytes = sqlContext.defaultSizeInBytes)
+  override lazy val statistics = Statistics(sizeInBytes = sqlContext.conf.defaultSizeInBytes)
 }
 
 private[sql] object ParquetRelation {
@@ -163,7 +163,8 @@ private[sql] object ParquetRelation {
                   sqlContext: SQLContext): ParquetRelation = {
     val path = checkPath(pathString, allowExisting, conf)
     conf.set(ParquetOutputFormat.COMPRESSION, shortParquetCompressionCodecNames.getOrElse(
-      sqlContext.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED).name())
+      sqlContext.conf.parquetCompressionCodec.toUpperCase, CompressionCodecName.UNCOMPRESSED)
+      .name())
     ParquetRelation.enableLogForwarding()
     ParquetTypesConverter.writeMetaData(attributes, path, conf)
     new ParquetRelation(path.toString, Some(conf), sqlContext) {

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index b4d4890..02ce1b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -54,7 +54,7 @@ trait ParquetTest {
     try f finally {
       keys.zip(currentValues).foreach {
         case (key, Some(value)) => setConf(key, value)
-        case (key, None) => unsetConf(key)
+        case (key, None) => conf.unsetConf(key)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 2e0c6c5..55a2728 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -137,7 +137,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     ParquetTypesConverter.readSchemaFromFile(
       partitions.head.files.head.getPath,
       Some(sparkContext.hadoopConfiguration),
-      sqlContext.isParquetBinaryAsString))
+      sqlContext.conf.isParquetBinaryAsString))
 
   val dataIncludesKey =
     partitionKeys.headOption.map(dataSchema.fieldNames.contains(_)).getOrElse(true)
@@ -198,7 +198,7 @@ case class ParquetRelation2(path: String)(@transient val sqlContext: SQLContext)
     predicates
       .reduceOption(And)
       .flatMap(ParquetFilters.createFilter)
-      .filter(_ => sqlContext.parquetFilterPushDown)
+      .filter(_ => sqlContext.conf.parquetFilterPushDown)
       .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
 
     def percentRead = selectedPartitions.size.toDouble / partitions.size.toDouble * 100

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2a7be23..7f5564b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -99,7 +99,7 @@ abstract class BaseRelation {
    * large to broadcast.  This method will be called multiple times during query planning
    * and thus should not perform expensive operations for each invocation.
    */
-  def sizeInBytes = sqlContext.defaultSizeInBytes
+  def sizeInBytes = sqlContext.conf.defaultSizeInBytes
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 6bb81c7..8c80be1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -29,6 +29,7 @@ object TestSQLContext
       new SparkConf().set("spark.sql.testkey", "true"))) {
 
   /** Fewer partitions to speed up testing. */
-  override private[spark] def numShufflePartitions: Int =
-    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+  private[sql] override lazy val conf: SQLConf = new SQLConf {
+    override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index c7e1363..e5ab16f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -387,7 +387,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
   test("broadcasted left semi join operator selection") {
     clearCache()
     sql("CACHE TABLE testData")
-    val tmp = autoBroadcastJoinThreshold
+    val tmp = conf.autoBroadcastJoinThreshold
 
     sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000")
     Seq(

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 60701f0..bf73d0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -37,7 +37,7 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
   }
 
   test("programmatic ways of basic setting and getting") {
-    clear()
+    conf.clear()
     assert(getAllConfs.size === 0)
 
     setConf(testKey, testVal)
@@ -51,11 +51,11 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
     assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal)
     assert(TestSQLContext.getAllConfs.contains(testKey))
 
-    clear()
+    conf.clear()
   }
 
   test("parse SQL set commands") {
-    clear()
+    conf.clear()
     sql(s"set $testKey=$testVal")
     assert(getConf(testKey, testVal + "_") == testVal)
     assert(TestSQLContext.getConf(testKey, testVal + "_") == testVal)
@@ -73,11 +73,11 @@ class SQLConfSuite extends QueryTest with FunSuiteLike {
     sql(s"set $key=")
     assert(getConf(key, "0") == "")
 
-    clear()
+    conf.clear()
   }
 
   test("deprecated property") {
-    clear()
+    conf.clear()
     sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
     assert(getConf(SQLConf.SHUFFLE_PARTITIONS) == "10")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index d9de568..bc72daf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -79,7 +79,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("aggregation with codegen") {
-    val originalValue = codegenEnabled
+    val originalValue = conf.codegenEnabled
     setConf(SQLConf.CODEGEN_ENABLED, "true")
     sql("SELECT key FROM testData GROUP BY key").collect()
     setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
@@ -245,14 +245,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("sorting") {
-    val before = externalSortEnabled
+    val before = conf.externalSortEnabled
     setConf(SQLConf.EXTERNAL_SORT, "false")
     sortTest()
     setConf(SQLConf.EXTERNAL_SORT, before.toString)
   }
 
   test("external sorting") {
-    val before = externalSortEnabled
+    val before = conf.externalSortEnabled
     setConf(SQLConf.EXTERNAL_SORT, "true")
     sortTest()
     setConf(SQLConf.EXTERNAL_SORT, before.toString)
@@ -600,7 +600,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SET commands semantics using sql()") {
-    clear()
+    conf.clear()
     val testKey = "test.key.0"
     val testVal = "test.val.0"
     val nonexistentKey = "nonexistent"
@@ -632,7 +632,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       sql(s"SET $nonexistentKey"),
       Seq(Seq(s"$nonexistentKey=<undefined>"))
     )
-    clear()
+    conf.clear()
   }
 
   test("apply schema") {

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index fc95dcc..d94729b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -39,7 +39,8 @@ class InMemoryColumnarQuerySuite extends QueryTest {
     sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).registerTempTable("sizeTst")
     cacheTable("sizeTst")
     assert(
-      table("sizeTst").queryExecution.logical.statistics.sizeInBytes > autoBroadcastJoinThreshold)
+      table("sizeTst").queryExecution.logical.statistics.sizeInBytes >
+        conf.autoBroadcastJoinThreshold)
   }
 
   test("projection") {

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index 1915c25..592cafb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.test.TestSQLContext._
 
 class PartitionBatchPruningSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter {
-  val originalColumnBatchSize = columnBatchSize
-  val originalInMemoryPartitionPruning = inMemoryPartitionPruning
+  val originalColumnBatchSize = conf.columnBatchSize
+  val originalInMemoryPartitionPruning = conf.inMemoryPartitionPruning
 
   override protected def beforeAll(): Unit = {
     // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index a5af71a..c5b6fce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -60,7 +60,7 @@ class PlannerSuite extends FunSuite {
   }
 
   test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
-    val origThreshold = autoBroadcastJoinThreshold
+    val origThreshold = conf.autoBroadcastJoinThreshold
     setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
 
     // Using a threshold that is definitely larger than the small testing table (b) below
@@ -78,7 +78,7 @@ class PlannerSuite extends FunSuite {
   }
 
   test("InMemoryRelation statistics propagation") {
-    val origThreshold = autoBroadcastJoinThreshold
+    val origThreshold = conf.autoBroadcastJoinThreshold
     setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
 
     testData.limit(3).registerTempTable("tiny")

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 8dce337..b09f1ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -713,7 +713,7 @@ class JsonSuite extends QueryTest {
 
   test("Corrupt records") {
     // Test if we can query corrupt records.
-    val oldColumnNameOfCorruptRecord = TestSQLContext.columnNameOfCorruptRecord
+    val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord
     TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
 
     val jsonSchemaRDD = jsonRDD(corruptRecords)

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 10a0147..6ac67fc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -213,7 +213,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
     def checkCompressionCodec(codec: CompressionCodecName): Unit = {
       withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
         withParquetFile(data) { path =>
-          assertResult(parquetCompressionCodec.toUpperCase) {
+          assertResult(conf.parquetCompressionCodec.toUpperCase) {
             compressionCodecFor(path)
           }
         }
@@ -221,7 +221,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
     }
 
     // Checks default compression codec
-    checkCompressionCodec(CompressionCodecName.fromConf(parquetCompressionCodec))
+    checkCompressionCodec(CompressionCodecName.fromConf(conf.parquetCompressionCodec))
 
     checkCompressionCodec(CompressionCodecName.UNCOMPRESSED)
     checkCompressionCodec(CompressionCodecName.GZIP)

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index a5fe2e8..0a92336 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -88,7 +88,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   TestData // Load test data tables.
 
   private var testRDD: SchemaRDD = null
-  private val originalParquetFilterPushdownEnabled = TestSQLContext.parquetFilterPushDown
+  private val originalParquetFilterPushdownEnabled = TestSQLContext.conf.parquetFilterPushDown
 
   override def beforeAll() {
     ParquetTestData.writeFile()
@@ -144,7 +144,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   ignore("Treat binary as string") {
-    val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString
+    val oldIsParquetBinaryAsString = TestSQLContext.conf.isParquetBinaryAsString
 
     // Create the test file.
     val file = getTempFilePath("parquet")
@@ -174,7 +174,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   test("Compression options for writing to a Parquetfile") {
-    val defaultParquetCompressionCodec = TestSQLContext.parquetCompressionCodec
+    val defaultParquetCompressionCodec = TestSQLContext.conf.parquetCompressionCodec
     import scala.collection.JavaConversions._
 
     val file = getTempFilePath("parquet")
@@ -186,7 +186,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     rdd.saveAsParquetFile(path)
     var actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
 
     parquetFile(path).registerTempTable("tmp")
     checkAnswer(
@@ -202,7 +202,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     rdd.saveAsParquetFile(path)
     actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
 
     parquetFile(path).registerTempTable("tmp")
     checkAnswer(
@@ -234,7 +234,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     rdd.saveAsParquetFile(path)
     actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
 
     parquetFile(path).registerTempTable("tmp")
     checkAnswer(
@@ -250,7 +250,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     rdd.saveAsParquetFile(path)
     actualCodec = ParquetTypesConverter.readMetaData(new Path(path), Some(TestSQLContext.sparkContext.hadoopConfiguration))
       .getBlocks.flatMap(block => block.getColumns).map(column => column.getCodec.name()).distinct
-    assert(actualCodec === TestSQLContext.parquetCompressionCodec.toUpperCase :: Nil)
+    assert(actualCodec === TestSQLContext.conf.parquetCompressionCodec.toUpperCase :: Nil)
 
     parquetFile(path).registerTempTable("tmp")
     checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 23283fd..0d93462 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -36,8 +36,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
 
   private val originalTimeZone = TimeZone.getDefault
   private val originalLocale = Locale.getDefault
-  private val originalColumnBatchSize = TestHive.columnBatchSize
-  private val originalInMemoryPartitionPruning = TestHive.inMemoryPartitionPruning
+  private val originalColumnBatchSize = TestHive.conf.columnBatchSize
+  private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning
 
   def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 09ff4cc..9aeebd7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -71,8 +71,9 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>
 
-  // Change the default SQL dialect to HiveQL
-  override private[spark] def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+  private[sql] override lazy val conf: SQLConf = new SQLConf {
+    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+  }
 
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
@@ -87,12 +88,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   override def sql(sqlText: String): SchemaRDD = {
     // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
-    if (dialect == "sql") {
+    if (conf.dialect == "sql") {
       super.sql(sqlText)
-    } else if (dialect == "hiveql") {
+    } else if (conf.dialect == "hiveql") {
       new SchemaRDD(this, ddlParser(sqlText).getOrElse(HiveQl.parseSql(sqlText)))
     }  else {
-      sys.error(s"Unsupported SQL dialect: $dialect.  Try 'sql' or 'hiveql'")
+      sys.error(s"Unsupported SQL dialect: ${conf.dialect}.  Try 'sql' or 'hiveql'")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index daeabb6..785a6a1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -515,7 +515,7 @@ private[hive] case class MetastoreRelation
         // if the size is still less than zero, we use default size
         Option(totalSize).map(_.toLong).filter(_ > 0)
           .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-          .getOrElse(sqlContext.defaultSizeInBytes)))
+          .getOrElse(sqlContext.conf.defaultSizeInBytes)))
     }
   )
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 31c7ce9..52e1f0d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -102,8 +102,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     new this.QueryExecution { val logical = plan }
 
   /** Fewer partitions to speed up testing. */
-  override private[spark] def numShufflePartitions: Int =
-    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+  private[sql] override lazy val conf: SQLConf = new SQLConf {
+    override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+    override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+  }
 
   /**
    * Returns the value of specified environmental variable as a [[java.io.File]] after checking

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
index 1817c78..038f63f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/api/java/JavaHiveContext.scala
@@ -31,12 +31,12 @@ class JavaHiveContext(sqlContext: SQLContext) extends JavaSQLContext(sqlContext)
 
   override def sql(sqlText: String): JavaSchemaRDD = {
     // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
-    if (sqlContext.dialect == "sql") {
+    if (sqlContext.conf.dialect == "sql") {
       super.sql(sqlText)
-    } else if (sqlContext.dialect == "hiveql") {
+    } else if (sqlContext.conf.dialect == "hiveql") {
       new JavaSchemaRDD(sqlContext, HiveQl.parseSql(sqlText))
     }  else {
-      sys.error(s"Unsupported SQL dialect: ${sqlContext.dialect}.  Try 'sql' or 'hiveql'")
+      sys.error(s"Unsupported SQL dialect: ${sqlContext.conf.dialect}.  Try 'sql' or 'hiveql'")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index a758f92..0b4e76c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -81,7 +81,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
     // TODO: How does it works? needs to add it back for other hive version.
     if (HiveShim.version =="0.12.0") {
-      assert(queryTotalSize("analyzeTable") === defaultSizeInBytes)
+      assert(queryTotalSize("analyzeTable") === conf.defaultSizeInBytes)
     }
     sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")
 
@@ -110,7 +110,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
         |SELECT * FROM src
       """.stripMargin).collect()
 
-    assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes)
+    assert(queryTotalSize("analyzeTable_part") === conf.defaultSizeInBytes)
 
     sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
 
@@ -151,8 +151,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
       val sizes = rdd.queryExecution.analyzed.collect {
         case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes
       }
-      assert(sizes.size === 2 && sizes(0) <= autoBroadcastJoinThreshold
-        && sizes(1) <= autoBroadcastJoinThreshold,
+      assert(sizes.size === 2 && sizes(0) <= conf.autoBroadcastJoinThreshold
+        && sizes(1) <= conf.autoBroadcastJoinThreshold,
         s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
       // Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -163,8 +163,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
       checkAnswer(rdd, expectedAnswer) // check correctness of output
 
-      TestHive.settings.synchronized {
-        val tmp = autoBroadcastJoinThreshold
+      TestHive.conf.settings.synchronized {
+        val tmp = conf.autoBroadcastJoinThreshold
 
         sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
         rdd = sql(query)
@@ -207,8 +207,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
         .isAssignableFrom(r.getClass) =>
         r.statistics.sizeInBytes
     }
-    assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold
-      && sizes(0) <= autoBroadcastJoinThreshold,
+    assert(sizes.size === 2 && sizes(1) <= conf.autoBroadcastJoinThreshold
+      && sizes(0) <= conf.autoBroadcastJoinThreshold,
       s"query should contain two relations, each of which has size smaller than autoConvertSize")
 
     // Using `sparkPlan` because for relevant patterns in HashJoin to be
@@ -221,8 +221,8 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
 
     checkAnswer(rdd, answer) // check correctness of output
 
-    TestHive.settings.synchronized {
-      val tmp = autoBroadcastJoinThreshold
+    TestHive.conf.settings.synchronized {
+      val tmp = conf.autoBroadcastJoinThreshold
 
       sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1")
       rdd = sql(leftSemiJoinQuery)

http://git-wip-us.apache.org/repos/asf/spark/blob/14e3f114/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 4decd15..c14f0d2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -847,7 +847,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
         case Row(key: String, value: String) => key -> value
         case Row(KV(key, value)) => key -> value
       }.toSet
-    clear()
+    conf.clear()
 
     // "SET" itself returns all config variables currently specified in SQLConf.
     // TODO: Should we be listing the default here always? probably...
@@ -879,7 +879,7 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
       collectResults(sql(s"SET $nonexistentKey"))
     }
 
-    clear()
+    conf.clear()
   }
 
   createQueryTest("select from thrift based table",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org