You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2022/09/29 12:37:39 UTC
[spark] branch master updated: [SPARK-39146][CORE][SQL] Introduce local singleton for `ObjectMapper` that may be reused
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 94407429427 [SPARK-39146][CORE][SQL] Introduce local singleton for `ObjectMapper` that may be reused
94407429427 is described below
commit 944074294277849f8bb920e8c368ef837c364fb1
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Thu Sep 29 07:37:21 2022 -0500
[SPARK-39146][CORE][SQL] Introduce local singleton for `ObjectMapper` that may be reused
### What changes were proposed in this pull request?
This PR introduces local singletons for the Jackson `ObjectMapper` instances that may be reused in Spark code, reducing the cost of repeatedly creating an `ObjectMapper`.
### Why are the changes needed?
Minor performance improvement.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass GitHub Actions
Closes #37999 from LuciferYang/SPARK-39146-2.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Sean Owen <sr...@gmail.com>
---
.../org/apache/spark/ErrorClassesJSONReader.scala | 19 +++++++++++--------
.../spark/sql/catalyst/util/RebaseDateTime.scala | 8 ++++++--
.../execution/datasources/v2/DataSourceV2Utils.scala | 2 +-
.../execution/datasources/v2/FileDataSourceV2.scala | 10 ++++++++--
4 files changed, 26 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala b/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
index 9d6dd9dde07..e06fd1711d8 100644
--- a/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
+++ b/core/src/main/scala/org/apache/spark/ErrorClassesJSONReader.scala
@@ -39,15 +39,9 @@ import org.apache.spark.annotation.DeveloperApi
class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
assert(jsonFileURLs.nonEmpty)
- private def readAsMap(url: URL): SortedMap[String, ErrorInfo] = {
- val mapper: JsonMapper = JsonMapper.builder()
- .addModule(DefaultScalaModule)
- .build()
- mapper.readValue(url, new TypeReference[SortedMap[String, ErrorInfo]]() {})
- }
-
// Exposed for testing
- private[spark] val errorInfoMap = jsonFileURLs.map(readAsMap).reduce(_ ++ _)
+ private[spark] val errorInfoMap =
+ jsonFileURLs.map(ErrorClassesJsonReader.readAsMap).reduce(_ ++ _)
def getErrorMessage(errorClass: String, messageParameters: Map[String, String]): String = {
val messageTemplate = getMessageTemplate(errorClass)
@@ -88,6 +82,15 @@ class ErrorClassesJsonReader(jsonFileURLs: Seq[URL]) {
}
}
+private object ErrorClassesJsonReader {
+ private val mapper: JsonMapper = JsonMapper.builder()
+ .addModule(DefaultScalaModule)
+ .build()
+ private def readAsMap(url: URL): SortedMap[String, ErrorInfo] = {
+ mapper.readValue(url, new TypeReference[SortedMap[String, ErrorInfo]]() {})
+ }
+}
+
/**
* Information associated with an error class.
*
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
index dc1c4dbe677..a2a63e2af42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/RebaseDateTime.scala
@@ -268,13 +268,17 @@ object RebaseDateTime {
micros + rebaseInfo.diffs(i)
}
+ private lazy val mapper = {
+ val mapper = new ObjectMapper() with ClassTagExtensions
+ mapper.registerModule(DefaultScalaModule)
+ mapper
+ }
+
// Loads rebasing info from an JSON file. JSON records in the files should conform to
// `JsonRebaseRecord`. AnyRefMap is used here instead of Scala's immutable map because
// it is 2 times faster in DateTimeRebaseBenchmark.
private[sql] def loadRebaseRecords(fileName: String): AnyRefMap[String, RebaseInfo] = {
val file = Utils.getSparkClassLoader.getResource(fileName)
- val mapper = new ObjectMapper() with ClassTagExtensions
- mapper.registerModule(DefaultScalaModule)
val jsonRebaseRecords = mapper.readValue[Seq[JsonRebaseRecord]](file)
val anyRefMap = new AnyRefMap[String, RebaseInfo]((3 * jsonRebaseRecords.size) / 2)
jsonRebaseRecords.foreach { jsonRecord =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index 7fd61c44fd1..f1d1cc5a173 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -150,6 +150,7 @@ private[sql] object DataSourceV2Utils extends Logging {
}
}
+ private lazy val objectMapper = new ObjectMapper()
private def getOptionsWithPaths(
extraOptions: CaseInsensitiveMap[String],
paths: String*): CaseInsensitiveMap[String] = {
@@ -158,7 +159,6 @@ private[sql] object DataSourceV2Utils extends Logging {
} else if (paths.length == 1) {
extraOptions + ("path" -> paths.head)
} else {
- val objectMapper = new ObjectMapper()
extraOptions + ("paths" -> objectMapper.writeValueAsString(paths.toArray))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 08635d51172..0bd25064e35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -21,6 +21,7 @@ import java.util
import scala.collection.JavaConverters._
import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -50,9 +51,8 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
lazy val sparkSession = SparkSession.active
protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
- val objectMapper = new ObjectMapper()
val paths = Option(map.get("paths")).map { pathStr =>
- objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
+ FileDataSourceV2.readPathsToSeq(pathStr)
}.getOrElse(Seq.empty)
paths ++ Option(map.get("path")).toSeq
}
@@ -113,3 +113,9 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
}
}
}
+
+private object FileDataSourceV2 {
+ private lazy val objectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
+ private def readPathsToSeq(paths: String): Seq[String] =
+ objectMapper.readValue(paths, classOf[Seq[String]])
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org