Posted to commits@spark.apache.org by rx...@apache.org on 2015/06/18 08:22:59 UTC

[1/2] spark git commit: [SPARK-7961][SQL]Refactor SQLConf to display better error message

Repository: spark
Updated Branches:
  refs/heads/master 9db73ec12 -> 78a430ea4


http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 178bd1f..301aa5a 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -113,8 +113,8 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
     withJdbcStatement { statement =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
-      assert(resultSet.getString(1) ===
-        s"spark.sql.hive.version=${HiveContext.hiveExecutionVersion}")
+      assert(resultSet.getString(1) === "spark.sql.hive.version")
+      assert(resultSet.getString(2) === HiveContext.hiveExecutionVersion)
     }
   }
 
@@ -238,7 +238,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       // first session, we get the default value of the session status
       { statement =>
 
-        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS.key}")
         rs1.next()
         defaultV1 = rs1.getString(1)
         assert(defaultV1 != "200")
@@ -256,19 +256,21 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       { statement =>
 
         val queries = Seq(
-            s"SET ${SQLConf.SHUFFLE_PARTITIONS}=291",
+            s"SET ${SQLConf.SHUFFLE_PARTITIONS.key}=291",
             "SET hive.cli.print.header=true"
             )
 
         queries.map(statement.execute)
-        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS.key}")
         rs1.next()
-        assert("spark.sql.shuffle.partitions=291" === rs1.getString(1))
+        assert("spark.sql.shuffle.partitions" === rs1.getString(1))
+        assert("291" === rs1.getString(2))
         rs1.close()
 
         val rs2 = statement.executeQuery("SET hive.cli.print.header")
         rs2.next()
-        assert("hive.cli.print.header=true" === rs2.getString(1))
+        assert("hive.cli.print.header" === rs2.getString(1))
+        assert("true" === rs2.getString(2))
         rs2.close()
       },
 
@@ -276,7 +278,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       // default value
       { statement =>
 
-        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS}")
+        val rs1 = statement.executeQuery(s"SET ${SQLConf.SHUFFLE_PARTITIONS.key}")
         rs1.next()
         assert(defaultV1 === rs1.getString(1))
         rs1.close()
@@ -404,8 +406,8 @@ class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
     withJdbcStatement { statement =>
       val resultSet = statement.executeQuery("SET spark.sql.hive.version")
       resultSet.next()
-      assert(resultSet.getString(1) ===
-        s"spark.sql.hive.version=${HiveContext.hiveExecutionVersion}")
+      assert(resultSet.getString(1) === "spark.sql.hive.version")
+      assert(resultSet.getString(2) === HiveContext.hiveExecutionVersion)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 82c0b49..432de25 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -47,17 +47,17 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
     // Add Locale setting
     Locale.setDefault(Locale.US)
     // Set a relatively small column batch size for testing purposes
-    TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, "5")
+    TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, 5)
     // Enable in-memory partition pruning for testing purposes
-    TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, "true")
+    TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
   }
 
   override def afterAll() {
     TestHive.cacheTables = false
     TimeZone.setDefault(originalTimeZone)
     Locale.setDefault(originalLocale)
-    TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize.toString)
-    TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning.toString)
+    TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
+    TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
   }
 
   /** A list of tests deemed out of scope currently and thus completely disregarded. */

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala
index 65d070b..f458567 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/SortMergeCompatibilitySuite.scala
@@ -26,11 +26,11 @@ import org.apache.spark.sql.hive.test.TestHive
 class SortMergeCompatibilitySuite extends HiveCompatibilitySuite {
   override def beforeAll() {
     super.beforeAll()
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, "true")
+    TestHive.setConf(SQLConf.SORTMERGE_JOIN, true)
   }
 
   override def afterAll() {
-    TestHive.setConf(SQLConf.SORTMERGE_JOIN, "false")
+    TestHive.setConf(SQLConf.SORTMERGE_JOIN, false)
     super.afterAll()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index c50835d..4a66d65 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -21,15 +21,13 @@ import java.io.File
 import java.net.{URL, URLClassLoader}
 import java.sql.Timestamp
 
-import org.apache.hadoop.hive.common.StatsSetupConst
-import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.spark.sql.catalyst.ParserDialect
-
 import scala.collection.JavaConversions._
 import scala.collection.mutable.HashMap
 import scala.language.implicitConversions
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.StatsSetupConst
+import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.metadata.Table
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution
@@ -39,6 +37,9 @@ import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable}
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql._
+import org.apache.spark.sql.SQLConf.SQLConfEntry
+import org.apache.spark.sql.SQLConf.SQLConfEntry._
+import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand}
@@ -69,13 +70,14 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   import HiveContext._
 
+  println("create HiveContext")
+
   /**
    * When true, enables an experimental feature where metastore tables that use the parquet SerDe
    * are automatically converted to use the Spark SQL parquet table scan, instead of the Hive
    * SerDe.
    */
-  protected[sql] def convertMetastoreParquet: Boolean =
-    getConf("spark.sql.hive.convertMetastoreParquet", "true") == "true"
+  protected[sql] def convertMetastoreParquet: Boolean = getConf(CONVERT_METASTORE_PARQUET)
 
   /**
    * When true, also tries to merge possibly different but compatible Parquet schemas in different
@@ -84,7 +86,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * This configuration is only effective when "spark.sql.hive.convertMetastoreParquet" is true.
    */
   protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean =
-    getConf("spark.sql.hive.convertMetastoreParquet.mergeSchema", "false") == "true"
+    getConf(CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING)
 
   /**
    * When true, a table created by a Hive CTAS statement (no USING clause) will be
@@ -98,8 +100,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    *   - The CTAS statement specifies SequenceFile (STORED AS SEQUENCEFILE) as the file format
    *     and no SerDe is specified (no ROW FORMAT SERDE clause).
    */
-  protected[sql] def convertCTAS: Boolean =
-    getConf("spark.sql.hive.convertCTAS", "false").toBoolean
+  protected[sql] def convertCTAS: Boolean = getConf(CONVERT_CTAS)
 
   /**
    * The version of the hive client that will be used to communicate with the metastore.  Note that
@@ -117,8 +118,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    *              option is only valid when using the execution version of Hive.
    *  - maven - download the correct version of hive on demand from maven.
    */
-  protected[hive] def hiveMetastoreJars: String =
-    getConf(HIVE_METASTORE_JARS, "builtin")
+  protected[hive] def hiveMetastoreJars: String = getConf(HIVE_METASTORE_JARS)
 
   /**
    * A comma separated list of class prefixes that should be loaded using the classloader that
@@ -128,11 +128,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * custom appenders that are used by log4j.
    */
   protected[hive] def hiveMetastoreSharedPrefixes: Seq[String] =
-    getConf("spark.sql.hive.metastore.sharedPrefixes", jdbcPrefixes)
-      .split(",").filterNot(_ == "")
-
-  private def jdbcPrefixes = Seq(
-    "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc").mkString(",")
+    getConf(HIVE_METASTORE_SHARED_PREFIXES).filterNot(_ == "")
 
   /**
    * A comma separated list of class prefixes that should explicitly be reloaded for each version
@@ -140,14 +136,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * prefix that typically would be shared (i.e. org.apache.spark.*)
    */
   protected[hive] def hiveMetastoreBarrierPrefixes: Seq[String] =
-    getConf("spark.sql.hive.metastore.barrierPrefixes", "")
-      .split(",").filterNot(_ == "")
+    getConf(HIVE_METASTORE_BARRIER_PREFIXES).filterNot(_ == "")
 
   /*
    * hive thrift server use background spark sql thread pool to execute sql queries
    */
-  protected[hive] def hiveThriftServerAsync: Boolean =
-    getConf("spark.sql.hive.thriftServer.async", "true").toBoolean
+  protected[hive] def hiveThriftServerAsync: Boolean = getConf(HIVE_THRIFT_SERVER_ASYNC)
 
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
@@ -364,7 +358,11 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     hiveconf.set(key, value)
   }
 
-  /* A catalyst metadata catalog that points to the Hive Metastore. */
+  private[sql] override def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+    setConf(entry.key, entry.stringConverter(value))
+  }
+
+    /* A catalyst metadata catalog that points to the Hive Metastore. */
   @transient
   override protected[sql] lazy val catalog =
     new HiveMetastoreCatalog(metadataHive, this) with OverrideCatalog
@@ -402,8 +400,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[hive] class SQLSession extends super.SQLSession {
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
-      override def caseSensitiveAnalysis: Boolean =
-        getConf(SQLConf.CASE_SENSITIVE, "false").toBoolean
+      override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
     }
 
     /**
@@ -519,7 +516,50 @@ private[hive] object HiveContext {
   val hiveExecutionVersion: String = "0.13.1"
 
   val HIVE_METASTORE_VERSION: String = "spark.sql.hive.metastore.version"
-  val HIVE_METASTORE_JARS: String = "spark.sql.hive.metastore.jars"
+  val HIVE_METASTORE_JARS = stringConf("spark.sql.hive.metastore.jars",
+    defaultValue = Some("builtin"),
+    doc = "Location of the jars that should be used to instantiate the HiveMetastoreClient. This" +
+      " property can be one of three options: " +
+      "1. \"builtin\" Use Hive 0.13.1, which is bundled with the Spark assembly jar when " +
+      "<code>-Phive</code> is enabled. When this option is chosen, " +
+      "spark.sql.hive.metastore.version must be either <code>0.13.1</code> or not defined. " +
+      "2. \"maven\" Use Hive jars of specified version downloaded from Maven repositories." +
+      "3. A classpath in the standard format for both Hive and Hadoop.")
+
+  val CONVERT_METASTORE_PARQUET = booleanConf("spark.sql.hive.convertMetastoreParquet",
+    defaultValue = Some(true),
+    doc = "When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of " +
+      "the built in support.")
+
+  val CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING = booleanConf(
+    "spark.sql.hive.convertMetastoreParquet.mergeSchema",
+    defaultValue = Some(false),
+    doc = "TODO")
+
+  val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS",
+    defaultValue = Some(false),
+    doc = "TODO")
+
+  val HIVE_METASTORE_SHARED_PREFIXES = stringSeqConf("spark.sql.hive.metastore.sharedPrefixes",
+    defaultValue = Some(jdbcPrefixes),
+    doc = "A comma separated list of class prefixes that should be loaded using the classloader " +
+      "that is shared between Spark SQL and a specific version of Hive. An example of classes " +
+      "that should be shared is JDBC drivers that are needed to talk to the metastore. Other " +
+      "classes that need to be shared are those that interact with classes that are already " +
+      "shared. For example, custom appenders that are used by log4j.")
+
+  private def jdbcPrefixes = Seq(
+    "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
+
+  val HIVE_METASTORE_BARRIER_PREFIXES = stringSeqConf("spark.sql.hive.metastore.barrierPrefixes",
+    defaultValue = Some(Seq()),
+    doc = "A comma separated list of class prefixes that should explicitly be reloaded for each " +
+      "version of Hive that Spark SQL is communicating with. For example, Hive UDFs that are " +
+      "declared in a prefix that typically would be shared (i.e. <code>org.apache.spark.*</code>).")
+
+  val HIVE_THRIFT_SERVER_ASYNC = booleanConf("spark.sql.hive.thriftServer.async",
+    defaultValue = Some(true),
+    doc = "TODO")
 
   /** Constructs a configuration for hive, where the metastore is located in a temp directory. */
   def newTemporaryConfiguration(): Map[String, String] = {
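
The HiveContext changes above all follow one pattern, so as a sketch (the names below are hypothetical and not part of this commit), adding a new Hive-side flag after this refactoring would look like the following: the entry is declared once in object HiveContext and the accessor becomes a one-liner.

    // In object HiveContext; import org.apache.spark.sql.SQLConf.SQLConfEntry._ is
    // already added by this patch. Hypothetical entry, shown for illustration only.
    val SOME_NEW_FLAG = booleanConf("spark.sql.hive.someNewFlag",
      defaultValue = Some(false),
      doc = "Example only: describe what the flag controls here.")

    // In class HiveContext: the getter reads the typed, validated value.
    protected[hive] def someNewFlag: Boolean = getConf(SOME_NEW_FLAG)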

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9215509..f901bd8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -112,12 +112,11 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
   protected[hive] class SQLSession extends super.SQLSession {
     /** Fewer partitions to speed up testing. */
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
-      override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+      override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
       // TODO as in unit test, conf.clear() probably be called, all of the value will be cleared.
       // The super.getConf(SQLConf.DIALECT) is "sql" by default, we need to set it as "hiveql"
       override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
-      override def caseSensitiveAnalysis: Boolean =
-        getConf(SQLConf.CASE_SENSITIVE, "false").toBoolean
+      override def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index a0d80dc..af68615 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -81,11 +81,11 @@ class HiveParquetSuite extends QueryTest with ParquetTest {
     }
   }
 
-  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {
     run("Parquet data source enabled")
   }
 
-  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API -> "false") {
+  withSQLConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "false") {
     run("Parquet data source disabled")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 79a85b2..cc294bc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -456,7 +456,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
       withTable("savedJsonTable") {
         val df = (1 to 10).map(i => i -> s"str$i").toDF("a", "b")
 
-        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") {
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "json") {
           // Save the df as a managed table (by not specifying the path).
           df.write.saveAsTable("savedJsonTable")
 
@@ -484,7 +484,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
         }
 
         // Create an external table by specifying the path.
-        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
           df.write
             .format("org.apache.spark.sql.json")
             .mode(SaveMode.Append)
@@ -508,7 +508,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
           s"""{ "a": $i, "b": "str$i" }"""
         }))
 
-        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
           df.write
             .format("json")
             .mode(SaveMode.Append)
@@ -516,7 +516,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
             .saveAsTable("savedJsonTable")
         }
 
-        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") {
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "json") {
           createExternalTable("createdJsonTable", tempPath.toString)
           assert(table("createdJsonTable").schema === df.schema)
           checkAnswer(sql("SELECT * FROM createdJsonTable"), df)
@@ -533,7 +533,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
         checkAnswer(read.json(tempPath.toString), df)
 
         // Try to specify the schema.
-        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
+        withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
           val schema = StructType(StructField("b", StringType, true) :: Nil)
           createExternalTable(
             "createdJsonTable",
@@ -563,8 +563,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
 
   test("scan a parquet table created through a CTAS statement") {
     withSQLConf(
-      "spark.sql.hive.convertMetastoreParquet" -> "true",
-      SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
+      HiveContext.CONVERT_METASTORE_PARQUET.key -> "true",
+      SQLConf.PARQUET_USE_DATA_SOURCE_API.key -> "true") {
 
       withTempTable("jt") {
         (1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
@@ -706,7 +706,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeA
   }
 
   test("SPARK-6024 wide schema support") {
-    withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD -> "4000") {
+    withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
       withTable("wide_schema") {
         // We will need 80 splits for this schema if the threshold is 4000.
         val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 78c94e6..f067ea0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -167,7 +167,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
       ctx.conf.settings.synchronized {
         val tmp = ctx.conf.autoBroadcastJoinThreshold
 
-        sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""")
+        sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1""")
         df = sql(query)
         bhj = df.queryExecution.sparkPlan.collect { case j: BroadcastHashJoin => j }
         assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off")
@@ -176,7 +176,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
         assert(shj.size === 1,
           "ShuffledHashJoin should be planned when BroadcastHashJoin is turned off")
 
-        sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""")
+        sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=$tmp""")
       }
 
       after()
@@ -225,7 +225,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
     ctx.conf.settings.synchronized {
       val tmp = ctx.conf.autoBroadcastJoinThreshold
 
-      sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1")
+      sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1")
       df = sql(leftSemiJoinQuery)
       bhj = df.queryExecution.sparkPlan.collect {
         case j: BroadcastLeftSemiJoinHash => j
@@ -238,7 +238,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll {
       assert(shj.size === 1,
         "LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off")
 
-      sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp")
+      sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=$tmp")
     }
 
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 6d8d99e..51dabc6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1084,14 +1084,16 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     val testKey = "spark.sql.key.usedfortestonly"
     val testVal = "test.val.0"
     val nonexistentKey = "nonexistent"
-    val KV = "([^=]+)=([^=]*)".r
-    def collectResults(df: DataFrame): Set[(String, String)] =
+    def collectResults(df: DataFrame): Set[Any] =
       df.collect().map {
         case Row(key: String, value: String) => key -> value
-        case Row(KV(key, value)) => key -> value
+        case Row(key: String, defaultValue: String, doc: String) => (key, defaultValue, doc)
       }.toSet
     conf.clear()
 
+    val expectedConfs = conf.getAllDefinedConfs.toSet
+    assertResult(expectedConfs)(collectResults(sql("SET -v")))
+
     // "SET" itself returns all config variables currently specified in SQLConf.
     // TODO: Should we be listing the default here always? probably...
     assert(sql("SET").collect().size == 0)
@@ -1102,16 +1104,12 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
     assert(hiveconf.get(testKey, "") == testVal)
     assertResult(Set(testKey -> testVal))(collectResults(sql("SET")))
-    assertResult(Set(testKey -> testVal))(collectResults(sql("SET -v")))
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
     assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
       collectResults(sql("SET"))
     }
-    assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
-      collectResults(sql("SET -v"))
-    }
 
     // "SET key"
     assertResult(Set(testKey -> testVal)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 984d97d..e1c9926 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.hive.{HiveQLDialect, MetastoreRelation}
+import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
@@ -191,9 +191,9 @@ class SQLQuerySuite extends QueryTest {
       }
     }
 
-    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
+    val originalConf = convertCTAS
 
-    setConf("spark.sql.hive.convertCTAS", "true")
+    setConf(HiveContext.CONVERT_CTAS, true)
 
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
     sql("CREATE TABLE IF NOT EXISTS ctas1 AS SELECT key k, value FROM src ORDER BY k, value")
@@ -235,7 +235,7 @@ class SQLQuerySuite extends QueryTest {
     checkRelation("ctas1", false)
     sql("DROP TABLE ctas1")
 
-    setConf("spark.sql.hive.convertCTAS", originalConf)
+    setConf(HiveContext.CONVERT_CTAS, originalConf)
   }
 
   test("SQL Dialect Switching") {
@@ -332,7 +332,7 @@ class SQLQuerySuite extends QueryTest {
 
     val origUseParquetDataSource = conf.parquetUseDataSourceApi
     try {
-      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
       sql(
         """CREATE TABLE ctas5
           | STORED AS parquet AS
@@ -348,7 +348,7 @@ class SQLQuerySuite extends QueryTest {
         "MANAGED_TABLE"
       )
 
-      val default = getConf("spark.sql.hive.convertMetastoreParquet", "true")
+      val default = convertMetastoreParquet
       // use the Hive SerDe for parquet tables
       sql("set spark.sql.hive.convertMetastoreParquet = false")
       checkAnswer(
@@ -356,7 +356,7 @@ class SQLQuerySuite extends QueryTest {
         sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
       sql(s"set spark.sql.hive.convertMetastoreParquet = $default")
     } finally {
-      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource.toString)
+      setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, origUseParquetDataSource)
     }
   }
 
@@ -603,8 +603,8 @@ class SQLQuerySuite extends QueryTest {
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
     read.json(rdd).registerTempTable("data")
-    val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
-    setConf("spark.sql.hive.convertCTAS", "false")
+    val originalConf = convertCTAS
+    setConf(HiveContext.CONVERT_CTAS, false)
 
     sql("CREATE TABLE explodeTest (key bigInt)")
     table("explodeTest").queryExecution.analyzed match {
@@ -621,7 +621,7 @@ class SQLQuerySuite extends QueryTest {
 
     sql("DROP TABLE explodeTest")
     dropTempTable("data")
-    setConf("spark.sql.hive.convertCTAS", originalConf)
+    setConf(HiveContext.CONVERT_CTAS, originalConf)
   }
 
   test("sanity test for SPARK-6618") {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 3864349..c2e0980 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -153,7 +153,7 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
     read.json(rdd2).registerTempTable("jt_array")
 
-    setConf("spark.sql.hive.convertMetastoreParquet", "true")
+    setConf(HiveContext.CONVERT_METASTORE_PARQUET, true)
   }
 
   override def afterAll(): Unit = {
@@ -164,7 +164,7 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
     sql("DROP TABLE normal_parquet")
     sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS jt_array")
-    setConf("spark.sql.hive.convertMetastoreParquet", "false")
+    setConf(HiveContext.CONVERT_METASTORE_PARQUET, false)
   }
 
   test(s"conversion is working") {
@@ -199,14 +199,14 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
         |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
       """.stripMargin)
 
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
     sql("DROP TABLE IF EXISTS test_parquet")
 
-    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 
   test("scan an empty parquet table") {
@@ -546,12 +546,12 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 
   test("MetastoreRelation in InsertIntoTable will not be converted") {
@@ -692,12 +692,12 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 
   test("values in arrays and maps stored in parquet are always nullable") {
@@ -750,12 +750,12 @@ class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-7961][SQL]Refactor SQLConf to display better error message

Posted by rx...@apache.org.
[SPARK-7961][SQL]Refactor SQLConf to display better error message

1. Add `SQLConfEntry` to store the information about a configuration. For those configurations that cannot be found in `sql-programming-guide.md`, I left the doc as `<TODO>`.
2. Verify the value when setting a configuration, if the key is defined in SQLConf.
3. Use `SET -v` to display all public configurations.
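
For illustration only, here is a sketch (suggested by the changes in this patch, not code taken from it) of how the three points above play out. It assumes the snippet lives inside Spark SQL's own code or tests (package org.apache.spark.sql), since the typed setConf/getConf overloads added here are private[sql]; ctx stands for any SQLContext.

    import org.apache.spark.sql.{SQLConf, SQLContext}

    def demo(ctx: SQLContext): Unit = {
      // 1. SQLConfEntry carries the key, value type, default value and doc, so
      //    callers pass typed values instead of strings (compare the test updates
      //    in the diff, e.g. TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, 5)).
      ctx.setConf(SQLConf.SHUFFLE_PARTITIONS, 10)
      val n: Int = ctx.getConf(SQLConf.SHUFFLE_PARTITIONS)

      // SQL text now interpolates the entry's .key rather than the entry itself.
      ctx.sql(s"SET ${SQLConf.SHUFFLE_PARTITIONS.key}=$n")

      // 2. An invalid value for a registered key is rejected with a pointed
      //    message from the entry's valueConverter, e.g.
      //    "spark.sql.shuffle.partitions should be int, but was ten".
      // ctx.sql(s"SET ${SQLConf.SHUFFLE_PARTITIONS.key}=ten")

      // 3. SET -v returns one row per public entry: (key, default value, doc),
      //    as asserted in the HiveQuerySuite change in this patch.
      ctx.sql("SET -v").show()
    }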

Author: zsxwing <zs...@gmail.com>

Closes #6747 from zsxwing/sqlconf and squashes the following commits:

7d09bad [zsxwing] Use SQLConfEntry in HiveContext
49f6213 [zsxwing] Add getConf, setConf to SQLContext and HiveContext
e014f53 [zsxwing] Merge branch 'master' into sqlconf
93dad8e [zsxwing] Fix the unit tests
cf950c1 [zsxwing] Fix the code style and tests
3c5f03e [zsxwing] Add unsetConf(SQLConfEntry) and fix the code style
a2f4add [zsxwing] getConf will return the default value if a config is not set
037b1db [zsxwing] Add schema to SetCommand
0520c3c [zsxwing] Merge branch 'master' into sqlconf
7afb0ec [zsxwing] Fix the configurations about HiveThriftServer
7e728e3 [zsxwing] Add doc for SQLConfEntry and fix 'toString'
5e95b10 [zsxwing] Add enumConf
c6ba76d [zsxwing] setRawString => setConfString, getRawString => getConfString
4abd807 [zsxwing] Fix the test for 'set -v'
6e47e56 [zsxwing] Fix the compilation error
8973ced [zsxwing] Remove floatConf
1fc3a8b [zsxwing] Remove the 'conf' command and use 'set -v' instead
99c9c16 [zsxwing] Fix tests that use SQLConfEntry as a string
88a03cc [zsxwing] Add new lines between confs and return types
ce7c6c8 [zsxwing] Remove seqConf
f3c1b33 [zsxwing] Refactor SQLConf to display better error message


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78a430ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78a430ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78a430ea

Branch: refs/heads/master
Commit: 78a430ea4d2aef58a8bf38ce488553ca6acea428
Parents: 9db73ec
Author: zsxwing <zs...@gmail.com>
Authored: Wed Jun 17 23:22:54 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jun 17 23:22:54 2015 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |   4 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    | 493 +++++++++++++++----
 .../scala/org/apache/spark/sql/SQLContext.scala |  25 +-
 .../org/apache/spark/sql/SparkSQLParser.scala   |   4 +-
 .../apache/spark/sql/execution/commands.scala   |  98 +++-
 .../spark/sql/execution/debug/package.scala     |   2 +-
 .../sql/parquet/ParquetTableOperations.scala    |   8 +-
 .../apache/spark/sql/parquet/newParquet.scala   |   6 +-
 .../org/apache/spark/sql/sources/commands.scala |   2 +-
 .../apache/spark/sql/test/TestSQLContext.scala  |   2 +-
 .../spark/sql/DataFrameAggregateSuite.scala     |   4 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |  14 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  14 +-
 .../apache/spark/sql/SQLConfEntrySuite.scala    | 150 ++++++
 .../org/apache/spark/sql/SQLConfSuite.scala     |  10 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  42 +-
 .../columnar/PartitionBatchPruningSuite.scala   |   8 +-
 .../spark/sql/execution/PlannerSuite.scala      |   8 +-
 .../org/apache/spark/sql/json/JsonSuite.scala   |   4 +-
 .../spark/sql/parquet/ParquetFilterSuite.scala  |  14 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |  16 +-
 .../spark/sql/parquet/ParquetQuerySuite.scala   |   8 +-
 .../spark/sql/sources/DataSourceTest.scala      |   2 +-
 .../apache/spark/sql/test/SQLTestUtils.scala    |   6 +-
 .../hive/thriftserver/HiveThriftServer2.scala   |   4 +-
 .../SparkExecuteStatementOperation.scala        |   2 +-
 .../thriftserver/HiveThriftServer2Suites.scala  |  22 +-
 .../hive/execution/HiveCompatibilitySuite.scala |   8 +-
 .../execution/SortMergeCompatibilitySuite.scala |   4 +-
 .../org/apache/spark/sql/hive/HiveContext.scala |  88 +++-
 .../apache/spark/sql/hive/test/TestHive.scala   |   5 +-
 .../spark/sql/hive/HiveParquetSuite.scala       |   4 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  16 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |   8 +-
 .../sql/hive/execution/HiveQuerySuite.scala     |  12 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  20 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  20 +-
 37 files changed, 861 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 61f9c5f..c6e6ec8 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1220,7 +1220,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
   <td>false</td>
   <td>
     Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do
-    not differentiate between binary data and strings when writing out the Parquet schema.  This
+    not differentiate between binary data and strings when writing out the Parquet schema. This
     flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
   </td>
 </tr>
@@ -1237,7 +1237,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
   <td><code>spark.sql.parquet.cacheMetadata</code></td>
   <td>true</td>
   <td>
-    Turns on caching of Parquet schema metadata.  Can speed up querying of static data.
+    Turns on caching of Parquet schema metadata. Can speed up querying of static data.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 55ab6b3..16493c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -25,74 +25,333 @@ import scala.collection.JavaConversions._
 import org.apache.spark.sql.catalyst.CatalystConf
 
 private[spark] object SQLConf {
-  val COMPRESS_CACHED = "spark.sql.inMemoryColumnarStorage.compressed"
-  val COLUMN_BATCH_SIZE = "spark.sql.inMemoryColumnarStorage.batchSize"
-  val IN_MEMORY_PARTITION_PRUNING = "spark.sql.inMemoryColumnarStorage.partitionPruning"
-  val AUTO_BROADCASTJOIN_THRESHOLD = "spark.sql.autoBroadcastJoinThreshold"
-  val DEFAULT_SIZE_IN_BYTES = "spark.sql.defaultSizeInBytes"
-  val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
-  val CODEGEN_ENABLED = "spark.sql.codegen"
-  val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
-  val DIALECT = "spark.sql.dialect"
-  val CASE_SENSITIVE = "spark.sql.caseSensitive"
-
-  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
-  val PARQUET_INT96_AS_TIMESTAMP = "spark.sql.parquet.int96AsTimestamp"
-  val PARQUET_CACHE_METADATA = "spark.sql.parquet.cacheMetadata"
-  val PARQUET_COMPRESSION = "spark.sql.parquet.compression.codec"
-  val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
-  val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
-
-  val ORC_FILTER_PUSHDOWN_ENABLED = "spark.sql.orc.filterPushdown"
-
-  val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
-
-  val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
-  val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
+
+  private val sqlConfEntries = java.util.Collections.synchronizedMap(
+    new java.util.HashMap[String, SQLConfEntry[_]]())
+
+  /**
+   * An entry contains all meta information for a configuration.
+   *
+   * @param key the key for the configuration
+   * @param defaultValue the default value for the configuration
+   * @param valueConverter how to convert a string to the value. It should throw an exception if the
+   *                       string does not have the required format.
+   * @param stringConverter how to convert a value to a string that the user can use it as a valid
+   *                        string value. It's usually `toString`. But sometimes, a custom converter
+   *                        is necessary. E.g., if T is List[String], `a, b, c` is better than
+   *                        `List(a, b, c)`.
+   * @param doc the document for the configuration
+   * @param isPublic if this configuration is public to the user. If it's `false`, this
+   *                 configuration is only used internally and we should not expose it to the user.
+   * @tparam T the value type
+   */
+  private[sql] class SQLConfEntry[T] private(
+      val key: String,
+      val defaultValue: Option[T],
+      val valueConverter: String => T,
+      val stringConverter: T => String,
+      val doc: String,
+      val isPublic: Boolean) {
+
+    def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")
+
+    override def toString: String = {
+      s"SQLConfEntry(key = $key, defaultValue=$defaultValueString, doc=$doc, isPublic = $isPublic)"
+    }
+  }
+
+  private[sql] object SQLConfEntry {
+
+    private def apply[T](
+          key: String,
+          defaultValue: Option[T],
+          valueConverter: String => T,
+          stringConverter: T => String,
+          doc: String,
+          isPublic: Boolean): SQLConfEntry[T] =
+      sqlConfEntries.synchronized {
+        if (sqlConfEntries.containsKey(key)) {
+          throw new IllegalArgumentException(s"Duplicate SQLConfEntry. $key has been registered")
+        }
+        val entry =
+          new SQLConfEntry[T](key, defaultValue, valueConverter, stringConverter, doc, isPublic)
+        sqlConfEntries.put(key, entry)
+        entry
+      }
+
+    def intConf(
+          key: String,
+          defaultValue: Option[Int] = None,
+          doc: String = "",
+          isPublic: Boolean = true): SQLConfEntry[Int] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toInt
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be int, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def longConf(
+        key: String,
+        defaultValue: Option[Long] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Long] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toLong
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be long, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def doubleConf(
+        key: String,
+        defaultValue: Option[Double] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Double] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toDouble
+        } catch {
+          case _: NumberFormatException =>
+            throw new IllegalArgumentException(s"$key should be double, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def booleanConf(
+        key: String,
+        defaultValue: Option[Boolean] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Boolean] =
+      SQLConfEntry(key, defaultValue, { v =>
+        try {
+          v.toBoolean
+        } catch {
+          case _: IllegalArgumentException =>
+            throw new IllegalArgumentException(s"$key should be boolean, but was $v")
+        }
+      }, _.toString, doc, isPublic)
+
+    def stringConf(
+        key: String,
+        defaultValue: Option[String] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[String] =
+      SQLConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)
+
+    def enumConf[T](
+        key: String,
+        valueConverter: String => T,
+        validValues: Set[T],
+        defaultValue: Option[T] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[T] =
+      SQLConfEntry(key, defaultValue, v => {
+        val _v = valueConverter(v)
+        if (!validValues.contains(_v)) {
+          throw new IllegalArgumentException(
+            s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
+        }
+        _v
+      }, _.toString, doc, isPublic)
+
+    def seqConf[T](
+        key: String,
+        valueConverter: String => T,
+        defaultValue: Option[Seq[T]] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Seq[T]] = {
+      SQLConfEntry(
+        key, defaultValue, _.split(",").map(valueConverter), _.mkString(","), doc, isPublic)
+    }
+
+    def stringSeqConf(
+        key: String,
+        defaultValue: Option[Seq[String]] = None,
+        doc: String = "",
+        isPublic: Boolean = true): SQLConfEntry[Seq[String]] = {
+      seqConf(key, s => s, defaultValue, doc, isPublic)
+    }
+  }
+
+  import SQLConfEntry._
+
+  val COMPRESS_CACHED = booleanConf("spark.sql.inMemoryColumnarStorage.compressed",
+    defaultValue = Some(true),
+    doc = "When set to true Spark SQL will automatically select a compression codec for each " +
+      "column based on statistics of the data.")
+
+  val COLUMN_BATCH_SIZE = intConf("spark.sql.inMemoryColumnarStorage.batchSize",
+    defaultValue = Some(10000),
+    doc = "Controls the size of batches for columnar caching.  Larger batch sizes can improve " +
+      "memory utilization and compression, but risk OOMs when caching data.")
+
+  val IN_MEMORY_PARTITION_PRUNING =
+    booleanConf("spark.sql.inMemoryColumnarStorage.partitionPruning",
+      defaultValue = Some(false),
+      doc = "<TODO>")
+
+  val AUTO_BROADCASTJOIN_THRESHOLD = intConf("spark.sql.autoBroadcastJoinThreshold",
+    defaultValue = Some(10 * 1024 * 1024),
+    doc = "Configures the maximum size in bytes for a table that will be broadcast to all worker " +
+      "nodes when performing a join.  By setting this value to -1 broadcasting can be disabled. " +
+      "Note that currently statistics are only supported for Hive Metastore tables where the " +
+      "command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.")
+
+  val DEFAULT_SIZE_IN_BYTES = longConf("spark.sql.defaultSizeInBytes", isPublic = false)
+
+  val SHUFFLE_PARTITIONS = intConf("spark.sql.shuffle.partitions",
+    defaultValue = Some(200),
+    doc = "Configures the number of partitions to use when shuffling data for joins or " +
+      "aggregations.")
+
+  val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
+    defaultValue = Some(true),
+    doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
+      " a specific query. For some queries with complicated expression this option can lead to " +
+      "significant speed-ups. However, for simple queries this can actually slow down query " +
+      "execution.")
+
+  val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
+    defaultValue = Some(false),
+    doc = "<TODO>")
+
+  val DIALECT = stringConf("spark.sql.dialect", defaultValue = Some("sql"), doc = "<TODO>")
+
+  val CASE_SENSITIVE = booleanConf("spark.sql.caseSensitive",
+    defaultValue = Some(true),
+    doc = "<TODO>")
+
+  val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
+    defaultValue = Some(false),
+    doc = "Some other Parquet-producing systems, in particular Impala and older versions of " +
+      "Spark SQL, do not differentiate between binary data and strings when writing out the " +
+      "Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide " +
+      "compatibility with these systems.")
+
+  val PARQUET_INT96_AS_TIMESTAMP = booleanConf("spark.sql.parquet.int96AsTimestamp",
+    defaultValue = Some(true),
+    doc = "Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
+      "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
+      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
+      "provide compatibility with these systems.")
+
+  val PARQUET_CACHE_METADATA = booleanConf("spark.sql.parquet.cacheMetadata",
+    defaultValue = Some(true),
+    doc = "Turns on caching of Parquet schema metadata. Can speed up querying of static data.")
+
+  val PARQUET_COMPRESSION = enumConf("spark.sql.parquet.compression.codec",
+    valueConverter = v => v.toLowerCase,
+    validValues = Set("uncompressed", "snappy", "gzip", "lzo"),
+    defaultValue = Some("gzip"),
+    doc = "Sets the compression codec use when writing Parquet files. Acceptable values include: " +
+      "uncompressed, snappy, gzip, lzo.")
+
+  val PARQUET_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.parquet.filterPushdown",
+    defaultValue = Some(false),
+    doc = "Turn on Parquet filter pushdown optimization. This feature is turned off by default" +
+      " because of a known bug in Parquet 1.6.0rc3 " +
+      "(<a href=\"https://issues.apache.org/jira/browse/PARQUET-136\">PARQUET-136</a>). However, " +
+      "if your table doesn't contain any nullable string or binary columns, it's still safe to " +
+      "turn this feature on.")
+
+  val PARQUET_USE_DATA_SOURCE_API = booleanConf("spark.sql.parquet.useDataSourceApi",
+    defaultValue = Some(true),
+    doc = "<TODO>")
+
+  val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
+    defaultValue = Some(false),
+    doc = "<TODO>")
+
+  val HIVE_VERIFY_PARTITIONPATH = booleanConf("spark.sql.hive.verifyPartitionPath",
+    defaultValue = Some(true),
+    doc = "<TODO>")
+
+  val COLUMN_NAME_OF_CORRUPT_RECORD = stringConf("spark.sql.columnNameOfCorruptRecord",
+    defaultValue = Some("_corrupt_record"),
+    doc = "<TODO>")
+
+  val BROADCAST_TIMEOUT = intConf("spark.sql.broadcastTimeout",
+    defaultValue = Some(5 * 60),
+    doc = "<TODO>")
 
   // Options that control which operators can be chosen by the query planner.  These should be
   // considered hints and may be ignored by future versions of Spark SQL.
-  val EXTERNAL_SORT = "spark.sql.planner.externalSort"
-  val SORTMERGE_JOIN = "spark.sql.planner.sortMergeJoin"
+  val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort",
+    defaultValue = Some(true),
+    doc = "When true, performs sorts spilling to disk as needed otherwise sort each partition in" +
+      " memory.")
+
+  val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
+    defaultValue = Some(false),
+    doc = "<TODO>")
 
   // This is only used for the thriftserver
-  val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
-  val THRIFTSERVER_UI_STATEMENT_LIMIT = "spark.sql.thriftserver.ui.retainedStatements"
-  val THRIFTSERVER_UI_SESSION_LIMIT = "spark.sql.thriftserver.ui.retainedSessions"
+  val THRIFTSERVER_POOL = stringConf("spark.sql.thriftserver.scheduler.pool",
+    doc = "Set a Fair Scheduler pool for a JDBC client session")
+
+  val THRIFTSERVER_UI_STATEMENT_LIMIT = intConf("spark.sql.thriftserver.ui.retainedStatements",
+    defaultValue = Some(200),
+    doc = "<TODO>")
+
+  val THRIFTSERVER_UI_SESSION_LIMIT = intConf("spark.sql.thriftserver.ui.retainedSessions",
+    defaultValue = Some(200),
+    doc = "<TODO>")
 
   // This is used to set the default data source
-  val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
+  val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
+    defaultValue = Some("org.apache.spark.sql.parquet"),
+    doc = "<TODO>")
+
   // This is used to control the when we will split a schema's JSON string to multiple pieces
   // in order to fit the JSON string in metastore's table property (by default, the value has
   // a length restriction of 4000 characters). We will split the JSON string of a schema
   // to its length exceeds the threshold.
-  val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"
+  val SCHEMA_STRING_LENGTH_THRESHOLD = intConf("spark.sql.sources.schemaStringLengthThreshold",
+    defaultValue = Some(4000),
+    doc = "<TODO>")
 
   // Whether to perform partition discovery when loading external data sources.  Default to true.
-  val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
+  val PARTITION_DISCOVERY_ENABLED = booleanConf("spark.sql.sources.partitionDiscovery.enabled",
+    defaultValue = Some(true),
+    doc = "<TODO>")
 
   // Whether to perform partition column type inference. Default to true.
-  val PARTITION_COLUMN_TYPE_INFERENCE = "spark.sql.sources.partitionColumnTypeInference.enabled"
+  val PARTITION_COLUMN_TYPE_INFERENCE =
+    booleanConf("spark.sql.sources.partitionColumnTypeInference.enabled",
+      defaultValue = Some(true),
+      doc = "<TODO>")
 
   // The output committer class used by FSBasedRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   // NOTE: This property should be set in Hadoop `Configuration` rather than Spark `SQLConf`
-  val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
+  val OUTPUT_COMMITTER_CLASS =
+    stringConf("spark.sql.sources.outputCommitterClass", isPublic = false)
 
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
-  val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
+  val DATAFRAME_EAGER_ANALYSIS = booleanConf("spark.sql.eagerAnalysis",
+    defaultValue = Some(true),
+    doc = "<TODO>")
 
   // Whether to automatically resolve ambiguity in join conditions for self-joins.
   // See SPARK-6231.
-  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY = "spark.sql.selfJoinAutoResolveAmbiguity"
+  val DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY =
+    booleanConf("spark.sql.selfJoinAutoResolveAmbiguity", defaultValue = Some(true), doc = "<TODO>")
 
   // Whether to retain group by columns or not in GroupedData.agg.
-  val DATAFRAME_RETAIN_GROUP_COLUMNS = "spark.sql.retainGroupColumns"
+  val DATAFRAME_RETAIN_GROUP_COLUMNS = booleanConf("spark.sql.retainGroupColumns",
+    defaultValue = Some(true),
+    doc = "<TODO>")
 
-  val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"
+  val USE_SQL_SERIALIZER2 = booleanConf("spark.sql.useSerializer2",
+    defaultValue = Some(true), doc = "<TODO>")
 
-  val USE_JACKSON_STREAMING_API = "spark.sql.json.useJacksonStreamingAPI"
+  val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI",
+    defaultValue = Some(true), doc = "<TODO>")
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -131,56 +390,54 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
    * Note that the choice of dialect does not affect things like what tables are available or
    * how query execution is performed.
    */
-  private[spark] def dialect: String = getConf(DIALECT, "sql")
+  private[spark] def dialect: String = getConf(DIALECT)
 
   /** When true tables cached using the in-memory columnar caching will be compressed. */
-  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED, "true").toBoolean
+  private[spark] def useCompression: Boolean = getConf(COMPRESS_CACHED)
 
   /** The compression codec for writing to a Parquet file */
-  private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION, "gzip")
+  private[spark] def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION)
+
+  private[spark] def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
 
   /** The number of rows that will be batched together for in-memory columnar caching. */
-  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE, "10000").toInt
+  private[spark] def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
 
   /** Number of partitions to use for shuffle operators. */
-  private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS, "200").toInt
+  private[spark] def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
 
   /** When true predicates will be passed to the parquet record reader when possible. */
-  private[spark] def parquetFilterPushDown =
-    getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+  private[spark] def parquetFilterPushDown: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED)
 
   /** When true, uses the Parquet implementation based on the data source API. */
-  private[spark] def parquetUseDataSourceApi =
-    getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
+  private[spark] def parquetUseDataSourceApi: Boolean = getConf(PARQUET_USE_DATA_SOURCE_API)
 
-  private[spark] def orcFilterPushDown =
-    getConf(ORC_FILTER_PUSHDOWN_ENABLED, "false").toBoolean
+  private[spark] def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
   /** When true, uses verifyPartitionPath to prune paths that do not exist. */
-  private[spark] def verifyPartitionPath =
-    getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
+  private[spark] def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITIONPATH)
 
   /** When true the planner will use the external sort, which may spill to disk. */
-  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "true").toBoolean
+  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
 
   /**
    * Sort merge join sorts the two sides of the join first, and then iterates both sides together
    * only once to get all matches. Using sort merge join can save a lot of memory compared
    * to HashJoin.
    */
-  private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN, "false").toBoolean
+  private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
   /**
    * When set to true, Spark SQL will use Janino at runtime to generate custom bytecode
    * that evaluates expressions found in queries. In general, this custom code runs much faster
    * than interpreted evaluation, but there are some start-up costs (5-10ms) due to compilation.
    */
-  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "true").toBoolean
+  private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED)
 
   /**
    * Whether case-sensitive analysis is enabled; true by default.
    */
-  def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE, "true").toBoolean
+  def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   /**
    * When set to true, Spark SQL will use managed memory for certain operations.  This option only
@@ -188,15 +445,14 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
    *
    * Defaults to false as this feature is currently experimental.
    */
-  private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, "false").toBoolean
+  private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED)
 
-  private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean
+  private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2)
 
   /**
    * Selects between the new (true) and old (false) JSON handlers, to be removed in Spark 1.5.0
    */
-  private[spark] def useJacksonStreamingAPI: Boolean =
-    getConf(USE_JACKSON_STREAMING_API, "true").toBoolean
+  private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API)
 
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@@ -205,8 +461,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
    *
    * Hive setting: hive.auto.convert.join.noconditionaltask.size, whose default value is 10000.
    */
-  private[spark] def autoBroadcastJoinThreshold: Int =
-    getConf(AUTO_BROADCASTJOIN_THRESHOLD, (10 * 1024 * 1024).toString).toInt
+  private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
   /**
    * The default size in bytes to assign to a logical operator's estimation statistics.  By default,
@@ -215,82 +470,116 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
    * in joins.
    */
   private[spark] def defaultSizeInBytes: Long =
-    getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong
+    getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L)
 
   /**
    * When set to true, we always treat byte arrays in Parquet files as strings.
    */
-  private[spark] def isParquetBinaryAsString: Boolean =
-    getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean
+  private[spark] def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
 
   /**
    * When set to true, we always treat INT96 values in Parquet files as timestamps.
    */
-  private[spark] def isParquetINT96AsTimestamp: Boolean =
-    getConf(PARQUET_INT96_AS_TIMESTAMP, "true").toBoolean
+  private[spark] def isParquetINT96AsTimestamp: Boolean = getConf(PARQUET_INT96_AS_TIMESTAMP)
 
   /**
    * When set to true, partition pruning for in-memory columnar tables is enabled.
    */
-  private[spark] def inMemoryPartitionPruning: Boolean =
-    getConf(IN_MEMORY_PARTITION_PRUNING, "false").toBoolean
+  private[spark] def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)
 
-  private[spark] def columnNameOfCorruptRecord: String =
-    getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record")
+  private[spark] def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)
 
   /**
    * Timeout in seconds for the broadcast wait time in hash join
    */
-  private[spark] def broadcastTimeout: Int =
-    getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt
+  private[spark] def broadcastTimeout: Int = getConf(BROADCAST_TIMEOUT)
 
-  private[spark] def defaultDataSourceName: String =
-    getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
+  private[spark] def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)
 
-  private[spark] def partitionDiscoveryEnabled() =
-    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
+  private[spark] def partitionDiscoveryEnabled(): Boolean =
+    getConf(SQLConf.PARTITION_DISCOVERY_ENABLED)
 
-  private[spark] def partitionColumnTypeInferenceEnabled() =
-    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean
+  private[spark] def partitionColumnTypeInferenceEnabled(): Boolean =
+    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE)
 
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
-  private[spark] def schemaStringLengthThreshold: Int =
-    getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt
+  private[spark] def schemaStringLengthThreshold: Int = getConf(SCHEMA_STRING_LENGTH_THRESHOLD)
 
-  private[spark] def dataFrameEagerAnalysis: Boolean =
-    getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
+  private[spark] def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS)
 
   private[spark] def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
-    getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY, "true").toBoolean
+    getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
 
-  private[spark] def dataFrameRetainGroupColumns: Boolean =
-    getConf(DATAFRAME_RETAIN_GROUP_COLUMNS, "true").toBoolean
+  private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
   def setConf(props: Properties): Unit = settings.synchronized {
-    props.foreach { case (k, v) => settings.put(k, v) }
+    props.foreach { case (k, v) => setConfString(k, v) }
   }
 
-  /** Set the given Spark SQL configuration property. */
-  def setConf(key: String, value: String): Unit = {
+  /** Set the given Spark SQL configuration property using a `string` value. */
+  def setConfString(key: String, value: String): Unit = {
     require(key != null, "key cannot be null")
     require(value != null, s"value cannot be null for key: $key")
+    val entry = sqlConfEntries.get(key)
+    if (entry != null) {
+      // Only verify configs in the SQLConf object
+      entry.valueConverter(value)
+    }
     settings.put(key, value)
   }
 
+  /** Set the given Spark SQL configuration property. */
+  def setConf[T](entry: SQLConfEntry[T], value: T): Unit = {
+    require(entry != null, "entry cannot be null")
+    require(value != null, s"value cannot be null for key: ${entry.key}")
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+    settings.put(entry.key, entry.stringConverter(value))
+  }
+
   /** Return the value of Spark SQL configuration property for the given key. */
-  def getConf(key: String): String = {
-    Option(settings.get(key)).getOrElse(throw new NoSuchElementException(key))
+  def getConfString(key: String): String = {
+    Option(settings.get(key)).
+      orElse {
+        // Try to use the default value
+        Option(sqlConfEntries.get(key)).map(_.defaultValueString)
+      }.
+      getOrElse(throw new NoSuchElementException(key))
+  }
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+   * desired one.
+   */
+  def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+    Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue)
   }
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
-   * yet, return `defaultValue`.
+   * yet, return `defaultValue` in [[SQLConfEntry]].
+   */
+  def getConf[T](entry: SQLConfEntry[T]): T = {
+    require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered")
+    Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue).
+      getOrElse(throw new NoSuchElementException(entry.key))
+  }
+
+  /**
+   * Return the `string` value of Spark SQL configuration property for the given key. If the key is
+   * not set yet, return `defaultValue`.
    */
-  def getConf(key: String, defaultValue: String): String = {
+  def getConfString(key: String, defaultValue: String): String = {
+    val entry = sqlConfEntries.get(key)
+    if (entry != null && defaultValue != "<undefined>") {
+      // Only verify configs in the SQLConf object
+      entry.valueConverter(defaultValue)
+    }
     Option(settings.get(key)).getOrElse(defaultValue)
   }
 
@@ -300,11 +589,25 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
    */
   def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap }
 
-  private[spark] def unsetConf(key: String) {
+  /**
+   * Return all the configuration definitions that have been defined in [[SQLConf]]. Each
+   * definition contains key, defaultValue and doc.
+   */
+  def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized {
+    sqlConfEntries.values.filter(_.isPublic).map { entry =>
+      (entry.key, entry.defaultValueString, entry.doc)
+    }.toSeq
+  }
+
+  private[spark] def unsetConf(key: String): Unit = {
     settings -= key
   }
 
-  private[spark] def clear() {
+  private[spark] def unsetConf(entry: SQLConfEntry[_]): Unit = {
+    settings -= entry.key
+  }
+
+  private[spark] def clear(): Unit = {
     settings.clear()
   }
 }
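
A minimal sketch of how the typed entries and accessors above fit together, for readers
skimming the patch. It is illustrative only and assumes code compiled inside the
org.apache.spark.sql package, since SQLConf is private[sql]:

    import org.apache.spark.sql.SQLConf

    val conf = new SQLConf

    // Typed read: an intConf entry yields an Int directly, no manual .toInt.
    val partitions: Int = conf.getConf(SQLConf.SHUFFLE_PARTITIONS)

    // Typed write: the entry converts the value to its string form before storing it.
    conf.setConf(SQLConf.SHUFFLE_PARTITIONS, 10)

    // String access is still available; values for registered entries are validated, so
    // e.g. setConfString(SQLConf.SHUFFLE_PARTITIONS.key, "abc") raises
    // IllegalArgumentException with a message naming the expected type.
    conf.setConfString(SQLConf.SHUFFLE_PARTITIONS.key, "20")
    val asString: String = conf.getConfString(SQLConf.SHUFFLE_PARTITIONS.key)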

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 6b605f7..04fc798 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,6 +31,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.expressions._
@@ -79,13 +80,16 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   def setConf(props: Properties): Unit = conf.setConf(props)
 
+  /** Set the given Spark SQL configuration property. */
+  private[sql] def setConf[T](entry: SQLConfEntry[T], value: T): Unit = conf.setConf(entry, value)
+
   /**
    * Set the given Spark SQL configuration property.
    *
    * @group config
    * @since 1.0.0
    */
-  def setConf(key: String, value: String): Unit = conf.setConf(key, value)
+  def setConf(key: String, value: String): Unit = conf.setConfString(key, value)
 
   /**
    * Return the value of Spark SQL configuration property for the given key.
@@ -93,7 +97,22 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String): String = conf.getConf(key)
+  def getConf(key: String): String = conf.getConfString(key)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue` in [[SQLConfEntry]].
+   */
+  private[sql] def getConf[T](entry: SQLConfEntry[T]): T = conf.getConf(entry)
+
+  /**
+   * Return the value of Spark SQL configuration property for the given key. If the key is not set
+   * yet, return `defaultValue`. This is useful when `defaultValue` in SQLConfEntry is not the
+   * desired one.
+   */
+  private[sql] def getConf[T](entry: SQLConfEntry[T], defaultValue: T): T = {
+    conf.getConf(entry, defaultValue)
+  }
 
   /**
    * Return the value of Spark SQL configuration property for the given key. If the key is not set
@@ -102,7 +121,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group config
    * @since 1.0.0
    */
-  def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
+  def getConf(key: String, defaultValue: String): String = conf.getConfString(key, defaultValue)
 
   /**
    * Return all the configuration properties that have been set (i.e. not the default).

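As a sketch of what this means at the SQLContext level (illustrative; sc stands for an
already-created SparkContext, and the custom key below is hypothetical):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)

    // The public string-based API keeps its signature but now routes through
    // setConfString/getConfString, so registered keys are validated on set.
    sqlContext.setConf("spark.sql.shuffle.partitions", "10")
    assert(sqlContext.getConf("spark.sql.shuffle.partitions") == "10")

    // Keys with no registered SQLConfEntry are stored and returned as plain strings.
    sqlContext.setConf("spark.sql.hypothetical.key", "some value")
    assert(sqlContext.getConf("spark.sql.hypothetical.key", "<undefined>") == "some value")
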
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
index 305b306..e59fa6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSQLParser.scala
@@ -44,8 +44,8 @@ private[sql] class SparkSQLParser(fallback: String => LogicalPlan) extends Abstr
 
     private val pair: Parser[LogicalPlan] =
       (key ~ ("=".r ~> value).?).? ^^ {
-        case None => SetCommand(None, output)
-        case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)), output)
+        case None => SetCommand(None)
+        case Some(k ~ v) => SetCommand(Some(k.trim -> v.map(_.trim)))
       }
 
     def apply(input: String): LogicalPlan = parseAll(pair, input) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index c9dfcea..5e9951f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.NoSuchElementException
+
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
@@ -75,48 +77,92 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class SetCommand(
-    kv: Option[(String, Option[String])],
-    override val output: Seq[Attribute])
-  extends RunnableCommand with Logging {
+case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableCommand with Logging {
+
+  private def keyValueOutput: Seq[Attribute] = {
+    val schema = StructType(
+      StructField("key", StringType, false) ::
+        StructField("value", StringType, false) :: Nil)
+    schema.toAttributes
+  }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = kv match {
+  private val (_output, runFunc): (Seq[Attribute], SQLContext => Seq[Row]) = kv match {
     // Configures the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, Some(value))) =>
-      logWarning(
-        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
-          s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      if (value.toInt < 1) {
-        val msg = s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " +
-          "determining the number of reducers is not supported."
-        throw new IllegalArgumentException(msg)
-      } else {
-        sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS, value)
-        Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=$value"))
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+            s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        if (value.toInt < 1) {
+          val msg =
+            s"Setting negative ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} for automatically " +
+              "determining the number of reducers is not supported."
+          throw new IllegalArgumentException(msg)
+        } else {
+          sqlContext.setConf(SQLConf.SHUFFLE_PARTITIONS.key, value)
+          Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
+        }
       }
+      (keyValueOutput, runFunc)
 
     // Configures a single property.
     case Some((key, Some(value))) =>
-      sqlContext.setConf(key, value)
-      Seq(Row(s"$key=$value"))
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.setConf(key, value)
+        Seq(Row(key, value))
+      }
+      (keyValueOutput, runFunc)
 
-    // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
-    // Notice that different from Hive, here "SET -v" is an alias of "SET".
     // (In Hive, "SET" returns all changed properties while "SET -v" returns all properties.)
-    case Some(("-v", None)) | None =>
-      sqlContext.getAllConfs.map { case (k, v) => Row(s"$k=$v") }.toSeq
+    // Queries all key-value pairs that are set in the SQLConf of the sqlContext.
+    case None =>
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.getAllConfs.map { case (k, v) => Row(k, v) }.toSeq
+      }
+      (keyValueOutput, runFunc)
+
+    // Queries all properties along with their default values and docs that are defined in the
+    // SQLConf of the sqlContext.
+    case Some(("-v", None)) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        sqlContext.conf.getAllDefinedConfs.map { case (key, defaultValue, doc) =>
+          Row(key, defaultValue, doc)
+        }
+      }
+      val schema = StructType(
+        StructField("key", StringType, false) ::
+          StructField("default", StringType, false) ::
+          StructField("meaning", StringType, false) :: Nil)
+      (schema.toAttributes, runFunc)
 
     // Queries the deprecated "mapred.reduce.tasks" property.
     case Some((SQLConf.Deprecated.MAPRED_REDUCE_TASKS, None)) =>
-      logWarning(
-        s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
-          s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
-      Seq(Row(s"${SQLConf.SHUFFLE_PARTITIONS}=${sqlContext.conf.numShufflePartitions}"))
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+            s"showing ${SQLConf.SHUFFLE_PARTITIONS.key} instead.")
+        Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, sqlContext.conf.numShufflePartitions.toString))
+      }
+      (keyValueOutput, runFunc)
 
     // Queries a single property.
     case Some((key, None)) =>
-      Seq(Row(s"$key=${sqlContext.getConf(key, "<undefined>")}"))
+      val runFunc = (sqlContext: SQLContext) => {
+        val value =
+          try {
+            sqlContext.getConf(key)
+          } catch {
+            case _: NoSuchElementException => "<undefined>"
+          }
+        Seq(Row(key, value))
+      }
+      (keyValueOutput, runFunc)
   }
+
+  override val output: Seq[Attribute] = _output
+
+  override def run(sqlContext: SQLContext): Seq[Row] = runFunc(sqlContext)
+
 }
 
 /**

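Roughly what the reworked SetCommand yields at the SQL level (illustrative, reusing the
sqlContext from the earlier sketch):

    // Setting a property now returns a two-column (key, value) row
    // rather than a single "key=value" string.
    sqlContext.sql("SET spark.sql.shuffle.partitions=10").collect()
    // -> Array(Row("spark.sql.shuffle.partitions", "10"))

    // Bare SET lists the explicitly-set properties as (key, value) rows.
    sqlContext.sql("SET").collect()

    // SET -v now lists every public SQLConfEntry as (key, default, meaning) rows,
    // matching Hive's behaviour instead of being a plain alias for SET.
    sqlContext.sql("SET -v").collect()
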
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index 3ee4033..2964eda 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -48,7 +48,7 @@ package object debug {
    */
   implicit class DebugSQLContext(sqlContext: SQLContext) {
     def debug(): Unit = {
-      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 39360e1..65ecad9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -113,12 +113,12 @@ private[sql] case class ParquetTableScan(
       .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.set(
-      SQLConf.PARQUET_CACHE_METADATA,
-      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true"))
+    conf.setBoolean(
+      SQLConf.PARQUET_CACHE_METADATA.key,
+      sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, true))
 
     // Use task side metadata in parquet
-    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true);
+    conf.setBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
 
     val baseRDD =
       new org.apache.spark.rdd.NewHadoopRDD(

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index bba6f1e..4c702c3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -220,7 +220,7 @@ private[sql] class ParquetRelation2(
     }
 
     conf.setClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS,
+      SQLConf.OUTPUT_COMMITTER_CLASS.key,
       committerClass,
       classOf[ParquetOutputCommitter])
 
@@ -259,7 +259,7 @@ private[sql] class ParquetRelation2(
       filters: Array[Filter],
       inputFiles: Array[FileStatus],
       broadcastedConf: Broadcast[SerializableWritable[Configuration]]): RDD[Row] = {
-    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA, "true").toBoolean
+    val useMetadataCache = sqlContext.getConf(SQLConf.PARQUET_CACHE_METADATA)
     val parquetFilterPushDown = sqlContext.conf.parquetFilterPushDown
     // Create the function to set variable Parquet confs at both driver and executor side.
     val initLocalJobFuncOpt =
@@ -498,7 +498,7 @@ private[sql] object ParquetRelation2 extends Logging {
       ParquetTypesConverter.convertToString(dataSchema.toAttributes))
 
     // Tell FilteringParquetRowInputFormat whether it's okay to cache Parquet and FS metadata
-    conf.set(SQLConf.PARQUET_CACHE_METADATA, useMetadataCache.toString)
+    conf.setBoolean(SQLConf.PARQUET_CACHE_METADATA.key, useMetadataCache)
   }
 
   /** This closure sets input paths at the driver side. */

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 3dbe6fa..d39a20b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -323,7 +323,7 @@ private[sql] abstract class BaseWriterContainer(
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val committerClass = context.getConfiguration.getClass(
-      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
+      SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
       logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 356a610..9fa3945 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -38,7 +38,7 @@ class LocalSQLContext
   protected[sql] class SQLSession extends super.SQLSession {
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       /** Fewer partitions to speed up testing. */
-      override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+      override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, 5)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
index 790b405..b26d3ab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -68,12 +68,12 @@ class DataFrameAggregateSuite extends QueryTest {
       Seq(Row(1, 3), Row(2, 3), Row(3, 3))
     )
 
-    ctx.conf.setConf("spark.sql.retainGroupColumns", "false")
+    ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, false)
     checkAnswer(
       testData2.groupBy("a").agg(sum($"b")),
       Seq(Row(3), Row(3), Row(3))
     )
-    ctx.conf.setConf("spark.sql.retainGroupColumns", "true")
+    ctx.conf.setConf(SQLConf.DATAFRAME_RETAIN_GROUP_COLUMNS, true)
   }
 
   test("agg without groups") {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index fa98e23..ba1d020 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -33,7 +33,7 @@ class DataFrameSuite extends QueryTest {
   test("analysis error should be eagerly reported") {
     val oldSetting = ctx.conf.dataFrameEagerAnalysis
     // Eager analysis.
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true)
 
     intercept[Exception] { testData.select('nonExistentName) }
     intercept[Exception] {
@@ -47,11 +47,11 @@ class DataFrameSuite extends QueryTest {
     }
 
     // No more eager analysis once the flag is turned off
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, false)
     testData.select('nonExistentName)
 
     // Set the flag back to original value before this test.
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting)
   }
 
   test("dataframe toString") {
@@ -70,7 +70,7 @@ class DataFrameSuite extends QueryTest {
 
   test("invalid plan toString, debug mode") {
     val oldSetting = ctx.conf.dataFrameEagerAnalysis
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, true)
 
     // Turn on debug mode so we can see invalid query plans.
     import org.apache.spark.sql.execution.debug._
@@ -83,7 +83,7 @@ class DataFrameSuite extends QueryTest {
         badPlan.toString)
 
     // Set the flag back to original value before this test.
-    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
+    ctx.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting)
   }
 
   test("access complex data") {
@@ -556,13 +556,13 @@ class DataFrameSuite extends QueryTest {
 
   test("SPARK-6899") {
     val originalValue = ctx.conf.codegenEnabled
-    ctx.setConf(SQLConf.CODEGEN_ENABLED, "true")
+    ctx.setConf(SQLConf.CODEGEN_ENABLED, true)
     try{
       checkAnswer(
         decimalData.agg(avg('a)),
         Row(new java.math.BigDecimal(2.0)))
     } finally {
-      ctx.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+      ctx.setConf(SQLConf.CODEGEN_ENABLED, originalValue)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index ffd26c4..20390a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -95,14 +95,14 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
         classOf[BroadcastNestedLoopJoin])
     ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     try {
-      ctx.conf.setConf("spark.sql.planner.sortMergeJoin", "true")
+      ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
       Seq(
         ("SELECT * FROM testData JOIN testData2 ON key = a", classOf[SortMergeJoin]),
         ("SELECT * FROM testData JOIN testData2 ON key = a and key = 2", classOf[SortMergeJoin]),
         ("SELECT * FROM testData JOIN testData2 ON key = a where key = 2", classOf[SortMergeJoin])
       ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     } finally {
-      ctx.conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString)
+      ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
     }
   }
 
@@ -118,7 +118,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
         classOf[BroadcastHashJoin])
     ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     try {
-      ctx.conf.setConf("spark.sql.planner.sortMergeJoin", "true")
+      ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
       Seq(
         ("SELECT * FROM testData join testData2 ON key = a", classOf[BroadcastHashJoin]),
         ("SELECT * FROM testData join testData2 ON key = a and key = 2",
@@ -127,7 +127,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
           classOf[BroadcastHashJoin])
       ).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
     } finally {
-      ctx.conf.setConf("spark.sql.planner.sortMergeJoin", SORTMERGEJOIN_ENABLED.toString)
+      ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
     }
 
     ctx.sql("UNCACHE TABLE testData")
@@ -416,7 +416,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
     ctx.sql("CACHE TABLE testData")
     val tmp = ctx.conf.autoBroadcastJoinThreshold
 
-    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000")
+    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=1000000000")
     Seq(
       ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
         classOf[BroadcastLeftSemiJoinHash])
@@ -424,7 +424,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
       case (query, joinClass) => assertJoin(query, joinClass)
     }
 
-    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1")
+    ctx.sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}=-1")
 
     Seq(
       ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash])
@@ -432,7 +432,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
       case (query, joinClass) => assertJoin(query, joinClass)
     }
 
-    ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp.toString)
+    ctx.setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp)
     ctx.sql("UNCACHE TABLE testData")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala
new file mode 100644
index 0000000..2e33777
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfEntrySuite.scala
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SQLConf._
+
+class SQLConfEntrySuite extends SparkFunSuite {
+
+  val conf = new SQLConf
+
+  test("intConf") {
+    val key = "spark.sql.SQLConfEntrySuite.int"
+    val confEntry = SQLConfEntry.intConf(key)
+    assert(conf.getConf(confEntry, 5) === 5)
+
+    conf.setConf(confEntry, 10)
+    assert(conf.getConf(confEntry, 5) === 10)
+
+    conf.setConfString(key, "20")
+    assert(conf.getConfString(key, "5") === "20")
+    assert(conf.getConfString(key) === "20")
+    assert(conf.getConf(confEntry, 5) === 20)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be int, but was abc")
+  }
+
+  test("longConf") {
+    val key = "spark.sql.SQLConfEntrySuite.long"
+    val confEntry = SQLConfEntry.longConf(key)
+    assert(conf.getConf(confEntry, 5L) === 5L)
+
+    conf.setConf(confEntry, 10L)
+    assert(conf.getConf(confEntry, 5L) === 10L)
+
+    conf.setConfString(key, "20")
+    assert(conf.getConfString(key, "5") === "20")
+    assert(conf.getConfString(key) === "20")
+    assert(conf.getConf(confEntry, 5L) === 20L)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be long, but was abc")
+  }
+
+  test("booleanConf") {
+    val key = "spark.sql.SQLConfEntrySuite.boolean"
+    val confEntry = SQLConfEntry.booleanConf(key)
+    assert(conf.getConf(confEntry, false) === false)
+
+    conf.setConf(confEntry, true)
+    assert(conf.getConf(confEntry, false) === true)
+
+    conf.setConfString(key, "true")
+    assert(conf.getConfString(key, "false") === "true")
+    assert(conf.getConfString(key) === "true")
+    assert(conf.getConf(confEntry, false) === true)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be boolean, but was abc")
+  }
+
+  test("doubleConf") {
+    val key = "spark.sql.SQLConfEntrySuite.double"
+    val confEntry = SQLConfEntry.doubleConf(key)
+    assert(conf.getConf(confEntry, 5.0) === 5.0)
+
+    conf.setConf(confEntry, 10.0)
+    assert(conf.getConf(confEntry, 5.0) === 10.0)
+
+    conf.setConfString(key, "20.0")
+    assert(conf.getConfString(key, "5.0") === "20.0")
+    assert(conf.getConfString(key) === "20.0")
+    assert(conf.getConf(confEntry, 5.0) === 20.0)
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "abc")
+    }
+    assert(e.getMessage === s"$key should be double, but was abc")
+  }
+
+  test("stringConf") {
+    val key = "spark.sql.SQLConfEntrySuite.string"
+    val confEntry = SQLConfEntry.stringConf(key)
+    assert(conf.getConf(confEntry, "abc") === "abc")
+
+    conf.setConf(confEntry, "abcd")
+    assert(conf.getConf(confEntry, "abc") === "abcd")
+
+    conf.setConfString(key, "abcde")
+    assert(conf.getConfString(key, "abc") === "abcde")
+    assert(conf.getConfString(key) === "abcde")
+    assert(conf.getConf(confEntry, "abc") === "abcde")
+  }
+
+  test("enumConf") {
+    val key = "spark.sql.SQLConfEntrySuite.enum"
+    val confEntry = SQLConfEntry.enumConf(key, v => v, Set("a", "b", "c"), defaultValue = Some("a"))
+    assert(conf.getConf(confEntry) === "a")
+
+    conf.setConf(confEntry, "b")
+    assert(conf.getConf(confEntry) === "b")
+
+    conf.setConfString(key, "c")
+    assert(conf.getConfString(key, "a") === "c")
+    assert(conf.getConfString(key) === "c")
+    assert(conf.getConf(confEntry) === "c")
+
+    val e = intercept[IllegalArgumentException] {
+      conf.setConfString(key, "d")
+    }
+    assert(e.getMessage === s"The value of $key should be one of a, b, c, but was d")
+  }
+
+  test("stringSeqConf") {
+    val key = "spark.sql.SQLConfEntrySuite.stringSeq"
+    val confEntry = SQLConfEntry.stringSeqConf("spark.sql.SQLConfEntrySuite.stringSeq",
+      defaultValue = Some(Nil))
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c"))
+
+    conf.setConf(confEntry, Seq("a", "b", "c", "d"))
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d"))
+
+    conf.setConfString(key, "a,b,c,d,e")
+    assert(conf.getConfString(key, "a,b,c") === "a,b,c,d,e")
+    assert(conf.getConfString(key) === "a,b,c,d,e")
+    assert(conf.getConf(confEntry, Seq("a", "b", "c")) === Seq("a", "b", "c", "d", "e"))
+  }
+}
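
A sketch of declaring a new entry with the same builders exercised in this suite, in case it
helps reviewers. It is illustrative only; the key and doc string are hypothetical, and like
the suite it assumes code inside the org.apache.spark.sql package:

    import org.apache.spark.sql.SQLConf.SQLConfEntry

    // key, optional default, and the doc string surfaced by "SET -v";
    // isPublic = false (as used for OUTPUT_COMMITTER_CLASS) hides an entry from "SET -v".
    val MY_FEATURE_ENABLED = SQLConfEntry.booleanConf("spark.sql.myFeature.enabled",
      defaultValue = Some(false),
      doc = "When true, enables the hypothetical feature used in this example.")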

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 76d0dd1..75791e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -75,6 +75,14 @@ class SQLConfSuite extends QueryTest {
   test("deprecated property") {
     ctx.conf.clear()
     ctx.sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
-    assert(ctx.getConf(SQLConf.SHUFFLE_PARTITIONS) === "10")
+    assert(ctx.conf.numShufflePartitions === 10)
+  }
+
+  test("invalid conf value") {
+    ctx.conf.clear()
+    val e = intercept[IllegalArgumentException] {
+      ctx.sql(s"set ${SQLConf.CASE_SENSITIVE.key}=10")
+    }
+    assert(e.getMessage === s"${SQLConf.CASE_SENSITIVE.key} should be boolean, but was 10")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 30db840..82f3fdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -190,7 +190,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
 
   test("aggregation with codegen") {
     val originalValue = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
+    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
     // Prepare a table that we can group some rows.
     sqlContext.table("testData")
       .unionAll(sqlContext.table("testData"))
@@ -287,7 +287,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
         Row(0, null, 0) :: Nil)
     } finally {
       sqlContext.dropTempTable("testData3x")
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue.toString)
+      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, originalValue)
     }
   }
 
@@ -480,41 +480,41 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
 
   test("sorting") {
     val before = sqlContext.conf.externalSortEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, "false")
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
     sortTest()
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before.toString)
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
   }
 
   test("external sorting") {
     val before = sqlContext.conf.externalSortEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, "true")
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
     sortTest()
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before.toString)
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, before)
   }
 
   test("SPARK-6927 sorting with codegen on") {
     val externalbefore = sqlContext.conf.externalSortEnabled
     val codegenbefore = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, "false")
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, false)
+    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
     try{
       sortTest()
     } finally {
-      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore.toString)
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore.toString)
+      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
+      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
     }
   }
 
   test("SPARK-6927 external sorting with codegen on") {
     val externalbefore = sqlContext.conf.externalSortEnabled
     val codegenbefore = sqlContext.conf.codegenEnabled
-    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, "true")
-    sqlContext.setConf(SQLConf.EXTERNAL_SORT, "true")
+    sqlContext.setConf(SQLConf.CODEGEN_ENABLED, true)
+    sqlContext.setConf(SQLConf.EXTERNAL_SORT, true)
     try {
       sortTest()
     } finally {
-      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore.toString)
-      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore.toString)
+      sqlContext.setConf(SQLConf.EXTERNAL_SORT, externalbefore)
+      sqlContext.setConf(SQLConf.CODEGEN_ENABLED, codegenbefore)
     }
   }
 
@@ -908,25 +908,25 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
     sql(s"SET $testKey=$testVal")
     checkAnswer(
       sql("SET"),
-      Row(s"$testKey=$testVal")
+      Row(testKey, testVal)
     )
 
     sql(s"SET ${testKey + testKey}=${testVal + testVal}")
     checkAnswer(
       sql("set"),
       Seq(
-        Row(s"$testKey=$testVal"),
-        Row(s"${testKey + testKey}=${testVal + testVal}"))
+        Row(testKey, testVal),
+        Row(testKey + testKey, testVal + testVal))
     )
 
     // "set key"
     checkAnswer(
       sql(s"SET $testKey"),
-      Row(s"$testKey=$testVal")
+      Row(testKey, testVal)
     )
     checkAnswer(
       sql(s"SET $nonexistentKey"),
-      Row(s"$nonexistentKey=<undefined>")
+      Row(nonexistentKey, "<undefined>")
     )
     sqlContext.conf.clear()
   }
@@ -1340,12 +1340,12 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
   }
 
   test("SPARK-4699 case sensitivity SQL query") {
-    sqlContext.setConf(SQLConf.CASE_SENSITIVE, "false")
+    sqlContext.setConf(SQLConf.CASE_SENSITIVE, false)
     val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil
     val rdd = sqlContext.sparkContext.parallelize((0 to 1).map(i => data(i)))
     rdd.toDF().registerTempTable("testTable1")
     checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1"))
-    sqlContext.setConf(SQLConf.CASE_SENSITIVE, "true")
+    sqlContext.setConf(SQLConf.CASE_SENSITIVE, true)
   }
 
   test("SPARK-6145: ORDER BY test for nested fields") {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
index 6545c6b..2c08799 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/PartitionBatchPruningSuite.scala
@@ -32,7 +32,7 @@ class PartitionBatchPruningSuite extends SparkFunSuite with BeforeAndAfterAll wi
 
   override protected def beforeAll(): Unit = {
     // Make a table with 5 partitions, 2 batches per partition, 10 elements per batch
-    ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, "10")
+    ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, 10)
 
     val pruningData = ctx.sparkContext.makeRDD((1 to 100).map { key =>
       val string = if (((key - 1) / 10) % 2 == 0) null else key.toString
@@ -41,14 +41,14 @@ class PartitionBatchPruningSuite extends SparkFunSuite with BeforeAndAfterAll wi
     pruningData.registerTempTable("pruningData")
 
     // Enable in-memory partition pruning
-    ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, "true")
+    ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true)
     // Enable in-memory table scan accumulators
     ctx.setConf("spark.sql.inMemoryTableScanStatistics.enable", "true")
   }
 
   override protected def afterAll(): Unit = {
-    ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize.toString)
-    ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning.toString)
+    ctx.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize)
+    ctx.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning)
   }
 
   before {

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 3e27f58..5854ab4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -63,7 +63,7 @@ class PlannerSuite extends SparkFunSuite {
 
   test("sizeInBytes estimation of limit operator for broadcast hash join optimization") {
     def checkPlan(fieldTypes: Seq[DataType], newThreshold: Int): Unit = {
-      setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold.toString)
+      setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, newThreshold)
       val fields = fieldTypes.zipWithIndex.map {
         case (dataType, index) => StructField(s"c${index}", dataType, true)
       } :+ StructField("key", IntegerType, true)
@@ -119,12 +119,12 @@ class PlannerSuite extends SparkFunSuite {
 
     checkPlan(complexTypes, newThreshold = 901617)
 
-    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
   }
 
   test("InMemoryRelation statistics propagation") {
     val origThreshold = conf.autoBroadcastJoinThreshold
-    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920.toString)
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, 81920)
 
     testData.limit(3).registerTempTable("tiny")
     sql("CACHE TABLE tiny")
@@ -139,6 +139,6 @@ class PlannerSuite extends SparkFunSuite {
     assert(broadcastHashJoins.size === 1, "Should use broadcast hash join")
     assert(shuffledHashJoins.isEmpty, "Should not use shuffled hash join")
 
-    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold.toString)
+    setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, origThreshold)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index fca2436..945d437 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -1077,14 +1077,14 @@ class JsonSuite extends QueryTest with TestJsonData {
   }
 
   test("SPARK-7565 MapType in JsonRDD") {
-    val useStreaming = ctx.getConf(SQLConf.USE_JACKSON_STREAMING_API, "true")
+    val useStreaming = ctx.conf.useJacksonStreamingAPI
     val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord
     ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
 
     val schemaWithSimpleMap = StructType(
       StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
     try{
-      for (useStreaming <- List("true", "false")) {
+      for (useStreaming <- List(true, false)) {
         ctx.setConf(SQLConf.USE_JACKSON_STREAMING_API, useStreaming)
         val temp = Utils.createTempDir().getPath
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index fa5d4ec..a2763c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -51,7 +51,7 @@ class ParquetFilterSuiteBase extends QueryTest with ParquetTest {
       expected: Seq[Row]): Unit = {
     val output = predicate.collect { case a: Attribute => a }.distinct
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
       val query = df
         .select(output.map(e => Column(e)): _*)
         .where(Column(predicate))
@@ -314,17 +314,17 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
   lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 
   test("SPARK-6554: don't push down predicates which reference partition columns") {
     import sqlContext.implicits._
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/part=1"
         (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
@@ -343,17 +343,17 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
   lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 
   test("SPARK-6742: don't push down predicates which reference partition columns") {
     import sqlContext.implicits._
 
-    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/part=1"
         (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)

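Two call styles appear in the ParquetFilterSuite hunks above: the string-based test helper still takes raw key/value pairs, so the typed entry exposes .key, while the session conf's setConf now takes the entry plus a value of its declared type. A short sketch of the distinction, assuming the same fixtures (sqlContext, withSQLConf) as the suite above:

    // String-keyed helper: pass the entry's key and a string value.
    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      // ... run the push-down assertions ...
    }

    // Typed setter: pass the entry itself and a Boolean; no .toString round-trip.
    val saved = sqlContext.conf.parquetUseDataSourceApi
    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, saved)  // restore the saved Boolean
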
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index fc827bc..284d99d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -94,8 +94,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     val data = (1 to 4).map(i => Tuple1(i.toString))
     // Property spark.sql.parquet.binaryAsString shouldn't affect Parquet files written by Spark SQL
     // as we store Spark SQL schema in the extra metadata.
-    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "false")(checkParquetFile(data))
-    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING -> "true")(checkParquetFile(data))
+    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "false")(checkParquetFile(data))
+    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true")(checkParquetFile(data))
   }
 
   test("fixed-length decimals") {
@@ -231,7 +231,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     val data = (0 until 10).map(i => (i, i.toString))
 
     def checkCompressionCodec(codec: CompressionCodecName): Unit = {
-      withSQLConf(SQLConf.PARQUET_COMPRESSION -> codec.name()) {
+      withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> codec.name()) {
         withParquetFile(data) { path =>
           assertResult(sqlContext.conf.parquetCompressionCodec.toUpperCase) {
             compressionCodecFor(path)
@@ -408,7 +408,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
       val clonedConf = new Configuration(configuration)
 
       configuration.set(
-        SQLConf.OUTPUT_COMMITTER_CLASS, classOf[ParquetOutputCommitter].getCanonicalName)
+        SQLConf.OUTPUT_COMMITTER_CLASS.key, classOf[ParquetOutputCommitter].getCanonicalName)
 
       configuration.set(
         "spark.sql.parquet.output.committer.class",
@@ -440,11 +440,11 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API.key, originalConf.toString)
   }
 
   test("SPARK-6330 regression test") {
@@ -464,10 +464,10 @@ class ParquetDataSourceOffIOSuite extends ParquetIOSuiteBase with BeforeAndAfter
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 }

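Outside of SQLConf itself the typed entry cannot travel: a Hadoop Configuration only stores strings, which is why the ParquetIOSuite hunk above writes the entry's .key into it. A sketch of that boundary, assuming a Hadoop Configuration named configuration and the Parquet committer class as in the suite:

    // Only the string key crosses into the string-only Hadoop Configuration;
    // the typed entry stays on the SQLConf side.
    configuration.set(
      SQLConf.OUTPUT_COMMITTER_CLASS.key,
      classOf[ParquetOutputCommitter].getCanonicalName)
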
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index be3b34d..fafad67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -128,11 +128,11 @@ class ParquetDataSourceOnQuerySuite extends ParquetQuerySuiteBase with BeforeAnd
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, true)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 }
 
@@ -140,10 +140,10 @@ class ParquetDataSourceOffQuerySuite extends ParquetQuerySuiteBase with BeforeAn
   private lazy val originalConf = sqlContext.conf.parquetUseDataSourceApi
 
   override protected def beforeAll(): Unit = {
-    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+    sqlContext.conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, false)
   }
 
   override protected def afterAll(): Unit = {
-    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
+    sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 3f77960..00cc7d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -27,7 +27,7 @@ abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
   // We want to test some edge cases.
   protected implicit lazy val caseInsensitiveContext = {
     val ctx = new SQLContext(TestSQLContext.sparkContext)
-    ctx.setConf(SQLConf.CASE_SENSITIVE, "false")
+    ctx.setConf(SQLConf.CASE_SENSITIVE, false)
     ctx
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index ac4a00a..fa01823 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -37,11 +37,11 @@ trait SQLTestUtils {
    */
   protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
     val (keys, values) = pairs.unzip
-    val currentValues = keys.map(key => Try(sqlContext.conf.getConf(key)).toOption)
-    (keys, values).zipped.foreach(sqlContext.conf.setConf)
+    val currentValues = keys.map(key => Try(sqlContext.conf.getConfString(key)).toOption)
+    (keys, values).zipped.foreach(sqlContext.conf.setConfString)
     try f finally {
       keys.zip(currentValues).foreach {
-        case (key, Some(value)) => sqlContext.conf.setConf(key, value)
+        case (key, Some(value)) => sqlContext.conf.setConfString(key, value)
         case (key, None) => sqlContext.conf.unsetConf(key)
       }
     }

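The withSQLConf helper above deliberately stays on the raw-string surface: getConfString throws for an unset key (hence the Try), setConfString writes any key/value pair, and unsetConf clears it, while typed reads and writes now go through an entry. A minimal sketch of the two surfaces, assuming a sqlContext in scope and that SHUFFLE_PARTITIONS is an Int-typed entry (not shown in this hunk):

    // Raw-string surface, as used by withSQLConf above:
    sqlContext.conf.setConfString("spark.sql.shuffle.partitions", "10")
    val raw: String = sqlContext.conf.getConfString("spark.sql.shuffle.partitions")
    sqlContext.conf.unsetConf("spark.sql.shuffle.partitions")

    // Typed surface through the entry (assumed Int-typed here):
    sqlContext.conf.setConf(SQLConf.SHUFFLE_PARTITIONS, 10)
    val partitions: Int = sqlContext.conf.getConf(SQLConf.SHUFFLE_PARTITIONS)
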
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index c9da252..700d994 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -153,9 +153,9 @@ object HiveThriftServer2 extends Logging {
     val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
     val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
     val retainedStatements =
-      conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt
+      conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
     val retainedSessions =
-      conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT, "200").toInt
+      conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
     var totalRunning = 0
 
     override def onJobStart(jobStart: SparkListenerJobStart): Unit = {

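Because the entry now carries its own default and type, the listener above can drop the hand-rolled "200" default and the .toInt parsing. A sketch, assuming a SQLConf named conf as in the object above and that these entries are Int-typed (which the dropped .toInt implies):

    // Old: conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT, "200").toInt
    // New: the Int default lives on the entry itself.
    val retainedStatements: Int = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
    val retainedSessions: Int = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
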
http://git-wip-us.apache.org/repos/asf/spark/blob/78a430ea/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e071103..e875888 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -219,7 +219,7 @@ private[hive] class SparkExecuteStatementOperation(
       result = hiveContext.sql(statement)
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
-        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value))), _) =>
+        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
           sessionToActivePool(parentSession.getSessionHandle) = value
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>

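The SET-command match above likewise compares against the entry's .key, i.e. the string key carried by the parsed command. A sketch of that match, assuming result, SetCommand, parentSession, and sessionToActivePool from the class above:

    result.queryExecution.logical match {
      case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
        // A SET of the thrift-server pool entry pins later statements in this session to that pool.
        sessionToActivePool(parentSession.getSessionHandle) = value
      case _ =>  // any other statement: no pool change
    }
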
