Posted to commits@spark.apache.org by sr...@apache.org on 2022/09/29 12:37:39 UTC

[spark] branch master updated: [SPARK-39146][CORE][SQL] Introduce local singleton for `ObjectMapper` that may be reused

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 94407429427 [SPARK-39146][CORE][SQL] Introduce local singleton for `ObjectMapper` that may be reused
94407429427 is described below

commit 944074294277849f8bb920e8c368ef837c364fb1
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Thu Sep 29 07:37:21 2022 -0500

    [SPARK-39146][CORE][SQL] Introduce local singleton for `ObjectMapper` that may be reused
    
    ### What changes were proposed in this pull request?
    This PR introduces local singletons for Jackson `ObjectMapper` instances that may be reused in Spark code, to reduce the cost of repeatedly creating `ObjectMapper`.
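    
    A minimal sketch of the pattern (illustrative only; `JsonUtil` and `toJson` are hypothetical names, not code from this PR): the mapper is hoisted out of the frequently-called method into a private singleton and reused across calls.
    
    ```scala
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    
    object JsonUtil {
      // Built once. A fully configured ObjectMapper is thread-safe,
      // so one shared instance can serve all callers.
      private val mapper: ObjectMapper =
        new ObjectMapper().registerModule(DefaultScalaModule)
    
      // Before this change the mapper would be rebuilt on every call:
      //   val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
      def toJson(paths: Array[String]): String =
        mapper.writeValueAsString(paths)
    }
    ```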
    
    ### Why are the changes needed?
    Minor performance improvement: constructing a Jackson `ObjectMapper` is relatively expensive, so code paths that previously built a new one per call now pay that cost only once.
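    
    As a rough, hypothetical micro-measurement (not from this PR; `MapperCost` and its timing harness are illustrative), per-call construction can be compared against reuse like this:
    
    ```scala
    import com.fasterxml.jackson.databind.ObjectMapper
    
    object MapperCost {
      private def time(label: String)(body: => Unit): Unit = {
        val start = System.nanoTime()
        body
        println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
      }
    
      def main(args: Array[String]): Unit = {
        val shared = new ObjectMapper()
        // Constructing a new mapper for every serialization...
        time("new mapper per call") {
          (1 to 10000).foreach(_ => new ObjectMapper().writeValueAsString(Array("a", "b")))
        }
        // ...versus reusing a single shared instance.
        time("shared mapper") {
          (1 to 10000).foreach(_ => shared.writeValueAsString(Array("a", "b")))
        }
      }
    }
    ```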
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passes GitHub Actions.
    
    Closes #37999 from LuciferYang/SPARK-39146-2.
    
    Authored-by: yangjie01 <ya...@baidu.com>
    Signed-off-by: Sean Owen <sr...@gmail.com>
---
 .../org/apache/spark/ErrorClassesJSONReader.scala     | 19 +++++++++++--------
 .../spark/sql/catalyst/util/RebaseDateTime.scala      |  8 ++++++--
 .../execution/datasources/v2/DataSourceV2Utils.scala  |  2 +-
 .../execution/datasources/v2/FileDataSourceV2.scala   | 10 ++++++++--
 4 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala b/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
index 9d6dd9dde07..e06fd1711d8 100644
--- a/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
+++ b/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
@@ -39,15 +39,9 @@ import org.apache.spark.annotation.DeveloperApi
 class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
   assert(jsonFileURLs.nonEmpty)
 
-  private def readAsMap(url: URL): SortedMap[String, ErrorInfo] = {
-    val mapper: JsonMapper = JsonMapper.builder()
-      .addModule(DefaultScalaModule)
-      .build()
-    mapper.readValue(url, new TypeReference[SortedMap[String, ErrorInfo]]() {})
-  }
-
   // Exposed for testing
-  private[spark] val errorInfoMap = jsonFileURLs.map(readAsMap).reduce(_ ++ _)
+  private[spark] val errorInfoMap =
+    jsonFileURLs.map(ErrorClassesJsonReader.readAsMap).reduce(_ ++ _)
 
   def getErrorMessage(errorClass: String, messageParameters: Map[String, String]): String = {
     val messageTemplate = getMessageTemplate(errorClass)
@@ -88,6 +82,15 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
   }
 }
 
+private object ErrorClassesJsonReader {
+  private val mapper: JsonMapper = JsonMapper.builder()
+    .addModule(DefaultScalaModule)
+    .build()
+  private def readAsMap(url: URL): SortedMap[String, ErrorInfo] = {
+    mapper.readValue(url, new TypeReference[SortedMap[String, ErrorInfo]]() {})
+  }
+}
+
 /**
  * Information associated with an error class.
  *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
index dc1c4dbe677..a2a63e2af42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
@@ -268,13 +268,17 @@ object RebaseDateTime {
     micros + rebaseInfo.diffs(i)
   }
 
+  private lazy val mapper = {
+    val mapper = new ObjectMapper() with ClassTagExtensions
+    mapper.registerModule(DefaultScalaModule)
+    mapper
+  }
+
   // Loads rebasing info from a JSON file. JSON records in the files should conform to
   // `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because
   // it is 2 times faster in DateTimeRebaseBenchmark.
   private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = {
     val file = Utils.getSparkClassLoader.getResource(fileName)
-    val mapper = new ObjectMapper() with ClassTagExtensions
-    mapper.registerModule(DefaultScalaModule)
     val jsonRebaseRecords = mapper.readValue[Seq[JsonRebaseRecord]](file)
     val anyRefMap = new AnyRefMap[String, RebaseInfo]((3 * jsonRebaseRecords.size) / 2)
     jsonRebaseRecords.foreach { jsonRecord =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index 7fd61c44fd1..f1d1cc5a173 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -150,6 +150,7 @@ private[sql] object DataSourceV2Utils extends Logging {
     }
   }
 
+  private lazy val objectMapper = new ObjectMapper()
   private def getOptionsWithPaths(
       extraOptions: CaseInsensitiveMap[String],
       paths: String*): CaseInsensitiveMap[String] = {
@@ -158,7 +159,6 @@ private[sql] object DataSourceV2Utils extends Logging {
     } else if (paths.length == 1) {
       extraOptions + ("path" -> paths.head)
     } else {
-      val objectMapper = new ObjectMapper()
       extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 08635d51172..0bd25064e35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -21,6 +21,7 @@ import java.util
 import scala.collection.JavaConverters._
 
 import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -50,9 +51,8 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   lazy val sparkSession = SparkSession.active
 
   protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
-    val objectMapper = new ObjectMapper()
     val paths = Option(map.get("paths")).map { pathStr =>
-      objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
+      FileDataSourceV2.readPathsToSeq(pathStr)
     }.getOrElse(Seq.empty)
     paths ++ Option(map.get("path")).toSeq
   }
@@ -113,3 +113,9 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
     }
   }
 }
+
+private object FileDataSourceV2 {
+  private lazy val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+  private def readPathsToSeq(paths: String): Seq[String] =
+    objectMapper.readValue(paths, classOf[Seq[String]])
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org