You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/11/16 09:12:29 UTC
spark git commit: [SPARK-18433][SQL] Improve DataSource option keys
to be more case-insensitive
Repository: spark
Updated Branches:
refs/heads/master 95eb06bd7 -> 74f5c2176
[SPARK-18433][SQL] Improve DataSource option keys to be more case-insensitive
## What changes were proposed in this pull request?
This PR aims to improve DataSource option keys to be more case-insensitive.
DataSource partially uses CaseInsensitiveMap in its code paths. For example, the following fails to find the url option.
```scala
val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
df.write.format("jdbc")
.option("UrL", url1)
.option("dbtable", "TEST.SAVETEST")
.options(properties.asScala)
.save()
```
This PR makes DataSource options use CaseInsensitiveMap internally and also makes DataSource use CaseInsensitiveMap generally except `InMemoryFileIndex` and `InsertIntoHadoopFsRelationCommand`. We cannot pass them a CaseInsensitiveMap because they create new case-sensitive HadoopConfs by calling newHadoopConfWithOptions(options) internally.
## How was this patch tested?
Pass the Jenkins test with newly added test cases.
Author: Dongjoon Hyun <do...@apache.org>
Closes #15884 from dongjoon-hyun/SPARK-18433.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74f5c217
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74f5c217
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74f5c217
Branch: refs/heads/master
Commit: 74f5c2176d8449e41f520febd38109edaf3f4172
Parents: 95eb06b
Author: Dongjoon Hyun <do...@apache.org>
Authored: Wed Nov 16 17:12:18 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Nov 16 17:12:18 2016 +0800
----------------------------------------------------------------------
.../spark/sql/catalyst/json/JSONOptions.scala | 6 ++--
.../sql/catalyst/util/CaseInsensitiveMap.scala | 36 ++++++++++++++++++++
.../spark/sql/execution/command/ddl.scala | 2 +-
.../sql/execution/datasources/DataSource.scala | 30 ++++++++--------
.../execution/datasources/csv/CSVOptions.scala | 8 +++--
.../spark/sql/execution/datasources/ddl.scala | 18 ----------
.../datasources/jdbc/JDBCOptions.scala | 10 ++++--
.../datasources/parquet/ParquetOptions.scala | 6 +++-
.../execution/streaming/FileStreamOptions.scala | 8 +++--
.../datasources/csv/CSVInferSchemaSuite.scala | 5 +++
.../execution/datasources/json/JsonSuite.scala | 19 +++++++++--
.../datasources/parquet/ParquetIOSuite.scala | 7 ++++
.../apache/spark/sql/jdbc/JDBCWriteSuite.scala | 9 +++++
.../sql/streaming/FileStreamSourceSuite.scala | 5 +++
.../spark/sql/hive/HiveExternalCatalog.scala | 2 +-
.../apache/spark/sql/hive/orc/OrcOptions.scala | 6 +++-
.../spark/sql/hive/orc/OrcSourceSuite.scala | 4 +++
.../apache/spark/sql/hive/parquetSuites.scala | 1 +
18 files changed, 133 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index c459706..38e191b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
/**
* Options for parsing JSON data into Spark SQL rows.
@@ -31,9 +31,11 @@ import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
* Most of these map directly to Jackson's internal options, specified in [[JsonParser.Feature]].
*/
private[sql] class JSONOptions(
- @transient private val parameters: Map[String, String])
+ @transient private val parameters: CaseInsensitiveMap)
extends Logging with Serializable {
+ def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
val samplingRatio =
parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
val primitivesAsString =
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
new file mode 100644
index 0000000..a7f7a8a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+/**
+ * Builds a map in which keys are case insensitive
+ */
+class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+ with Serializable {
+
+ val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
+
+ override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
+
+ override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
+ baseMap + kv.copy(_1 = kv._1.toLowerCase)
+
+ override def iterator: Iterator[(String, String)] = baseMap.iterator
+
+ override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 6c1c398..588aa05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
-import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 65422f1..cfee7be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -31,6 +31,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -80,13 +81,13 @@ case class DataSource(
lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
lazy val sourceInfo = sourceSchema()
+ private val caseInsensitiveOptions = new CaseInsensitiveMap(options)
/**
* Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
*/
private def inferFileFormatSchema(format: FileFormat): (StructType, Seq[String]) = {
userSpecifiedSchema.map(_ -> partitionColumns).orElse {
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val allPaths = caseInsensitiveOptions.get("path")
val globbedPaths = allPaths.toSeq.flatMap { path =>
val hdfsPath = new Path(path)
@@ -114,11 +115,10 @@ case class DataSource(
providingClass.newInstance() match {
case s: StreamSourceProvider =>
val (name, schema) = s.sourceSchema(
- sparkSession.sqlContext, userSpecifiedSchema, className, options)
+ sparkSession.sqlContext, userSpecifiedSchema, className, caseInsensitiveOptions)
SourceInfo(name, schema, Nil)
case format: FileFormat =>
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
@@ -158,10 +158,14 @@ case class DataSource(
providingClass.newInstance() match {
case s: StreamSourceProvider =>
s.createSource(
- sparkSession.sqlContext, metadataPath, userSpecifiedSchema, className, options)
+ sparkSession.sqlContext,
+ metadataPath,
+ userSpecifiedSchema,
+ className,
+ caseInsensitiveOptions)
case format: FileFormat =>
- val path = new CaseInsensitiveMap(options).getOrElse("path", {
+ val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
new FileStreamSource(
@@ -171,7 +175,7 @@ case class DataSource(
schema = sourceInfo.schema,
partitionColumns = sourceInfo.partitionColumns,
metadataPath = metadataPath,
- options = options)
+ options = caseInsensitiveOptions)
case _ =>
throw new UnsupportedOperationException(
s"Data source $className does not support streamed reading")
@@ -182,10 +186,9 @@ case class DataSource(
def createSink(outputMode: OutputMode): Sink = {
providingClass.newInstance() match {
case s: StreamSinkProvider =>
- s.createSink(sparkSession.sqlContext, options, partitionColumns, outputMode)
+ s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)
case fileFormat: FileFormat =>
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val path = caseInsensitiveOptions.getOrElse("path", {
throw new IllegalArgumentException("'path' is not specified")
})
@@ -193,7 +196,7 @@ case class DataSource(
throw new IllegalArgumentException(
s"Data source $className does not support $outputMode output mode")
}
- new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, options)
+ new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, caseInsensitiveOptions)
case _ =>
throw new UnsupportedOperationException(
@@ -234,7 +237,6 @@ case class DataSource(
* that files already exist, we don't need to check them again.
*/
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
case (dataSource: SchemaRelationProvider, Some(schema)) =>
@@ -274,7 +276,7 @@ case class DataSource(
dataSchema = dataSchema,
bucketSpec = None,
format,
- options)(sparkSession)
+ caseInsensitiveOptions)(sparkSession)
// This is a non-streaming file based datasource.
case (format: FileFormat, _) =>
@@ -358,13 +360,13 @@ case class DataSource(
providingClass.newInstance() match {
case dataSource: CreatableRelationProvider =>
- dataSource.createRelation(sparkSession.sqlContext, mode, options, data)
+ dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
case format: FileFormat =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
- val allPaths = paths ++ new CaseInsensitiveMap(options).get("path")
+ val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
@@ -391,7 +393,7 @@ case class DataSource(
// TODO: Case sensitivity.
val sameColumns =
existingPartitionColumns.map(_.toLowerCase()) == partitionColumns.map(_.toLowerCase())
- if (existingPartitionColumns.size > 0 && !sameColumns) {
+ if (existingPartitionColumns.nonEmpty && !sameColumns) {
throw new AnalysisException(
s"""Requested partitioning does not match existing partitioning.
|Existing partitioning columns:
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 5903729..21e5030 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -23,11 +23,13 @@ import java.util.Locale
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.util.{CompressionCodecs, ParseModes}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs, ParseModes}
-private[csv] class CSVOptions(@transient private val parameters: Map[String, String])
+private[csv] class CSVOptions(@transient private val parameters: CaseInsensitiveMap)
extends Logging with Serializable {
+ def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
private def getChar(paramName: String, default: Char): Char = {
val paramValue = parameters.get(paramName)
paramValue match {
@@ -128,7 +130,7 @@ private[csv] class CSVOptions(@transient private val parameters: Map[String, Str
object CSVOptions {
- def apply(): CSVOptions = new CSVOptions(Map.empty)
+ def apply(): CSVOptions = new CSVOptions(new CaseInsensitiveMap(Map.empty))
def apply(paramName: String, paramValue: String): CSVOptions = {
new CSVOptions(Map(paramName -> paramValue))
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 59fb48f..fa8dfa9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -96,21 +96,3 @@ case class RefreshResource(path: String)
Seq.empty[Row]
}
}
-
-/**
- * Builds a map in which keys are case insensitive
- */
-class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
- with Serializable {
-
- val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
-
- override def get(k: String): Option[String] = baseMap.get(k.toLowerCase)
-
- override def + [B1 >: String](kv: (String, B1)): Map[String, B1] =
- baseMap + kv.copy(_1 = kv._1.toLowerCase)
-
- override def iterator: Iterator[(String, String)] = baseMap.iterator
-
- override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index fcd7409..7f419b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -22,19 +22,23 @@ import java.util.Properties
import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
/**
* Options for the JDBC data source.
*/
class JDBCOptions(
- @transient private val parameters: Map[String, String])
+ @transient private val parameters: CaseInsensitiveMap)
extends Serializable {
import JDBCOptions._
+ def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
def this(url: String, table: String, parameters: Map[String, String]) = {
- this(parameters ++ Map(
+ this(new CaseInsensitiveMap(parameters ++ Map(
JDBCOptions.JDBC_URL -> url,
- JDBCOptions.JDBC_TABLE_NAME -> table))
+ JDBCOptions.JDBC_TABLE_NAME -> table)))
}
val asConnectionProperties: Properties = {
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
index d0fd236..a81a95d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala
@@ -19,18 +19,22 @@ package org.apache.spark.sql.execution.datasources.parquet
import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
/**
* Options for the Parquet data source.
*/
private[parquet] class ParquetOptions(
- @transient private val parameters: Map[String, String],
+ @transient private val parameters: CaseInsensitiveMap,
@transient private val sqlConf: SQLConf)
extends Serializable {
import ParquetOptions._
+ def this(parameters: Map[String, String], sqlConf: SQLConf) =
+ this(new CaseInsensitiveMap(parameters), sqlConf)
+
/**
* Compression codec to use. By default use the value specified in SQLConf.
* Acceptable values are defined in [[shortParquetCompressionCodecNames]].
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 3efc20c..fdea65c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.streaming
import scala.util.Try
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.util.Utils
/**
* User specified options for file streams.
*/
-class FileStreamOptions(parameters: Map[String, String]) extends Logging {
+class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging {
+
+ def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
@@ -50,5 +52,5 @@ class FileStreamOptions(parameters: Map[String, String]) extends Logging {
/** Options as specified by the user, in a case-insensitive map, without "path" set. */
val optionMapWithoutPath: Map[String, String] =
- new CaseInsensitiveMap(parameters).filterKeys(_ != "path")
+ parameters.filterKeys(_ != "path")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
index 5e00f66..93f752d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchemaSuite.scala
@@ -109,4 +109,9 @@ class CSVInferSchemaSuite extends SparkFunSuite {
val mergedNullTypes = CSVInferSchema.mergeRowTypes(Array(NullType), Array(NullType))
assert(mergedNullTypes.deep == Array(NullType).deep)
}
+
+ test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+ val options = new CSVOptions(Map("TiMeStampFormat" -> "yyyy-mm"))
+ assert(CSVInferSchema.inferField(TimestampType, "2015-08", options) == TimestampType)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 456052f..598e44e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1366,7 +1366,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
// This is really a test that it doesn't throw an exception
- val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map()))
+ val emptySchema = InferSchema.infer(empty, "", new JSONOptions(Map.empty[String, String]))
assert(StructType(Seq()) === emptySchema)
}
@@ -1390,7 +1390,8 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("SPARK-8093 Erase empty structs") {
- val emptySchema = InferSchema.infer(emptyRecords, "", new JSONOptions(Map()))
+ val emptySchema = InferSchema.infer(
+ emptyRecords, "", new JSONOptions(Map.empty[String, String]))
assert(StructType(Seq()) === emptySchema)
}
@@ -1749,4 +1750,18 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithFormat)
}
}
+
+ test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+ val records = sparkContext
+ .parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
+
+ val schema = StructType(
+ StructField("a", DecimalType(21, 1), true) ::
+ StructField("b", DecimalType(7, 6), true) :: Nil)
+
+ val df1 = spark.read.option("prefersDecimal", "true").json(records)
+ assert(df1.schema == schema)
+ val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
+ assert(df2.schema == schema)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 580eade..acdadb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -736,6 +736,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
}
+
+ test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+ withSQLConf(SQLConf.PARQUET_COMPRESSION.key -> "snappy") {
+ val option = new ParquetOptions(Map("Compression" -> "uncompressed"), spark.sessionState.conf)
+ assert(option.compressionCodecClassName == "UNCOMPRESSED")
+ }
+ }
}
class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 96540ec..e3d3c6c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -303,4 +303,13 @@ class JDBCWriteSuite extends SharedSQLContext with BeforeAndAfter {
assert(e.contains("If 'partitionColumn' is specified then 'lowerBound', 'upperBound'," +
" and 'numPartitions' are required."))
}
+
+ test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+ val df = spark.createDataFrame(sparkContext.parallelize(arr2x2), schema2)
+ df.write.format("jdbc")
+ .option("Url", url1)
+ .option("dbtable", "TEST.SAVETEST")
+ .options(properties.asScala)
+ .save()
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fab7642..b365af7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1004,6 +1004,11 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
)
}
}
+
+ test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+ val options = new FileStreamOptions(Map("maxfilespertrigger" -> "1"))
+ assert(options.maxFilesPerTrigger == Some(1))
+ }
}
class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 42ce1a8..cbd00da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -35,8 +35,8 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.{ColumnStatStruct, DDLUtils}
-import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.sql.hive.client.HiveClient
import org.apache.spark.sql.internal.HiveSerDe
import org.apache.spark.sql.internal.StaticSQLConf._
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
index c2a126d..ac587ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcOptions.scala
@@ -17,14 +17,18 @@
package org.apache.spark.sql.hive.orc
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
/**
* Options for the ORC data source.
*/
-private[orc] class OrcOptions(@transient private val parameters: Map[String, String])
+private[orc] class OrcOptions(@transient private val parameters: CaseInsensitiveMap)
extends Serializable {
import OrcOptions._
+ def this(parameters: Map[String, String]) = this(new CaseInsensitiveMap(parameters))
+
/**
* Compression codec to use. By default snappy compression.
* Acceptable values are defined in [[shortOrcCompressionCodecNames]].
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 0f37cd7..12f9480 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -146,6 +146,10 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
sql("DROP TABLE IF EXISTS orcNullValues")
}
+
+ test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
+ assert(new OrcOptions(Map("Orc.Compress" -> "NONE")).compressionCodec == "NONE")
+ }
}
class OrcSourceSuite extends OrcSuite {
http://git-wip-us.apache.org/repos/asf/spark/blob/74f5c217/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 3644ff9..2ce60fe 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoDataSourceCommand, InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org