Posted to commits@spark.apache.org by do...@apache.org on 2020/09/10 06:51:48 UTC
[spark] branch branch-3.0 updated: [SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 44acb5a [SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options
44acb5a is described below
commit 44acb5a03a1c3cebe0935ec6f2b2d59afbb8f7e2
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Wed Sep 9 23:41:32 2020 -0700
[SPARK-32832][SS] Use CaseInsensitiveMap for DataStreamReader/Writer options
This PR aims to fix the non-deterministic behavior of DataStreamReader/Writer options, as in the following example.
```scala
scala> spark.readStream.format("parquet").option("paTh", "1").option("PATH", "2").option("Path", "3").option("patH", "4").option("path", "5").load()
org.apache.spark.sql.AnalysisException: Path does not exist: 1;
```
This makes option resolution deterministic: the last value set for a key wins, regardless of the key's casing.
Yes, this is a user-facing change, but the previous behavior was non-deterministic.
This patch passes the newly added test cases.
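For intuition, here is a minimal, dependency-free sketch of the semantics the fix adopts. This is an illustration only, not Spark's internal CaseInsensitiveMap (which additionally preserves the original key casing): keys that differ only in case collide, and the last value set wins.
```scala
import java.util.Locale

// Illustrative only: a tiny case-insensitive option map with last-write-wins
// semantics, mirroring the behavior the fix gives the streaming reader/writer.
final case class CiOptions(entries: Map[String, String] = Map.empty) {
  // Keys are stored lower-cased, so "paTh", "PATH", and "path" all collide.
  def updated(key: String, value: String): CiOptions =
    CiOptions(entries.updated(key.toLowerCase(Locale.ROOT), value))

  def get(key: String): Option[String] =
    entries.get(key.toLowerCase(Locale.ROOT))
}

val opts = CiOptions()
  .updated("paTh", "1")
  .updated("PATH", "2")
  .updated("Path", "3")
  .updated("patH", "4")
  .updated("path", "5")
assert(opts.get("PATH").contains("5")) // last write wins, regardless of case
```
With the old mutable, case-sensitive HashMap, all five entries survived as distinct keys and which one the data source saw depended on downstream lookup behavior; with a case-insensitive map only the final value remains.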
Closes #29702 from dongjoon-hyun/SPARK-32832.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 2f85f9516cfc33a376871cf27f9fb4ac30ecbed8)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/streaming/DataStreamReader.scala | 5 ++--
.../spark/sql/streaming/DataStreamWriter.scala | 5 ++--
.../test/DataStreamReaderWriterSuite.scala | 31 ++++++++++++++++++++++
3 files changed, 37 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 1d7e4d3..6b30949 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsRead, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.execution.command.DDLUtils
@@ -210,7 +211,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = sparkSession.sessionState.conf)
- val options = sessionOptions ++ extraOptions
+ val options = sessionOptions ++ extraOptions.toMap
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
val table = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
@@ -520,5 +521,5 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
private var userSpecifiedSchema: Option[StructType] = None
- private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+ private var extraOptions = CaseInsensitiveMap[String](Map.empty)
}
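For context on the field change above (`extraOptions` moving from a mutable HashMap to an immutable CaseInsensitiveMap): each option() call now rebinds the var to a new map in which a case-insensitive key match replaces the old entry. A hedged sketch of that accumulation pattern, assuming spark-catalyst on the classpath; OptionsBuilder and its methods are hypothetical stand-ins for illustration, not the actual DataStreamReader code:
```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Hypothetical builder illustrating how the reader/writer accumulate options
// after this change.
class OptionsBuilder {
  // Immutable and case-insensitive: `+` returns a new CaseInsensitiveMap in
  // which a key matching case-insensitively replaces the earlier entry.
  private var extraOptions = CaseInsensitiveMap[String](Map.empty)

  def option(key: String, value: String): this.type = {
    extraOptions = extraOptions + (key -> value)
    this
  }

  def result: Map[String, String] = extraOptions
}

val b = new OptionsBuilder().option("paTh", "1").option("PATH", "2").option("path", "5")
assert(b.result("PATH") == "5") // lookups are case-insensitive too
```
This also explains the `.toMap` added at the `sessionOptions ++ extraOptions` call sites: the combined result is materialized as a plain Map before being wrapped in CaseInsensitiveStringMap.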
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 1d0ca4d..07ab400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -26,6 +26,7 @@ import org.apache.spark.annotation.Evolving
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.{SupportsWrite, TableProvider}
import org.apache.spark.sql.connector.catalog.TableCapability._
import org.apache.spark.sql.execution.command.DDLUtils
@@ -349,7 +350,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = df.sparkSession.sessionState.conf)
- val options = sessionOptions ++ extraOptions
+ val options = sessionOptions ++ extraOptions.toMap
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
val table = DataSourceV2Utils.getTableFromProvider(
provider, dsOptions, userSpecifiedSchema = None)
@@ -472,7 +473,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private var trigger: Trigger = Trigger.ProcessingTime(0L)
- private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+ private var extraOptions = CaseInsensitiveMap[String](Map.empty)
private var foreachWriter: ForeachWriter[T] = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index b646387..d90af35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -185,6 +185,37 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
query.stop()
}
+ test("SPARK-32832: later option should override earlier options for load()") {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("paTh", "1")
+ .option("PATH", "2")
+ .option("Path", "3")
+ .option("patH", "4")
+ .option("path", "5")
+ .load()
+ assert(LastOptions.parameters("path") == "5")
+ }
+
+ test("SPARK-32832: later option should override earlier options for start()") {
+ val ds = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+ assert(LastOptions.parameters.isEmpty)
+
+ val query = ds.writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
+ .option("paTh", "1")
+ .option("PATH", "2")
+ .option("Path", "3")
+ .option("patH", "4")
+ .option("path", "5")
+ .start()
+ assert(LastOptions.parameters("path") == "5")
+ query.stop()
+ }
+
test("partitioning") {
val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")