Posted to commits@spark.apache.org by do...@apache.org on 2021/02/21 01:58:39 UTC

[spark] branch master updated: [SPARK-34481][SQL] Refactor dataframe reader/writer optionsWithPath logic

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7de49a8  [SPARK-34481][SQL] Refactor dataframe reader/writer optionsWithPath logic
7de49a8 is described below

commit 7de49a8fc0c47fb4d2ce44e3ebe2978e002d9699
Author: Yuchen Huo <yu...@databricks.com>
AuthorDate: Sat Feb 20 17:57:43 2021 -0800

    [SPARK-34481][SQL] Refactor dataframe reader/writer optionsWithPath logic
    
    ### What changes were proposed in this pull request?
    
    Extract the optionsWithPath logic into its own helper function in DataFrameReader and DataFrameWriter.
    
    ### Why are the changes needed?
    
    Reduce the code duplication and improve modularity.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    This is a pure refactoring; existing tests cover the change.
    
    Closes #31599 from yuchenhuo/SPARK-34481.
    
    Authored-by: Yuchen Huo <yu...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/sql/DataFrameReader.scala | 20 ++++++++++++--------
 .../scala/org/apache/spark/sql/DataFrameWriter.scala | 20 ++++++++++----------
 2 files changed, 22 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 1a19542..195f4f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -268,14 +268,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = sparkSession.sessionState.conf)
 
-      val optionsWithPath = if (paths.isEmpty) {
-        extraOptions
-      } else if (paths.length == 1) {
-        extraOptions + ("path" -> paths.head)
-      } else {
-        val objectMapper = new ObjectMapper()
-        extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
-      }
+      val optionsWithPath = getOptionsWithPaths(paths: _*)
 
       val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
         optionsWithPath.originalMap
@@ -308,6 +301,17 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }.getOrElse(loadV1Source(paths: _*))
   }
 
+  private def getOptionsWithPaths(paths: String*): CaseInsensitiveMap[String] = {
+    if (paths.isEmpty) {
+      extraOptions
+    } else if (paths.length == 1) {
+      extraOptions + ("path" -> paths.head)
+    } else {
+      val objectMapper = new ObjectMapper()
+      extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
+    }
+  }
+
   private def loadV1Source(paths: String*) = {
     val legacyPathOptionBehavior = sparkSession.sessionState.conf.legacyPathOptionBehavior
     val (finalPaths, finalOptions) = if (!legacyPathOptionBehavior && paths.length == 1) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1dba17b..fe6572cf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -314,11 +314,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, df.sparkSession.sessionState.conf)
 
-      val optionsWithPath = if (path.isEmpty) {
-        extraOptions
-      } else {
-        extraOptions + ("path" -> path.get)
-      }
+      val optionsWithPath = getOptionsWithPath(path)
 
       val finalOptions = sessionOptions.filterKeys(!optionsWithPath.contains(_)).toMap ++
         optionsWithPath.originalMap
@@ -416,6 +412,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }
   }
 
+  private def getOptionsWithPath(path: Option[String]): CaseInsensitiveMap[String] = {
+    if (path.isEmpty) {
+      extraOptions
+    } else {
+      extraOptions + ("path" -> path.get)
+    }
+  }
+
   private def saveToV1Source(path: Option[String]): Unit = {
     partitioningColumns.foreach { columns =>
       extraOptions = extraOptions + (
@@ -423,11 +427,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         DataSourceUtils.encodePartitioningColumns(columns))
     }
 
-    val optionsWithPath = if (path.isEmpty) {
-      extraOptions
-    } else {
-      extraOptions + ("path" -> path.get)
-    }
+    val optionsWithPath = getOptionsWithPath(path)
 
     // Code path for data source v1.
     runCommand(df.sparkSession, "save") {
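
For readers skimming the diff, here is a minimal, self-contained sketch of what the extracted reader-side helper does. This is illustrative only: the enclosing object and the plain Map are hypothetical stand-ins, whereas the real extraOptions field is a CaseInsensitiveMap[String] maintained by option()/options() calls, and it assumes jackson-databind is on the classpath (as it is in Spark). A single path is stored under the "path" option, while multiple paths are JSON-serialized into a "paths" option; the writer-side getOptionsWithPath(path: Option[String]) applies the same idea to an optional single path.

import com.fasterxml.jackson.databind.ObjectMapper

object OptionsWithPathsSketch {
  // Stand-in for DataFrameReader.extraOptions; the real field is a
  // CaseInsensitiveMap[String] built up by option()/options() calls.
  val extraOptions: Map[String, String] = Map("header" -> "true")

  // Mirrors the extracted getOptionsWithPaths helper: zero paths leave the
  // options untouched, one path becomes a "path" entry, and several paths
  // are serialized to a JSON array under "paths".
  def getOptionsWithPaths(paths: String*): Map[String, String] = {
    if (paths.isEmpty) {
      extraOptions
    } else if (paths.length == 1) {
      extraOptions + ("path" -> paths.head)
    } else {
      val objectMapper = new ObjectMapper()
      extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
    }
  }

  def main(args: Array[String]): Unit = {
    println(getOptionsWithPaths())
    // Map(header -> true)
    println(getOptionsWithPaths("/data/a.csv"))
    // Map(header -> true, path -> /data/a.csv)
    println(getOptionsWithPaths("/data/a.csv", "/data/b.csv"))
    // Map(header -> true, paths -> ["/data/a.csv","/data/b.csv"])
  }
}

Before this commit, DataFrameReader.load inlined this branching once and DataFrameWriter inlined the Option-based variant twice (in the V2 and V1 save paths); after it, all three call sites delegate to the shared helpers shown in the diff above.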


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org