You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/09/19 23:53:35 UTC
spark git commit: [SPARK-25425][SQL][BACKPORT-2.4] Extra options
should override session options in DataSource V2
Repository: spark
Updated Branches:
refs/heads/branch-2.4 9031c7848 -> a9a8d3a4b
[SPARK-25425][SQL][BACKPORT-2.4] Extra options should override session options in DataSource V2
## What changes were proposed in this pull request?
In the PR, I propose overriding session options by extra options in DataSource V2. Extra options are more specific and set via `.option()`, and should overwrite more generic session options.
## How was this patch tested?
Added tests for read and write paths.
Closes #22474 from MaxGekk/session-options-2.4.
Authored-by: Maxim Gekk <ma...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9a8d3a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9a8d3a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9a8d3a4
Branch: refs/heads/branch-2.4
Commit: a9a8d3a4b92be89defd82d5f2eeb3f9af45c687d
Parents: 9031c78
Author: Maxim Gekk <ma...@databricks.com>
Authored: Wed Sep 19 16:53:26 2018 -0700
Committer: Dongjoon Hyun <do...@apache.org>
Committed: Wed Sep 19 16:53:26 2018 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/DataFrameReader.scala | 2 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 8 +++--
.../sql/sources/v2/DataSourceV2Suite.scala | 33 ++++++++++++++++++++
.../sources/v2/SimpleWritableDataSource.scala | 7 ++++-
4 files changed, 45 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 371ec70..27a1af2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -202,7 +202,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
}
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
- ds, extraOptions.toMap ++ sessionOptions + pathsOption,
+ ds, sessionOptions ++ extraOptions.toMap + pathsOption,
userSpecifiedSchema = userSpecifiedSchema))
} else {
loadV1Source(paths: _*)
http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 4aeddfd..80ade7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -241,10 +241,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val source = cls.newInstance().asInstanceOf[DataSourceV2]
source match {
case ws: WriteSupport =>
- val options = extraOptions ++
- DataSourceV2Utils.extractSessionConfigs(source, df.sparkSession.sessionState.conf)
+ val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
+ source,
+ df.sparkSession.sessionState.conf)
+ val options = sessionOptions ++ extraOptions
+ val relation = DataSourceV2Relation.create(source, options)
- val relation = DataSourceV2Relation.create(source, options.toMap)
if (mode == SaveMode.Append) {
runCommand(df.sparkSession, "save") {
AppendData.byName(relation, df.logicalPlan)
http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 12beca2..bafde50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.sources.v2
+import java.io.File
import java.util.{ArrayList, List => JList}
import test.org.apache.spark.sql.sources.v2._
@@ -322,6 +323,38 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
checkCanonicalizedOutput(df, 2, 2)
checkCanonicalizedOutput(df.select('i), 2, 1)
}
+
+ test("SPARK-25425: extra options should override sessions options during reading") {
+ val prefix = "spark.datasource.userDefinedDataSource."
+ val optionName = "optionA"
+ withSQLConf(prefix + optionName -> "true") {
+ val df = spark
+ .read
+ .option(optionName, false)
+ .format(classOf[DataSourceV2WithSessionConfig].getName).load()
+ val options = df.queryExecution.optimizedPlan.collectFirst {
+ case d: DataSourceV2Relation => d.options
+ }
+ assert(options.get.get(optionName) == Some("false"))
+ }
+ }
+
+ test("SPARK-25425: extra options should override sessions options during writing") {
+ withTempPath { path =>
+ val sessionPath = path.getCanonicalPath
+ withSQLConf("spark.datasource.simpleWritableDataSource.path" -> sessionPath) {
+ withTempPath { file =>
+ val optionPath = file.getCanonicalPath
+ val format = classOf[SimpleWritableDataSource].getName
+
+ val df = Seq((1L, 2L)).toDF("i", "j")
+ df.write.format(format).option("path", optionPath).save()
+ assert(!new File(sessionPath).exists)
+ checkAnswer(spark.read.format(format).option("path", optionPath).load(), df)
+ }
+ }
+ }
+ }
}
class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
http://git-wip-us.apache.org/repos/asf/spark/blob/a9a8d3a4/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index e1b8e9c..654c62d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -38,10 +38,15 @@ import org.apache.spark.util.SerializableConfiguration
* Each task writes data to `target/_temporary/jobId/$jobId-$partitionId-$attemptNumber`.
* Each job moves files from `target/_temporary/jobId/` to `target`.
*/
-class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteSupport {
+class SimpleWritableDataSource extends DataSourceV2
+ with ReadSupport
+ with WriteSupport
+ with SessionConfigSupport {
private val schema = new StructType().add("i", "long").add("j", "long")
+ override def keyPrefix: String = "simpleWritableDataSource"
+
class Reader(path: String, conf: Configuration) extends DataSourceReader {
override def readSchema(): StructType = schema
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org