You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/02/21 01:58:39 UTC
[spark] branch master updated: [SPARK-34481][SQL] Refactor
dataframe reader/writer optionsWithPath logic
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7de49a8 [SPARK-34481][SQL] Refactor dataframe reader/writer optionsWithPath logic
7de49a8 is described below
commit 7de49a8fc0c47fb4d2ce44e3ebe2978e002d9699
Author: Yuchen Huo <yu...@databricks.com>
AuthorDate: Sat Feb 20 17:57:43 2021 -0800
[SPARK-34481][SQL] Refactor dataframe reader/writer optionsWithPath logic
### What changes were proposed in this pull request?
Extract the optionsWithPath logic into its own function.
### Why are the changes needed?
Reduce the code duplication and improve modularity.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This is just a refactoring; it is covered by the existing tests.
Closes #31599 from yuchenhuo/SPARK-34481.
Authored-by: Yuchen Huo <yu...@databricks.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../scala/org/apache/spark/sql/DataFrameReader.scala | 20 ++++++++++++--------
.../scala/org/apache/spark/sql/DataFrameWriter.scala | 20 ++++++++++----------
2 files changed, 22 insertions(+), 18 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1a19542..195f4f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -268,14 +268,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
source = provider, conf = sparkSession.sessionState.conf)
- val optionsWithPath = if (paths.isEmpty) {
- extraOptions
- } else if (paths.length == 1) {
- extraOptions + ("path" -> paths.head)
- } else {
- val objectMapper = new ObjectMapper()
- extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
- }
+ val optionsWithPath = getOptionsWithPaths(paths: _*)
val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
optionsWithPath.originalMap
@@ -308,6 +301,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}.getOrElse(loadV1Source(paths: _*))
}
+ private def getOptionsWithPaths(paths: String*): CaseInsensitiveMap[String] = {
+ if (paths.isEmpty) {
+ extraOptions
+ } else if (paths.length == 1) {
+ extraOptions + ("path" -> paths.head)
+ } else {
+ val objectMapper = new ObjectMapper()
+ extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
+ }
+ }
+
private def loadV1Source(paths: String*) = {
val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
val (finalPaths, finalOptions) = if (!legacyPathOptionBehavior && paths.length == 1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1dba17b..fe6572cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -314,11 +314,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
provider, df.sparkSession.sessionState.conf)
- val optionsWithPath = if (path.isEmpty) {
- extraOptions
- } else {
- extraOptions + ("path" -> path.get)
- }
+ val optionsWithPath = getOptionsWithPath(path)
val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
optionsWithPath.originalMap
@@ -416,6 +412,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
}
+ private def getOptionsWithPath(path: Option[String]): CaseInsensitiveMap[String] = {
+ if (path.isEmpty) {
+ extraOptions
+ } else {
+ extraOptions + ("path" -> path.get)
+ }
+ }
+
private def saveToV1Source(path: Option[String]): Unit = {
partitioningColumns.foreach { columns =>
extraOptions = extraOptions + (
@@ -423,11 +427,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
DataSourceUtils.encodePartitioningColumns(columns))
}
- val optionsWithPath = if (path.isEmpty) {
- extraOptions
- } else {
- extraOptions + ("path" -> path.get)
- }
+ val optionsWithPath = getOptionsWithPath(path)
// Code path for data source v1.
runCommand(df.sparkSession, "save") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org