You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/09/11 03:27:50 UTC
[spark] branch branch-2.4 updated: [SPARK-32832][SS][2.4] Use
CaseInsensitiveMap for DataStreamReader/Writer options
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new ee6b7d4 [SPARK-32832][SS][2.4] Use CaseInsensitiveMap for DataStreamReader/Writer options
ee6b7d4 is described below
commit ee6b7d40f8ccfb485714cdbf36911455f389e1a1
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Thu Sep 10 20:25:03 2020 -0700
[SPARK-32832][SS][2.4] Use CaseInsensitiveMap for DataStreamReader/Writer options
### What changes were proposed in this pull request?
This is a backport of https://github.com/apache/spark/pull/29702 .
This PR aims to fix non-deterministic behavior of DataStreamReader/Writer options like the following.
```scala
scala> spark.readStream.format("parquet").option("paTh", "1").option("PATH", "2").option("Path", "3").option("patH", "4").option("path", "5").load()
org.apache.spark.sql.AnalysisException: Path does not exist: 1;
```
### Why are the changes needed?
This will make the behavior deterministic.
### Does this PR introduce _any_ user-facing change?
Yes, but the previous behavior was non-deterministic.
### How was this patch tested?
Pass the newly added test cases.
Closes #29707 from dongjoon-hyun/SPARK-32832-2.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/streaming/DataStreamReader.scala | 7 ++---
.../spark/sql/streaming/DataStreamWriter.scala | 5 ++--
.../test/DataStreamReaderWriterSuite.scala | 31 ++++++++++++++++++++++
3 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index a9cb5e8..1a445a3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
@@ -175,7 +176,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
case s: MicroBatchReadSupport =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = s, conf = sparkSession.sessionState.conf)
- val options = sessionOptions ++ extraOptions
+ val options = sessionOptions ++ extraOptions.toMap
val dataSourceOptions = new DataSourceOptions(options.asJava)
var tempReader: MicroBatchReader = null
val schema = try {
@@ -199,7 +200,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
case s: ContinuousReadSupport =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
ds = s, conf = sparkSession.sessionState.conf)
- val options = sessionOptions ++ extraOptions
+ val options = sessionOptions ++ extraOptions.toMap
val dataSourceOptions = new DataSourceOptions(options.asJava)
val tempReader = s.createContinuousReader(
Optional.ofNullable(userSpecifiedSchema.orNull),
@@ -467,5 +468,5 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
private var userSpecifiedSchema: Option[StructType] = None
- private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+ private var extraOptions = CaseInsensitiveMap[String](Map.empty)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index ec7eb1d..14add0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.{InterfaceStability, Since}
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
@@ -311,7 +312,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) =>
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
w, df.sparkSession.sessionState.conf)
- options = sessionOptions ++ extraOptions
+ options = sessionOptions ++ extraOptions.toMap
w
case _ =>
val ds = DataSource(
@@ -422,7 +423,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private var trigger: Trigger = Trigger.ProcessingTime(0L)
- private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+ private var extraOptions = CaseInsensitiveMap[String](Map.empty)
private var foreachWriter: ForeachWriter[T] = null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index 297a7cc..4744aca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -187,6 +187,37 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
query.stop()
}
+ test("SPARK-32832: later option should override earlier options for load()") {
+ spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("paTh", "1")
+ .option("PATH", "2")
+ .option("Path", "3")
+ .option("patH", "4")
+ .option("path", "5")
+ .load()
+ assert(LastOptions.parameters("path") == "5")
+ }
+
+ test("SPARK-32832: later option should override earlier options for start()") {
+ val ds = spark.readStream
+ .format("org.apache.spark.sql.streaming.test")
+ .load()
+ assert(LastOptions.parameters.isEmpty)
+
+ val query = ds.writeStream
+ .format("org.apache.spark.sql.streaming.test")
+ .option("checkpointLocation", newMetadataDir)
+ .option("paTh", "1")
+ .option("PATH", "2")
+ .option("Path", "3")
+ .option("patH", "4")
+ .option("path", "5")
+ .start()
+ assert(LastOptions.parameters("path") == "5")
+ query.stop()
+ }
+
test("partitioning") {
val df = spark.readStream
.format("org.apache.spark.sql.streaming.test")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org