You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/09/10 06:51:48 UTC

[spark] branch branch-3.0 updated: [SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 44acb5a  [SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options
44acb5a is described below

commit 44acb5a03a1c3cebe0935ec6f2b2d59afbb8f7e2
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Wed Sep 9 23:41:32 2020 -0700

    [SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options
    
    This PR aims to fix the nondeterministic behavior of DataStreamReader/Writer options, as in the following example.
    ```scala
    scala> spark.readStream.format("parquet").option("paTh", "1").option("PATH", "2").option("Path", "3").option("patH", "4").option("path", "5").load()
    org.apache.spark.sql.AnalysisException: Path does not exist: 1;
    ```
    
    This will make the behavior deterministic.
    
    Yes, this changes user-facing behavior, but the previous behavior was nondeterministic.
    
    Pass the newly added test cases.
    
    Closes #29702 from dongjoon-hyun/SPARK-32832.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 2f85f9516cfc33a376871cf27f9fb4ac30ecbed8)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/streaming/DataStreamReader.scala     |  5 ++--
 .../spark/sql/streaming/DataStreamWriter.scala     |  5 ++--
 .../test/DataStreamReaderWriterSuite.scala         | 31 ++++++++++++++++++++++
 3 files changed, 37 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 1d7e4d3..6b30949 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -210,7 +211,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] =>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = sparkSession.sessionState.conf)
-        val options = sessionOptions ++ extraOptions
+        val options = sessionOptions ++ extraOptions.toMap
         val dsOptions = new CaseInsensitiveStringMap(options.asJava)
         val table = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
         import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
@@ -520,5 +521,5 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 1d0ca4d..07ab400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -26,6 +26,7 @@ import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableProvider}
 import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -349,7 +350,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = df.sparkSession.sessionState.conf)
-        val options = sessionOptions ++ extraOptions
+        val options = sessionOptions ++ extraOptions.toMap
         val dsOptions = new CaseInsensitiveStringMap(options.asJava)
         val table = DataSourceV2Utils.getTableFromProvider(
           provider, dsOptions, userSpecifiedSchema = None)
@@ -472,7 +473,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var trigger: Trigger = Trigger.ProcessingTime(0L)
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
   private var foreachWriter: ForeachWriter[T] = null
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index b646387..d90af35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -185,6 +185,37 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
+  test("SPARK-32832: later option should override earlier options for load()") {
+    spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .option("path", "5")
+      .load()
+    assert(LastOptions.parameters("path") == "5")
+  }
+
+  test("SPARK-32832: later option should override earlier options for start()") {
+    val ds = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+    assert(LastOptions.parameters.isEmpty)
+
+    val query = ds.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .option("path", "5")
+      .start()
+    assert(LastOptions.parameters("path") == "5")
+    query.stop()
+  }
+
   test("partitioning") {
     val df = spark.readStream
       .format("org.apache.spark.sql.streaming.test")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org