Posted to commits@spark.apache.org by do...@apache.org on 2020/07/23 23:13:09 UTC
[spark] branch branch-2.4 updated: [SPARK-32364][SQL][2.4] Use CaseInsensitiveMap for DataFrameReader/Writer options
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 6a653a2 [SPARK-32364][SQL][2.4] Use CaseInsensitiveMap for DataFrameReader/Writer options
6a653a2 is described below
commit 6a653a2faeb05c1d0f91cbbcaf3c8e37b0d6e0bc
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Thu Jul 23 16:09:27 2020 -0700
[SPARK-32364][SQL][2.4] Use CaseInsensitiveMap for DataFrameReader/Writer options
### What changes were proposed in this pull request?
This PR is a backport of SPARK-32364 (https://github.com/apache/spark/pull/29160, https://github.com/apache/spark/pull/29191).
When a user has multiple options such as `path`, `paTH`, and `PATH` for the same key `path`, `option/options` is non-deterministic because `extraOptions` is a `HashMap`. This PR aims to use `CaseInsensitiveMap` instead of `HashMap` to fix this bug fundamentally.
As the following example shows, DataFrame's `option/options` have been non-deterministic with respect to key case because the options are stored in `extraOptions`, which is a `HashMap`.
```scala
spark.read
.option("paTh", "1")
.option("PATH", "2")
.option("Path", "3")
.option("patH", "4")
.load("5")
...
org.apache.spark.sql.AnalysisException:
Path does not exist: file:/.../1;
```
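With this patch, resolution becomes deterministic: each `option` call replaces any earlier entry whose key matches case-insensitively, and the explicit `load(path)` argument overrides all of them. A minimal sketch of the expected behavior, mirroring the new test cases added below:
```scala
spark.read
  .format("org.apache.spark.sql.test")
  .option("paTh", "1")
  .option("PATH", "2")  // replaces "paTh" -> "1" (case-insensitive match)
  .option("Path", "3")  // replaces "PATH" -> "2"
  .option("patH", "4")  // replaces "Path" -> "3"
  .load("5")            // the explicit path argument overrides all options
// The effective option map is now Map("path" -> "5")
```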
In addition, this PR makes the following changes.
1. Add explicit documentation to `DataFrameReader`/`DataFrameWriter`.
2. Add `toMap` to `CaseInsensitiveMap` so that it returns `originalMap: Map[String, T]`. This is more consistent with the existing case-sensitive key-name behavior for code patterns like `AppendData.byName(..., extraOptions.toMap)`, which previously resolved to `HashMap.toMap`.
3. As part of (2), the following change is needed to keep the original logic, using the new `CaseInsensitiveMap.++`.
```scala
- val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
+ val params = extraOptions ++ connectionProperties.asScala
```
4. Additionally, use `.toMap` in the following because `dsOptions.asCaseSensitiveMap()` is used later.
```scala
- val options = sessionOptions ++ extraOptions
+ val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
```
`extraOptions.toMap` is used in several places (e.g., `DataFrameReader`) to hand over a `Map[String, T]`. In this case, `CaseInsensitiveMap[T] private (val originalMap: Map[String, T])` should return `originalMap`.
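A minimal sketch of the resulting `CaseInsensitiveMap` semantics, inferred from the diff below (assumes `spark-catalyst` on the classpath; the JDBC URL is a placeholder):
```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

val m = CaseInsensitiveMap[String](Map("Path" -> "1"))
assert(m.get("PATH").contains("1"))  // lookup is case-insensitive

// `+` drops any existing key that matches case-insensitively and
// keeps the casing of the newly added key.
val m2 = m + ("paTh" -> "2")
assert(m2.size == 1 && m2.get("path").contains("2"))

// The new `++` folds `+` over the right-hand side, so right-hand
// entries win on case-insensitive collisions.
val m3 = m2 ++ Seq("URL" -> "jdbc:h2:mem:testdb")
assert(m3.size == 2)

// The new `toMap` returns `originalMap`, preserving the original
// key spellings instead of lower-cased ones.
assert(m3.toMap.keySet == Set("paTh", "URL"))
```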
### Why are the changes needed?
This fixes the non-deterministic behavior described above.
### Does this PR introduce _any_ user-facing change?
Yes. Option keys that differ only in case now resolve deterministically to the most recently set value.
### How was this patch tested?
Pass Jenkins with the existing tests and newly added test cases.
Closes #29209 from dongjoon-hyun/SPARK-32364-2.4.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../sql/catalyst/util/CaseInsensitiveMap.scala | 8 ++++++-
.../org/apache/spark/sql/DataFrameReader.scala | 25 +++++++++++++++++++---
.../org/apache/spark/sql/DataFrameWriter.scala | 25 +++++++++++++++++++---
.../org/apache/spark/sql/jdbc/JDBCSuite.scala | 24 ++++++++++++++++++++-
.../sql/test/DataFrameReaderWriterSuite.scala | 22 +++++++++++++++++++
5 files changed, 96 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index 4b149d2..eb12d33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -35,15 +35,21 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma
override def contains(k: String): Boolean =
keyLowerCasedMap.contains(k.toLowerCase(Locale.ROOT))
- override def +[B1 >: T](kv: (String, B1)): Map[String, B1] = {
+ override def +[B1 >: T](kv: (String, B1)): CaseInsensitiveMap[B1] = {
new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(kv._1)) + kv)
}
+ def ++(xs: TraversableOnce[(String, T)]): CaseInsensitiveMap[T] = {
+ xs.foldLeft(this)(_ + _)
+ }
+
override def iterator: Iterator[(String, T)] = keyLowerCasedMap.iterator
override def -(key: String): Map[String, T] = {
new CaseInsensitiveMap(originalMap.filterKeys(!_.equalsIgnoreCase(key)))
}
+
+ def toMap: Map[String, T] = originalMap
}
object CaseInsensitiveMap {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index c71f871..ce0a4e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -30,6 +30,7 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
import org.apache.spark.sql.execution.datasources.csv._
@@ -91,6 +92,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* Adds an input option for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -107,6 +111,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* Adds an input option for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* @since 2.0.0
*/
def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
@@ -114,6 +121,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* Adds an input option for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* @since 2.0.0
*/
def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
@@ -121,6 +131,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* Adds an input option for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* @since 2.0.0
*/
def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
@@ -128,6 +141,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* (Scala-specific) Adds input options for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -144,6 +160,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
/**
* Adds input options for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -234,7 +253,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
// properties should override settings in extraOptions.
this.extraOptions ++= properties.asScala
// explicit url and dbtable should override all
- this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
+ this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
format("jdbc").load()
}
@@ -305,7 +324,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
connectionProperties: Properties): DataFrame = {
assertNoSpecifiedSchema("jdbc")
// connectionProperties should override settings in extraOptions.
- val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
+ val params = extraOptions ++ connectionProperties.asScala
val options = new JDBCOptions(url, table, params)
val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
@@ -790,6 +809,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
private var userSpecifiedSchema: Option[StructType] = None
- private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+ private var extraOptions = CaseInsensitiveMap[String](Map.empty)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index f47926c..3337f22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
@@ -98,6 +99,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* Adds an output option for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -114,6 +118,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* Adds an output option for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* @since 2.0.0
*/
def option(key: String, value: Boolean): DataFrameWriter[T] = option(key, value.toString)
@@ -121,6 +128,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* Adds an output option for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* @since 2.0.0
*/
def option(key: String, value: Long): DataFrameWriter[T] = option(key, value.toString)
@@ -128,6 +138,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* Adds an output option for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* @since 2.0.0
*/
def option(key: String, value: Double): DataFrameWriter[T] = option(key, value.toString)
@@ -135,6 +148,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* (Scala-specific) Adds output options for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -151,6 +167,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
/**
* Adds output options for the underlying data source.
*
+ * All options are maintained in a case-insensitive way in terms of key names.
+ * If a new option has the same key case-insensitively, it will override the existing option.
+ *
* You can set the following option(s):
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -251,7 +270,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source,
df.sparkSession.sessionState.conf)
- val options = sessionOptions ++ extraOptions
+ val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
val writer = ws.createWriter(
UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
@@ -512,7 +531,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// connectionProperties should override settings in extraOptions.
this.extraOptions ++= connectionProperties.asScala
// explicit url and dbtable should override all
- this.extraOptions += ("url" -> url, "dbtable" -> table)
+ this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
format("jdbc").save()
}
@@ -692,7 +711,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private var mode: SaveMode = SaveMode.ErrorIfExists
- private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+ private var extraOptions = CaseInsensitiveMap[String](Map.empty)
private var partitioningColumns: Option[Seq[String]] = None
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edd226..dc61f72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -21,6 +21,8 @@ import java.math.BigDecimal
import java.sql.{Date, DriverManager, SQLException, Timestamp}
import java.util.{Calendar, GregorianCalendar, Properties}
+import scala.collection.JavaConverters._
+
import org.h2.jdbc.JdbcSQLException
import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
@@ -1261,7 +1263,8 @@ class JDBCSuite extends QueryTest
testJdbcOptions(new JDBCOptions(parameters))
testJdbcOptions(new JDBCOptions(CaseInsensitiveMap(parameters)))
// test add/remove key-value from the case-insensitive map
- var modifiedParameters = CaseInsensitiveMap(Map.empty) ++ parameters
+ var modifiedParameters =
+ (CaseInsensitiveMap(Map.empty) ++ parameters).asInstanceOf[Map[String, String]]
testJdbcOptions(new JDBCOptions(modifiedParameters))
modifiedParameters -= "dbtable"
assert(modifiedParameters.get("dbTAblE").isEmpty)
@@ -1585,4 +1588,23 @@ class JDBCSuite extends QueryTest
checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
Row("fred", 1) :: Nil)
}
+
+ test("SPARK-32364: JDBCOption constructor") {
+ val extraOptions = CaseInsensitiveMap[String](Map("UrL" -> "url1", "dBTable" -> "table1"))
+ val connectionProperties = new Properties()
+ connectionProperties.put("url", "url2")
+ connectionProperties.put("dbtable", "table2")
+
+ // connection property should override the options in extraOptions
+ val params = extraOptions ++ connectionProperties.asScala
+ assert(params.size == 2)
+ assert(params.get("uRl").contains("url2"))
+ assert(params.get("DbtaBle").contains("table2"))
+
+ // JDBCOptions constructor parameter should overwrite the existing conf
+ val options = new JDBCOptions(url, "table3", params)
+ assert(options.asProperties.size == 2)
+ assert(options.asProperties.get("url") == url)
+ assert(options.asProperties.get("dbtable") == "table3")
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 67cd0b9..2b5b227 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -212,6 +212,28 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
assert(LastOptions.parameters("opt3") == "3")
}
+ test("SPARK-32364: path argument of load function should override all existing options") {
+ spark.read
+ .format("org.apache.spark.sql.test")
+ .option("paTh", "1")
+ .option("PATH", "2")
+ .option("Path", "3")
+ .option("patH", "4")
+ .load("5")
+ assert(LastOptions.parameters("path") == "5")
+ }
+
+ test("SPARK-32364: path argument of save function should override all existing options") {
+ Seq(1).toDF.write
+ .format("org.apache.spark.sql.test")
+ .option("paTh", "1")
+ .option("PATH", "2")
+ .option("Path", "3")
+ .option("patH", "4")
+ .save("5")
+ assert(LastOptions.parameters("path") == "5")
+ }
+
test("pass partitionBy as options") {
Seq(true, false).foreach { flag =>
withSQLConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS.key -> s"$flag") {