Posted to commits@spark.apache.org by do...@apache.org on 2020/07/23 23:13:09 UTC

[spark] branch branch-2.4 updated: [SPARK-32364][SQL][2.4] Use CaseInsensitiveMap for DataFrameReader/Writer options

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 6a653a2  [SPARK-32364][SQL][2.4] Use CaseInsensitiveMap for DataFrameReader/Writer options
6a653a2 is described below

commit 6a653a2faeb05c1d0f91cbbcaf3c8e37b0d6e0bc
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Thu Jul 23 16:09:27 2020 -0700

    [SPARK-32364][SQL][2.4] Use CaseInsensitiveMap for DataFrameReader/Writer options
    
    ### What changes were proposed in this pull request?
    
    This PR is a backport of SPARK-32364 (https://github.com/apache/spark/pull/29160, https://github.com/apache/spark/pull/29191).
    
    When a user has multiple options such as `path`, `paTH`, and `PATH` for the same key `path`, the behavior of `option/options` is non-deterministic because `extraOptions` is a `HashMap`. This PR aims to use `CaseInsensitiveMap` instead of `HashMap` to fix this bug fundamentally.
    
    As shown below, DataFrame's `option/options` has been non-deterministic with respect to the case of key names because the options are stored in `extraOptions`, which uses the `HashMap` class.
    
    ```scala
    spark.read
      .option("paTh", "1")
      .option("PATH", "2")
      .option("Path", "3")
      .option("patH", "4")
      .load("5")
    ...
    org.apache.spark.sql.AnalysisException:
    Path does not exist: file:/.../1;
    ```
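    
    After this fix, the behavior becomes deterministic: the last case-insensitive duplicate wins, and the explicit `path` argument of `load`/`save` overrides all previously set variants. Below is a minimal sketch of the expected behavior, mirroring the new test cases (`LastOptions` is the recording helper used by the `org.apache.spark.sql.test` test source):
    
    ```scala
    spark.read
      .format("org.apache.spark.sql.test")
      .option("paTh", "1")
      .option("PATH", "2")
      .option("Path", "3")
      .option("patH", "4")
      .load("5")
    // All four variants collapse into the single key "path",
    // and the explicit load("5") overrides them all.
    assert(LastOptions.parameters("path") == "5")
    ```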
    
    In addition, this PR makes the following changes.
    
    1. Add explicit documentation to `DataFrameReader/DataFrameWriter`.
    
    2. Add `toMap` to `CaseInsensitiveMap` so that it returns `originalMap: Map[String, T]`, which is more consistent with the existing case-sensitive key-name behavior for code patterns like `AppendData.byName(..., extraOptions.toMap)`. Previously, this was `HashMap.toMap`.
    
    3. As part of (2), change the following to keep the original logic by using `CaseInsensitiveMap.++`:
    ```scala
    - val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
    + val params = extraOptions ++ connectionProperties.asScala
    ```
    
    4. Additionally, use `.toMap` in the following because `dsOptions.asCaseSensitiveMap()` is used later:
    ```scala
    - val options = sessionOptions ++ extraOptions
    + val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
      val dsOptions = new CaseInsensitiveStringMap(options.asJava)
    ```
    
    `extraOptions.toMap` is used in several places (e.g. `DataFrameReader`) to hand over a `Map[String, T]`. In these cases, `CaseInsensitiveMap[T] private (val originalMap: Map[String, T])` should return `originalMap`.
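    
    For illustration, here is a minimal sketch of the `CaseInsensitiveMap` semantics described above (the keys and values are hypothetical, but the assertions mirror the new `JDBCSuite` test):
    
    ```scala
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
    
    val opts = CaseInsensitiveMap[String](Map("UrL" -> "url1", "dBTable" -> "table1"))
    
    // `++` overrides existing entries whose keys match case-insensitively.
    val merged = opts ++ Seq("url" -> "url2", "dbtable" -> "table2")
    assert(merged.size == 2)
    assert(merged.get("uRl").contains("url2"))
    
    // `toMap` returns `originalMap`, preserving the case of the surviving keys.
    val plain: Map[String, String] = merged.toMap
    assert(plain.keySet == Set("url", "dbtable"))
    ```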
    
    ### Why are the changes needed?
    
    This fixes the non-deterministic behavior of `option/options`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    Pass Jenkins with the existing tests and newly added test cases.
    
    Closes #29209 from dongjoon-hyun/SPARK-32364-2.4.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../sql/catalyst/util/CaseInsensitiveMap.scala     |  8 ++++++-
 .../org/apache/spark/sql/DataFrameReader.scala     | 25 +++++++++++++++++++---
 .../org/apache/spark/sql/DataFrameWriter.scala     | 25 +++++++++++++++++++---
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 24 ++++++++++++++++++++-
 .../sql/test/DataFrameReaderWriterSuite.scala      | 22 +++++++++++++++++++
 5 files changed, 96 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index 4b149d2..eb12d33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -35,15 +35,21 @@ class CaseInsensitiveMap[T] private (val originalMap: Map[String, T]) extends Ma
   override def contains(k: String): Boolean =
     keyLowerCasedMap.contains(k.toLowerCase(Locale.ROOT))
 
-  override def +[B1 >: T](kv: (String, B1)): Map[String, B1] = {
+  override def +[B1 >: T](kv: (String, B1)): CaseInsensitiveMap[B1] = {
     new CaseInsensitiveMap(originalMap.filter(!_._1.equalsIgnoreCase(kv._1)) + kv)
   }
 
+  def ++(xs: TraversableOnce[(String, T)]): CaseInsensitiveMap[T] = {
+    xs.foldLeft(this)(_ + _)
+  }
+
   override def iterator: Iterator[(String, T)] = keyLowerCasedMap.iterator
 
   override def -(key: String): Map[String, T] = {
     new CaseInsensitiveMap(originalMap.filterKeys(!_.equalsIgnoreCase(key)))
   }
+
+  def toMap: Map[String, T] = originalMap
 }
 
 object CaseInsensitiveMap {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index c71f871..ce0a4e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -30,6 +30,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{DataSource, FailureSafeParser}
 import org.apache.spark.sql.execution.datasources.csv._
@@ -91,6 +92,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -107,6 +111,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Boolean): DataFrameReader = option(key, value.toString)
@@ -114,6 +121,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Long): DataFrameReader = option(key, value.toString)
@@ -121,6 +131,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds an input option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Double): DataFrameReader = option(key, value.toString)
@@ -128,6 +141,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * (Scala-specific) Adds input options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -144,6 +160,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   /**
    * Adds input options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -234,7 +253,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     // properties should override settings in extraOptions.
     this.extraOptions ++= properties.asScala
     // explicit url and dbtable should override all
-    this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
+    this.extraOptions ++= Seq(JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
     format("jdbc").load()
   }
 
@@ -305,7 +324,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       connectionProperties: Properties): DataFrame = {
     assertNoSpecifiedSchema("jdbc")
     // connectionProperties should override settings in extraOptions.
-    val params = extraOptions.toMap ++ connectionProperties.asScala.toMap
+    val params = extraOptions ++ connectionProperties.asScala
     val options = new JDBCOptions(url, table, params)
     val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
       JDBCPartition(part, i) : Partition
@@ -790,6 +809,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index f47926c..3337f22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
@@ -98,6 +99,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -114,6 +118,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Boolean): DataFrameWriter[T] = option(key, value.toString)
@@ -121,6 +128,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Long): DataFrameWriter[T] = option(key, value.toString)
@@ -128,6 +138,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds an output option for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * @since 2.0.0
    */
   def option(key: String, value: Double): DataFrameWriter[T] = option(key, value.toString)
@@ -135,6 +148,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * (Scala-specific) Adds output options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -151,6 +167,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   /**
    * Adds output options for the underlying data source.
    *
+   * All options are maintained in a case-insensitive way in terms of key names.
+   * If a new option has the same key case-insensitively, it will override the existing option.
+   *
    * You can set the following option(s):
    * <ul>
    * <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
@@ -251,7 +270,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
             source,
             df.sparkSession.sessionState.conf)
-          val options = sessionOptions ++ extraOptions
+          val options = sessionOptions.filterKeys(!extraOptions.contains(_)) ++ extraOptions.toMap
 
           val writer = ws.createWriter(
             UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
@@ -512,7 +531,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     // connectionProperties should override settings in extraOptions.
     this.extraOptions ++= connectionProperties.asScala
     // explicit url and dbtable should override all
-    this.extraOptions += ("url" -> url, "dbtable" -> table)
+    this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
     format("jdbc").save()
   }
 
@@ -692,7 +711,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private var mode: SaveMode = SaveMode.ErrorIfExists
 
-  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
   private var partitioningColumns: Option[Seq[String]] = None
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 0edd226..dc61f72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -21,6 +21,8 @@ import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
 import java.util.{Calendar, GregorianCalendar, Properties}
 
+import scala.collection.JavaConverters._
+
 import org.h2.jdbc.JdbcSQLException
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
@@ -1261,7 +1263,8 @@ class JDBCSuite extends QueryTest
     testJdbcOptions(new JDBCOptions(parameters))
     testJdbcOptions(new JDBCOptions(CaseInsensitiveMap(parameters)))
     // test add/remove key-value from the case-insensitive map
-    var modifiedParameters = CaseInsensitiveMap(Map.empty) ++ parameters
+    var modifiedParameters =
+      (CaseInsensitiveMap(Map.empty) ++ parameters).asInstanceOf[Map[String, String]]
     testJdbcOptions(new JDBCOptions(modifiedParameters))
     modifiedParameters -= "dbtable"
     assert(modifiedParameters.get("dbTAblE").isEmpty)
@@ -1585,4 +1588,23 @@ class JDBCSuite extends QueryTest
       checkNotPushdown(sql("SELECT name, theid FROM predicateOption WHERE theid = 1")),
       Row("fred", 1) :: Nil)
   }
+
+  test("SPARK-32364: JDBCOption constructor") {
+    val extraOptions = CaseInsensitiveMap[String](Map("UrL" -> "url1", "dBTable" -> "table1"))
+    val connectionProperties = new Properties()
+    connectionProperties.put("url", "url2")
+    connectionProperties.put("dbtable", "table2")
+
+    // connection property should override the options in extraOptions
+    val params = extraOptions ++ connectionProperties.asScala
+    assert(params.size == 2)
+    assert(params.get("uRl").contains("url2"))
+    assert(params.get("DbtaBle").contains("table2"))
+
+    // JDBCOptions constructor parameter should overwrite the existing conf
+    val options = new JDBCOptions(url, "table3", params)
+    assert(options.asProperties.size == 2)
+    assert(options.asProperties.get("url") == url)
+    assert(options.asProperties.get("dbtable") == "table3")
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 67cd0b9..2b5b227 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -212,6 +212,28 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be
     assert(LastOptions.parameters("opt3") == "3")
   }
 
+  test("SPARK-32364: path argument of load function should override all existing options") {
+    spark.read
+      .format("org.apache.spark.sql.test")
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .load("5")
+    assert(LastOptions.parameters("path") == "5")
+  }
+
+  test("SPARK-32364: path argument of save function should override all existing options") {
+    Seq(1).toDF.write
+      .format("org.apache.spark.sql.test")
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .save("5")
+    assert(LastOptions.parameters("path") == "5")
+  }
+
   test("pass partitionBy as options") {
     Seq(true, false).foreach { flag =>
       withSQLConf(SQLConf.LEGACY_PASS_PARTITION_BY_AS_OPTIONS.key -> s"$flag") {

