You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/01/09 17:54:51 UTC

[spark] branch master updated: [SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 49c062b  [SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark
49c062b is described below

commit 49c062b2e0487b13b732b18edde105e1f000c20d
Author: Peter Toth <pe...@gmail.com>
AuthorDate: Wed Jan 9 09:54:21 2019 -0800

    [SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark
    
    ## What changes were proposed in this pull request?
    
    Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark to use main method.
    
    ## How was this patch tested?
    
    Manually tested and regenerated results.
    Please note that `spark.memory.debugFill` setting has a huge impact on this benchmark. Since it is set to true by default when running the benchmark from SBT, we need to disable it:
    ```
    SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions in Test += \"-Dspark.memory.debugFill=false\";test:runMain org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark"
    ```
    
    Closes #22617 from peter-toth/SPARK-25484.
    
    Lead-authored-by: Peter Toth <pe...@gmail.com>
    Co-authored-by: Peter Toth <pt...@hortonworks.com>
    Co-authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 ...alAppendOnlyUnsafeRowArrayBenchmark-results.txt |  45 ++++++
 ...ExternalAppendOnlyUnsafeRowArrayBenchmark.scala | 158 ++++++++-------------
 2 files changed, 105 insertions(+), 98 deletions(-)

diff --git a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
new file mode 100644
index 0000000..02c6b72
--- /dev/null
+++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
@@ -0,0 +1,45 @@
+================================================================================================
+WITHOUT SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                   6378 / 6550         16.1          62.3       1.0X
+ExternalAppendOnlyUnsafeRowArray              6196 / 6242         16.5          60.5       1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                 11988 / 12027         21.9          45.7       1.0X
+ExternalAppendOnlyUnsafeRowArray            37480 / 37574          7.0         143.0       0.3X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                 23536 / 23538         20.9          47.9       1.0X
+ExternalAppendOnlyUnsafeRowArray            31275 / 31277         15.7          63.6       0.8X
+
+
+================================================================================================
+WITH SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter                        29241 / 29279          9.0         111.5       1.0X
+ExternalAppendOnlyUnsafeRowArray            14309 / 14313         18.3          54.6       2.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter                            11 /   11         14.8          67.4       1.0X
+ExternalAppendOnlyUnsafeRowArray                 9 /    9         17.6          56.8       1.2X
+
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 611b2fc..e174dc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
-object ExternalAppendOnlyUnsafeRowArrayBenchmark {
+/**
+ * Benchmark ExternalAppendOnlyUnsafeRowArray.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
+ *   2. build/sbt ";project sql;set javaOptions
+ *        in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions
+ *        in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
+ *      Results will be written to
+ *      "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
+ * }}}
+ */
+object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
 
-  def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
+  private val conf = new SparkConf(false)
+    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+    .set("spark.serializer.objectStreamReset", "1")
+    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+
+  private def withFakeTaskContext(f: => Unit): Unit = {
+    val sc = new SparkContext("local", "test", conf)
+    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+    TaskContext.setTaskContext(taskContext)
+    f
+    sc.stop()
+  }
+
+  private def testRows(numRows: Int): Seq[UnsafeRow] = {
     val random = new java.util.Random()
-    val rows = (1 to numRows).map(_ => {
+    (1 to numRows).map(_ => {
       val row = new UnsafeRow(1)
       row.pointTo(new Array[Byte](64), 16)
       row.setLong(0, random.nextLong())
       row
     })
+  }
 
-    val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows)
+  def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
+    val rows = testRows(numRows)
+
+    val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows,
+      output = output)
 
     // Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
     // in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
       }
     }
 
-    val conf = new SparkConf(false)
-    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
-    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
-    val sc = new SparkContext("local", "test", conf)
-    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
-    TaskContext.setTaskContext(taskContext)
-    benchmark.run()
-    sc.stop()
+    withFakeTaskContext {
+      benchmark.run()
+    }
   }
 
   def testAgainstRawUnsafeExternalSorter(
       numSpillThreshold: Int,
       numRows: Int,
       iterations: Int): Unit = {
+    val rows = testRows(numRows)
 
-    val random = new java.util.Random()
-    val rows = (1 to numRows).map(_ => {
-      val row = new UnsafeRow(1)
-      row.pointTo(new Array[Byte](64), 16)
-      row.setLong(0, random.nextLong())
-      row
-    })
-
-    val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows)
+    val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows,
+      output = output)
 
     benchmark.addCase("UnsafeExternalSorter") { _: Int =>
       var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
       }
     }
 
-    val conf = new SparkConf(false)
-    // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
-    // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
-    val sc = new SparkContext("local", "test", conf)
-    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
-    TaskContext.setTaskContext(taskContext)
-    benchmark.run()
-    sc.stop()
+    withFakeTaskContext {
+      benchmark.run()
+    }
   }
 
-  def main(args: Array[String]): Unit = {
-
-    // ========================================================================================= //
-    // WITHOUT SPILL
-    // ========================================================================================= //
-
-    val spillThreshold = 100 * 1000
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    ArrayBuffer                                   7821 / 7941         33.5          29.8       1.0X
-    ExternalAppendOnlyUnsafeRowArray              8798 / 8819         29.8          33.6       0.9X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    ArrayBuffer                                 19200 / 19206         25.6          39.1       1.0X
-    ExternalAppendOnlyUnsafeRowArray            19558 / 19562         25.1          39.8       1.0X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    ArrayBuffer                                   5949 / 6028         17.2          58.1       1.0X
-    ExternalAppendOnlyUnsafeRowArray              6078 / 6138         16.8          59.4       1.0X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
-
-    // ========================================================================================= //
-    // WITH SPILL
-    // ========================================================================================= //
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    UnsafeExternalSorter                          9239 / 9470         28.4          35.2       1.0X
-    ExternalAppendOnlyUnsafeRowArray              8857 / 8909         29.6          33.8       1.0X
-    */
-    testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    ------------------------------------------------------------------------------------------------
-    UnsafeExternalSorter                             4 /    5         39.3          25.5       1.0X
-    ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8          33.5       0.8X
-    */
-    testAgainstRawUnsafeExternalSorter(
-      config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("WITHOUT SPILL") {
+      val spillThreshold = 100 * 1000
+      testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
+      testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
+      testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
+    }
+
+    runBenchmark("WITH SPILL") {
+      testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
+      testAgainstRawUnsafeExternalSorter(
+        config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org