Posted to commits@spark.apache.org by do...@apache.org on 2020/09/11 03:27:50 UTC

[spark] branch branch-2.4 updated: [SPARK-32832][SS][2.4] Use CaseInsensitiveMap for DataStreamReader/Writer options

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ee6b7d4  [SPARK-32832][SS][2.4] Use CaseInsensitiveMap for DataStreamReader/Writer options
ee6b7d4 is described below

commit ee6b7d40f8ccfb485714cdbf36911455f389e1a1
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Thu Sep 10 20:25:03 2020 -0700

    [SPARK-32832][SS][2.4] Use CaseInsensitiveMap for DataStreamReader/Writer options
    
    ### What changes were proposed in this pull request?
    
    This is a backport of https://github.com/apache/spark/pull/29702 .
    
    This PR aims to fix the non-deterministic behavior of DataStreamReader/Writer options, as shown in the following example.
    ```scala
    scala> spark.readStream.format("parquet").option("paTh", "1").option("PATH", "2").option("Path", "3").option("patH", "4").option("path", "5").load()
    org.apache.spark.sql.AnalysisException: Path does not exist: 1;
    ```
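    
    For illustration only (not part of this commit), here is a minimal sketch of why the old code was order-dependent. The local names are made up for the example; the point is that the previous `extraOptions` field was a case-sensitive `mutable.HashMap`, so every case variant of "path" was stored as its own key, and which value a case-insensitive consumer ultimately picked up depended on map iteration order rather than on the order of the `option()` calls.
    ```scala
    import scala.collection.mutable
    
    // Old storage shape: a case-sensitive mutable map, as in
    //   private var extraOptions = new scala.collection.mutable.HashMap[String, String]
    val extraOptions = new mutable.HashMap[String, String]
    Seq("paTh" -> "1", "PATH" -> "2", "Path" -> "3", "patH" -> "4", "path" -> "5")
      .foreach { kv => extraOptions += kv }
    
    println(extraOptions.size)  // 5 -- all case variants survive as distinct keys, so the
                                // value a case-insensitive lookup resolves to depends on
                                // iteration order, not on the order of option() calls
    ```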
    
    ### Why are the changes needed?
    
    This will make the behavior deterministic.
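    
    For illustration only (not part of this commit), a minimal sketch of the behavior provided by the new `CaseInsensitiveMap`-backed `extraOptions`; `opts` is an illustrative local name, and the expected results mirror the new test cases, which assert that the last `option()` call wins regardless of key casing:
    ```scala
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
    
    // New storage shape: an immutable, case-insensitive map, as in
    //   private var extraOptions = CaseInsensitiveMap[String](Map.empty)
    var opts: Map[String, String] = CaseInsensitiveMap[String](Map.empty)
    Seq("paTh" -> "1", "PATH" -> "2", "Path" -> "3", "patH" -> "4", "path" -> "5")
      .foreach { kv => opts = opts + kv }
    
    println(opts("path"))  // "5" -- the most recently set value wins
    println(opts("PATH"))  // "5" -- lookups are case-insensitive as well
    ```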
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but the previous behavior was non-deterministic.
    
    ### How was this patch tested?
    
    Pass the newly added test cases.
    
    Closes #29707 from dongjoon-hyun/SPARK-32832-2.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/streaming/DataStreamReader.scala     |  7 ++---
 .../spark/sql/streaming/DataStreamWriter.scala     |  5 ++--
 .../test/DataStreamReaderWriterSuite.scala         | 31 ++++++++++++++++++++++
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index a9cb5e8..1a445a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
@@ -175,7 +176,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       case s: MicroBatchReadSupport =>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           ds = s, conf = sparkSession.sessionState.conf)
-        val options = sessionOptions ++ extraOptions
+        val options = sessionOptions ++ extraOptions.toMap
         val dataSourceOptions = new DataSourceOptions(options.asJava)
         var tempReader: MicroBatchReader = null
         val schema = try {
@@ -199,7 +200,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       case s: ContinuousReadSupport =>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           ds = s, conf = sparkSession.sessionState.conf)
-        val options = sessionOptions ++ extraOptions
+        val options = sessionOptions ++ extraOptions.toMap
         val dataSourceOptions = new DataSourceOptions(options.asJava)
         val tempReader = s.createContinuousReader(
           Optional.ofNullable(userSpecifiedSchema.orNull),
@@ -467,5 +468,5 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 
   private var userSpecifiedSchema: Option[StructType] = None
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index ec7eb1d..14add0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.{InterfaceStability, Since}
 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
@@ -311,7 +312,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) =>
           val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
             w, df.sparkSession.sessionState.conf)
-          options = sessionOptions ++ extraOptions
+          options = sessionOptions ++ extraOptions.toMap
           w
         case _ =>
           val ds = DataSource(
@@ -422,7 +423,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
   private var trigger: Trigger = Trigger.ProcessingTime(0L)
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private var extraOptions = CaseInsensitiveMap[String](Map.empty)
 
   private var foreachWriter: ForeachWriter[T] = null
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 297a7cc..4744aca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -187,6 +187,37 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     query.stop()
   }
 
+  test("SPARK-32832: later option should override earlier options for load()") {
+    spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .option("path", "5")
+      .load()
+    assert(LastOptions.parameters("path") == "5")
+  }
+
+  test("SPARK-32832: later option should override earlier options for start()") {
+    val ds = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+    assert(LastOptions.parameters.isEmpty)
+
+    val query = ds.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .option("paTh", "1")
+      .option("PATH", "2")
+      .option("Path", "3")
+      .option("patH", "4")
+      .option("path", "5")
+      .start()
+    assert(LastOptions.parameters("path") == "5")
+    query.stop()
+  }
+
   test("partitioning") {
     val df = spark.readStream
       .format("org.apache.spark.sql.streaming.test")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org