Posted to commits@spark.apache.org by we...@apache.org on 2016/11/16 09:12:29 UTC

spark git commit: [SPARK-18433][SQL] Improve DataSource option keys to be more case-insensitive

Repository: spark
Updated Branches:
  refs/heads/master 95eb06bd7 -> 74f5c2176


[SPARK-18433][SQL] Improve DataSource option keys to be more case-insensitive

## What changes were proposed in this pull request?

This PR aims to make DataSource option keys case-insensitive.

DataSource uses CaseInsensitiveMap in only part of its code paths. For example, the following fails to find the `url` option because it is written as `UrL`.

```scala
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
df.write.format("jdbc")
    .option("UrL", url1)
    .option("dbtable", "TEST.SAVETEST")
    .options(properties.asScala)
    .save()
```

This PR makes DataSource options use CaseInsensitiveMap internally, and makes DataSource pass CaseInsensitiveMap through generally, except for `InMemoryFileIndex` and `InsertIntoHadoopFsRelationCommand`. We cannot pass a CaseInsensitiveMap to those two because they create new case-sensitive HadoopConfs internally by calling newHadoopConfWithOptions(options).
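For illustration, a minimal sketch of the lookup behavior the new `CaseInsensitiveMap` (see the diff below) provides; the JDBC URL and table name here are illustrative only:

```scala
// Sketch only, not part of the patch: keys are lower-cased on construction and
// on lookup, so mixed-case option names still resolve.
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

val opts = new CaseInsensitiveMap(
  Map("UrL" -> "jdbc:h2:mem:testdb", "DBTable" -> "TEST.SAVETEST"))

assert(opts.get("url") == Some("jdbc:h2:mem:testdb"))  // found despite being set as "UrL"
assert(opts.get("dbtable") == Some("TEST.SAVETEST"))   // found despite being set as "DBTable"
```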

## How was this patch tested?

Passes the Jenkins tests, including the newly added test cases.

Author: Dongjoon Hyun <do...@apache.org>

Closes #15884 from dongjoon-hyun/SPARK-18433.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74f5c217
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74f5c217
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74f5c217

Branch: refs/heads/master
Commit: 74f5c2176d8449e41f520febd38109edaf3f4172
Parents: 95eb06b
Author: Dongjoon Hyun <do...@apache.org>
Authored: Wed Nov 16 17:12:18 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Nov 16 17:12:18 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/json/JSONOptions.scala   |  6 ++--
 .../sql/catalyst/util/CaseInsensitiveMap.scala  | 36 ++++++++++++++++++++
 .../spark/sql/execution/command/ddl.scala       |  2 +-
 .../sql/execution/datasources/DataSource.scala  | 30 ++++++++--------
 .../execution/datasources/csv/CSVOptions.scala  |  8 +++--
 .../spark/sql/execution/datasources/ddl.scala   | 18 ----------
 .../datasources/jdbc/JDBCOptions.scala          | 10 ++++--
 .../datasources/parquet/ParquetOptions.scala    |  6 +++-
 .../execution/streaming/FileStreamOptions.scala |  8 +++--
 .../datasources/csv/CSVInferSchemaSuite.scala   |  5 +++
 .../execution/datasources/json/JsonSuite.scala  | 19 +++++++++--
 .../datasources/parquet/ParquetIOSuite.scala    |  7 ++++
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  9 +++++
 .../sql/streaming/FileStreamSourceSuite.scala   |  5 +++
 .../spark/sql/hive/HiveExternalCatalog.scala    |  2 +-
 .../apache/spark/sql/hive/orc/OrcOptions.scala  |  6 +++-
 .../spark/sql/hive/orc/OrcSourceSuite.scala     |  4 +++
 .../apache/spark/sql/hive/parquetSuites.scala   |  1 +
 18 files changed, 133 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index c459706..38e191b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
 
 /**
  * Options for parsing JSON data into Spark SQL rows.
@@ -31,9 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
  * Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-    @transient private val parameters: Map[String, String])
+    @transient private val parameters: CaseInsensitiveMap)
   extends Logging with Serializable  {
 
+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
   val samplingRatio =
     parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
   val primitivesAsString =

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
new file mode 100644
index 0000000..a7f7a8a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+/**
+ * Builds a map in which keys are case insensitive
+ */
+class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+  with Serializable {
+
+  val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
+
+  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
+
+  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
+    baseMap + kv.copy(_1 = kv._1.toLowerCase)
+
+  override def iterator: Iterator[(String, String)] = baseMap.iterator
+
+  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6c1c398..588aa05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
-import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 65422f1..cfee7be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -80,13 +81,13 @@ case class DataSource(
 
   lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
   lazy val sourceInfo = sourceSchema()
+  private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 
   /**
    * Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
    */
   private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
     userSpecifiedSchema.map(_ -> partitionColumns).orElse {
-      val caseInsensitiveOptions = new CaseInsensitiveMap(options)
       val allPaths = caseInsensitiveOptions.get("path")
       val globbedPaths = allPaths.toSeq.flatMap { path =>
         val hdfsPath = new Path(path)
@@ -114,11 +115,10 @@ case class DataSource(
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
         val (name, schema) = s.sourceSchema(
-          sparkSession.sqlContext, userSpecifiedSchema, className, options)
+          sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
         SourceInfo(name, schema, Nil)
 
       case format: FileFormat =>
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
@@ -158,10 +158,14 @@ case class DataSource(
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
         s.createSource(
-          sparkSession.sqlContext, metadataPath, userSpecifiedSchema, className, options)
+          sparkSession.sqlContext,
+          metadataPath,
+          userSpecifiedSchema,
+          className,
+          caseInsensitiveOptions)
 
       case format: FileFormat =>
-        val path = new CaseInsensitiveMap(options).getOrElse("path", {
+        val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
         new FileStreamSource(
@@ -171,7 +175,7 @@ case class DataSource(
           schema = sourceInfo.schema,
           partitionColumns = sourceInfo.partitionColumns,
           metadataPath = metadataPath,
-          options = options)
+          options = caseInsensitiveOptions)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")
@@ -182,10 +186,9 @@ case class DataSource(
   def createSink(outputMode: OutputMode): Sink = {
     providingClass.newInstance() match {
       case s: StreamSinkProvider =>
-        s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode)
+        s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
 
       case fileFormat: FileFormat =>
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
@@ -193,7 +196,7 @@ case class DataSource(
           throw new IllegalArgumentException(
             s"Data source $className does not support $outputMode output mode")
         }
-        new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options)
+        new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions)
 
       case _ =>
         throw new UnsupportedOperationException(
@@ -234,7 +237,6 @@ case class DataSource(
    *                        that files already exist, we don't need to check them again.
    */
   def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
-    val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
       case (dataSource: SchemaRelationProvider, Some(schema)) =>
@@ -274,7 +276,7 @@ case class DataSource(
           dataSchema = dataSchema,
           bucketSpec = None,
           format,
-          options)(sparkSession)
+          caseInsensitiveOptions)(sparkSession)
 
       // This is a non-streaming file based datasource.
       case (format: FileFormat, _) =>
@@ -358,13 +360,13 @@ case class DataSource(
 
     providingClass.newInstance() match {
       case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sparkSession.sqlContext, mode, options, data)
+        dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
       case format: FileFormat =>
         // Don't glob path for the write path.  The contracts here are:
         //  1. Only one output path can be specified on the write path;
         //  2. Output path must be a legal HDFS style file system path;
         //  3. It's OK that the output path doesn't exist yet;
-        val allPaths = paths ++ new CaseInsensitiveMap(options).get("path")
+        val allPaths = paths ++ caseInsensitiveOptions.get("path")
         val outputPath = if (allPaths.length == 1) {
           val path = new Path(allPaths.head)
           val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
@@ -391,7 +393,7 @@ case class DataSource(
           // TODO: Case sensitivity.
           val sameColumns =
             existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
-          if (existingPartitionColumns.size > 0 && !sameColumns) {
+          if (existingPartitionColumns.nonEmpty && !sameColumns) {
             throw new AnalysisException(
               s"""Requested partitioning does not match existing partitioning.
                  |Existing partitioning columns:

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 5903729..21e5030 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -23,11 +23,13 @@ import java.util.Locale
 import org.apache.commons.lang3.time.FastDateFormat
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
 
-private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
+private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap)
   extends Logging with Serializable {
 
+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
   private def getChar(paramName: String, default: Char): Char = {
     val paramValue = parameters.get(paramName)
     paramValue match {
@@ -128,7 +130,7 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str
 
 object CSVOptions {
 
-  def apply(): CSVOptions = new CSVOptions(Map.empty)
+  def apply(): CSVOptions = new CSVOptions(new CaseInsensitiveMap(Map.empty))
 
   def apply(paramName: String, paramValue: String): CSVOptions = {
     new CSVOptions(Map(paramName -> paramValue))

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 59fb48f..fa8dfa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -96,21 +96,3 @@ case class RefreshResource(path: String)
     Seq.empty[Row]
   }
 }
-
-/**
- * Builds a map in which keys are case insensitive
- */
-class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
-  with Serializable {
-
-  val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
-
-  override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
-
-  override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
-    baseMap + kv.copy(_1 = kv._1.toLowerCase)
-
-  override def iterator: Iterator[(String, String)] = baseMap.iterator
-
-  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index fcd7409..7f419b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -22,19 +22,23 @@ import java.util.Properties
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
 /**
  * Options for the JDBC data source.
  */
 class JDBCOptions(
-    @transient private val parameters: Map[String, String])
+    @transient private val parameters: CaseInsensitiveMap)
   extends Serializable {
 
   import JDBCOptions._
 
+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
   def this(url: String, table: String, parameters: Map[String, String]) = {
-    this(parameters ++ Map(
+    this(new CaseInsensitiveMap(parameters ++ Map(
       JDBCOptions.JDBC_URL -> url,
-      JDBCOptions.JDBC_TABLE_NAME -> table))
+      JDBCOptions.JDBC_TABLE_NAME -> table)))
   }
 
   val asConnectionProperties: Properties = {

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index d0fd236..a81a95d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -19,18 +19,22 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
 
 /**
  * Options for the Parquet data source.
  */
 private[parquet] class ParquetOptions(
-    @transient private val parameters: Map[String, String],
+    @transient private val parameters: CaseInsensitiveMap,
     @transient private val sqlConf: SQLConf)
   extends Serializable {
 
   import ParquetOptions._
 
+  def this(parameters: Map[String, String], sqlConf: SQLConf) =
+    this(new CaseInsensitiveMap(parameters), sqlConf)
+
   /**
    * Compression codec to use. By default use the value specified in SQLConf.
    * Acceptable values are defined in [[shortParquetCompressionCodecNames]].

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 3efc20c..fdea65c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.streaming
 import scala.util.Try
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.util.Utils
 
 /**
  * User specified options for file streams.
  */
-class FileStreamOptions(parameters: Map[String, String]) extends Logging {
+class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
+
+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
 
   val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
     Try(str.toInt).toOption.filter(_ > 0).getOrElse {
@@ -50,5 +52,5 @@ class FileStreamOptions(parameters: Map[String, String]) extends Logging {
 
   /** Options as specified by the user, in a case-insensitive map, without "path" set. */
   val optionMapWithoutPath: Map[String, String] =
-    new CaseInsensitiveMap(parameters).filterKeys(_ != "path")
+    parameters.filterKeys(_ != "path")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index 5e00f66..93f752d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -109,4 +109,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
     val mergedNullTypes = CSVInferSchema.mergeRowTypes(Array(NullType), Array(NullType))
     assert(mergedNullTypes.deep == Array(NullType).deep)
   }
+
+  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+    val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"))
+    assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 456052f..598e44e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1366,7 +1366,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
   test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
     // This is really a test that it doesn't throw an exception
-    val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map()))
+    val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map.empty[String, String]))
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1390,7 +1390,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
   }
 
   test("SPARK-8093 Erase empty structs") {
-    val emptySchema = InferSchema.infer(emptyRecords, "", new JSONOptions(Map()))
+    val emptySchema = InferSchema.infer(
+      emptyRecords, "", new JSONOptions(Map.empty[String, String]))
     assert(StructType(Seq()) === emptySchema)
   }
 
@@ -1749,4 +1750,18 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
     }
   }
+
+  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+    val records = sparkContext
+      .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
+
+    val schema = StructType(
+      StructField("a", DecimalType(21, 1), true) ::
+      StructField("b", DecimalType(7, 6), true) :: Nil)
+
+    val df1 = spark.read.option("prefersDecimal", "true").json(records)
+    assert(df1.schema == schema)
+    val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
+    assert(df2.schema == schema)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 580eade..acdadb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -736,6 +736,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+    withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+      val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf)
+      assert(option.compressionCodecClassName == "UNCOMPRESSED")
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 96540ec..e3d3c6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -303,4 +303,13 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
     assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
       " and 'numPartitions' are required."))
   }
+
+  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+    val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+    df.write.format("jdbc")
+      .option("Url", url1)
+      .option("dbtable", "TEST.SAVETEST")
+      .options(properties.asScala)
+      .save()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fab7642..b365af7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1004,6 +1004,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       )
     }
   }
+
+  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+    val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
+    assert(options.maxFilesPerTrigger == Some(1))
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 42ce1a8..cbd00da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -35,8 +35,8 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
-import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.internal.StaticSQLConf._

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index c2a126d..ac587ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -17,14 +17,18 @@
 
 package org.apache.spark.sql.hive.orc
 
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
 /**
  * Options for the ORC data source.
  */
-private[orc] class OrcOptions(@transient private val parameters: Map[String, String])
+private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap)
   extends Serializable {
 
   import OrcOptions._
 
+  def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
   /**
    * Compression codec to use. By default snappy compression.
    * Acceptable values are defined in [[shortOrcCompressionCodecNames]].

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 0f37cd7..12f9480 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -146,6 +146,10 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
 
     sql("DROP TABLE IF EXISTS orcNullValues")
   }
+
+  test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+    assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
+  }
 }
 
 class OrcSourceSuite extends OrcSuite {

http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 3644ff9..2ce60fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
 import org.apache.spark.sql.hive.execution.HiveTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org