Posted to commits@spark.apache.org by do...@apache.org on 2019/09/15 07:28:52 UTC

[spark] branch master updated: [SPARK-29081][CORE] Replace calls to SerializationUtils.clone on properties with a faster implementation

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c0e961  [SPARK-29081][CORE] Replace calls to SerializationUtils.clone on properties with a faster implementation
8c0e961 is described below

commit 8c0e961f6c5fe0da3b36a6fe642c12f88ac34d0f
Author: David Lewis <da...@databricks.com>
AuthorDate: Sun Sep 15 00:28:32 2019 -0700

    [SPARK-29081][CORE] Replace calls to SerializationUtils.clone on properties with a faster implementation
    
    Replace uses of `SerializationUtils.clone` with a new `Utils.cloneProperties` method.
    Add a benchmark and results showing a dramatic speed-up for effectively equivalent functionality.
    
    ### What changes were proposed in this pull request?
    While I am not sure that `SerializationUtils.clone` is a performance issue in production, I am sure that it is overkill for the task it is doing (providing a distinct copy of a `Properties` object).
    This PR provides a benchmark showing the dramatic improvement over the clone operation and replaces uses of `SerializationUtils.clone` on `Properties` with the more specialized `Utils.cloneProperties`.
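    
    In essence (a minimal sketch mirroring the `Utils.cloneProperties` added in this patch; the `props` value is illustrative):
    
        import java.util.Properties
        import org.apache.commons.lang3.SerializationUtils
    
        val props = new Properties()
        props.setProperty("spark.job.description", "example")
    
        // Before: a full Java-serialization round trip (deep copy).
        val slowCopy: Properties = SerializationUtils.clone(props)
    
        // After: copy the key/value references directly. Supported keys and
        // values are immutable Strings, so the shallow copy is an equivalent,
        // distinct Properties object.
        val fastCopy = new Properties()
        props.forEach((k, v) => fastCopy.put(k, v))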
    
    ### Does this PR introduce any user-facing change?
    Strings are immutable, so there is no reason to serialize and deserialize them; doing so just creates extra garbage.
    The only functionality that changes is the (unsupported) insertion of non-String objects into the Spark local properties, as illustrated below.
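    
    For illustration (a hypothetical sketch; Spark only supports String keys and values in local properties):
    
        import java.util.Properties
    
        val p = new Properties()
        // Properties extends Hashtable[Object, Object], so this compiles,
        // but non-String values are unsupported in Spark local properties.
        p.put("list", new java.util.ArrayList[String]())
    
        // SerializationUtils.clone would deep-copy the ArrayList, whereas
        // Utils.cloneProperties copies only the reference.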
    
    ### How was this patch tested?
    
    1. Pass the Jenkins with the existing tests.
    2. Since this is a performance improvement PR, manually run the benchmark (see the commands below).
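    
    Assuming the class name from this patch, the benchmark can be run with:
    
        build/sbt "core/test:runMain org.apache.spark.util.PropertiesCloneBenchmark"
    
    and the committed results file regenerated with:
    
        SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain org.apache.spark.util.PropertiesCloneBenchmark"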
    
    Closes #25787 from databricks-david-lewis/SPARK-29081.
    
    Authored-by: David Lewis <da...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../PropertiesCloneBenchmark-results.txt           | 40 ++++++++++++
 .../main/scala/org/apache/spark/SparkContext.scala |  3 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 10 ++-
 .../main/scala/org/apache/spark/util/Utils.scala   |  7 ++
 .../org/apache/spark/benchmark/Benchmark.scala     |  4 +-
 .../spark/util/PropertiesCloneBenchmark.scala      | 74 ++++++++++++++++++++++
 .../apache/spark/util/ResetSystemProperties.scala  |  5 +-
 .../apache/spark/streaming/StreamingContext.scala  |  3 +-
 .../spark/streaming/scheduler/JobScheduler.scala   |  6 +-
 9 files changed, 134 insertions(+), 18 deletions(-)

diff --git a/core/benchmarks/PropertiesCloneBenchmark-results.txt b/core/benchmarks/PropertiesCloneBenchmark-results.txt
new file mode 100644
index 0000000..00c9561
--- /dev/null
+++ b/core/benchmarks/PropertiesCloneBenchmark-results.txt
@@ -0,0 +1,40 @@
+================================================================================================
+Properties Cloning
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+Empty Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              0              0           0          0.2        4184.0       1.0X
+Utils.cloneProperties                                 0              0           0         55.6          18.0     232.4X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+System Properties:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              0              0           0          0.0      107612.0       1.0X
+Utils.cloneProperties                                 0              0           0          1.0         962.0     111.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+Small Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              0              0           0          0.0      330210.0       1.0X
+Utils.cloneProperties                                 0              0           0          0.9        1082.0     305.2X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+Medium Properties:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              1              2           0          0.0     1336301.0       1.0X
+Utils.cloneProperties                                 0              0           0          0.2        5456.0     244.9X
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_131-b11 on Mac OS X 10.14.6
+Intel(R) Core(TM) i9-8950HK CPU @ 2.90GHz
+Large Properties:                         Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+SerializationUtils.clone                              3              3           0          0.0     2634336.0       1.0X
+Utils.cloneProperties                                 0              0           0          0.1       10822.0     243.4X
+
+
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 396d712..44c59e2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,7 +31,6 @@ import scala.reflect.{classTag, ClassTag}
 import scala.util.control.NonFatal
 
 import com.google.common.collect.MapMaker
-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -346,7 +345,7 @@ class SparkContext(config: SparkConf) extends Logging {
     override protected def childValue(parent: Properties): Properties = {
       // Note: make a clone such that changes in the parent properties aren't reflected in
       // those of the children threads, which has confusing semantics (SPARK-10563).
-      SerializationUtils.clone(parent)
+      Utils.cloneProperties(parent)
     }
     override protected def initialValue(): Properties = new Properties()
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 9df5945..894234f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,8 +29,6 @@ import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
@@ -698,7 +696,7 @@ private[spark] class DAGScheduler(
     if (partitions.isEmpty) {
       val time = clock.getTimeMillis()
       listenerBus.post(
-        SparkListenerJobStart(jobId, time, Seq[StageInfo](), SerializationUtils.clone(properties)))
+        SparkListenerJobStart(jobId, time, Seq[StageInfo](), Utils.cloneProperties(properties)))
       listenerBus.post(
         SparkListenerJobEnd(jobId, time, JobSucceeded))
       // Return immediately if the job is running 0 tasks
@@ -710,7 +708,7 @@ private[spark] class DAGScheduler(
     val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, partitions.toArray, callSite, waiter,
-      SerializationUtils.clone(properties)))
+      Utils.cloneProperties(properties)))
     waiter
   }
 
@@ -782,7 +780,7 @@ private[spark] class DAGScheduler(
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     eventProcessLoop.post(JobSubmitted(
       jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
-      SerializationUtils.clone(properties)))
+      Utils.cloneProperties(properties)))
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
@@ -819,7 +817,7 @@ private[spark] class DAGScheduler(
       this, jobId, 1,
       (_: Int, r: MapOutputStatistics) => callback(r))
     eventProcessLoop.post(MapStageSubmitted(
-      jobId, dependency, callSite, waiter, SerializationUtils.clone(properties)))
+      jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
     waiter
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 5cd937a..c47a23e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2950,6 +2950,13 @@ private[spark] object Utils extends Logging {
     val codec = codecFactory.getCodec(path)
     codec == null || codec.isInstanceOf[SplittableCompressionCodec]
   }
+
+  /** Create a new properties object with the same values as `props` */
+  def cloneProperties(props: Properties): Properties = {
+    val resultProps = new Properties()
+    props.forEach((k, v) => resultProps.put(k, v))
+    resultProps
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
index 73f9d0e..022fcbb 100644
--- a/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
+++ b/core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -141,12 +141,14 @@ private[spark] class Benchmark(
     val minIters = if (overrideNumIters != 0) overrideNumIters else minNumIters
     val minDuration = if (overrideNumIters != 0) 0 else minTime.toNanos
     val runTimes = ArrayBuffer[Long]()
+    var totalTime = 0L
     var i = 0
-    while (i < minIters || runTimes.sum < minDuration) {
+    while (i < minIters || totalTime < minDuration) {
       val timer = new Benchmark.Timer(i)
       f(timer)
       val runTime = timer.totalTime()
       runTimes += runTime
+      totalTime += runTime
 
       if (outputPerIteration) {
         // scalastyle:off
diff --git a/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala b/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala
new file mode 100644
index 0000000..0726886
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.util
+
+import java.util.Properties
+
+import scala.util.Random
+
+import org.apache.commons.lang3.SerializationUtils
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+
+
+/**
+ * Benchmark for cloning Properties objects.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar>
+ *   2. build/sbt "core/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ *      Results will be written to "benchmarks/PropertiesCloneBenchmark-results.txt".
+ * }}}
+ */
+object PropertiesCloneBenchmark extends BenchmarkBase {
+  /**
+   * Benchmark various cases of cloning properties objects
+   */
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("Properties Cloning") {
+      def compareSerialization(name: String, props: Properties): Unit = {
+        val benchmark = new Benchmark(name, 1, output = output)
+        benchmark.addCase("SerializationUtils.clone") { _ =>
+          SerializationUtils.clone(props)
+        }
+        benchmark.addCase("Utils.cloneProperties") { _ =>
+          Utils.cloneProperties(props)
+        }
+        benchmark.run()
+      }
+      compareSerialization("Empty Properties", new Properties)
+      compareSerialization("System Properties", System.getProperties)
+      compareSerialization("Small Properties", makeRandomProps(10, 40, 100))
+      compareSerialization("Medium Properties", makeRandomProps(50, 40, 100))
+      compareSerialization("Large Properties", makeRandomProps(100, 40, 100))
+    }
+  }
+
+  def makeRandomProps(numProperties: Int, keySize: Int, valueSize: Int): Properties = {
+    val props = new Properties
+    for (_ <- 1 to numProperties) {
+      props.put(
+        Random.alphanumeric.take(keySize).mkString,
+        Random.alphanumeric.take(valueSize).mkString
+      )
+    }
+    props
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
index 75e4504..0b17965 100644
--- a/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
+++ b/core/src/test/scala/org/apache/spark/util/ResetSystemProperties.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util
 
 import java.util.Properties
 
-import org.apache.commons.lang3.SerializationUtils
 import org.scalatest.{BeforeAndAfterEach, Suite}
 
 /**
@@ -43,11 +42,11 @@ private[spark] trait ResetSystemProperties extends BeforeAndAfterEach { this: Su
   var oldProperties: Properties = null
 
   override def beforeEach(): Unit = {
-    // we need SerializationUtils.clone instead of `new Properties(System.getProperties())` because
+    // we need Utils.cloneProperties instead of `new Properties(System.getProperties())` because
    // the latter way of creating a copy does not copy the properties; it initializes a new
    // Properties object with the given properties as defaults. Such defaults are not visible
    // at all through the standard Scala wrapper over Java Properties.
-    oldProperties = SerializationUtils.clone(System.getProperties)
+    oldProperties = Utils.cloneProperties(System.getProperties)
     super.beforeEach()
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 589dd87..21ffa78 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.Queue
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
-import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
@@ -586,7 +585,7 @@ class StreamingContext private[streaming] (
               sparkContext.setCallSite(startSite.get)
               sparkContext.clearJobGroup()
               sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
-              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
+              savedProperties.set(Utils.cloneProperties(sparkContext.localProperties.get()))
               scheduler.start()
             }
             state = StreamingContextState.ACTIVE
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 68594e8..2388ca8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -22,8 +22,6 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import scala.collection.JavaConverters._
 import scala.util.Failure
 
-import org.apache.commons.lang3.SerializationUtils
-
 import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.io.SparkHadoopWriterUtils
@@ -31,7 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
-import org.apache.spark.util.{EventLoop, ThreadUtils}
+import org.apache.spark.util.{EventLoop, ThreadUtils, Utils}
 
 
 private[scheduler] sealed trait JobSchedulerEvent
@@ -231,7 +229,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     def run() {
       val oldProps = ssc.sparkContext.getLocalProperties
       try {
-        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
+        ssc.sparkContext.setLocalProperties(Utils.cloneProperties(ssc.savedProperties.get()))
         val formattedTime = UIUtils.formatBatchTime(
           job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
         val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"

