You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/01/09 17:54:51 UTC
[spark] branch master updated: [SPARK-25484][SQL][TEST] Refactor
ExternalAppendOnlyUnsafeRowArrayBenchmark
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 49c062b [SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark
49c062b is described below
commit 49c062b2e0487b13b732b18edde105e1f000c20d
Author: Peter Toth <pe...@gmail.com>
AuthorDate: Wed Jan 9 09:54:21 2019 -0800
[SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark
## What changes were proposed in this pull request?
Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark to extend `BenchmarkBase` and use its main method.
## How was this patch tested?
Manually tested and regenerated results.
Please note that the `spark.memory.debugFill` setting has a huge impact on this benchmark. Since it is set to true by default when running the benchmark from SBT, we need to disable it:
```
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions in Test += \"-Dspark.memory.debugFill=false\";test:runMain org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark"
```
Closes #22617 from peter-toth/SPARK-25484.
Lead-authored-by: Peter Toth <pe...@gmail.com>
Co-authored-by: Peter Toth <pt...@hortonworks.com>
Co-authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
...alAppendOnlyUnsafeRowArrayBenchmark-results.txt | 45 ++++++
...ExternalAppendOnlyUnsafeRowArrayBenchmark.scala | 158 ++++++++-------------
2 files changed, 105 insertions(+), 98 deletions(-)
diff --git a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
new file mode 100644
index 0000000..02c6b72
--- /dev/null
+++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
@@ -0,0 +1,45 @@
+================================================================================================
+WITHOUT SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer 6378 / 6550 16.1 62.3 1.0X
+ExternalAppendOnlyUnsafeRowArray 6196 / 6242 16.5 60.5 1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer 11988 / 12027 21.9 45.7 1.0X
+ExternalAppendOnlyUnsafeRowArray 37480 / 37574 7.0 143.0 0.3X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer 23536 / 23538 20.9 47.9 1.0X
+ExternalAppendOnlyUnsafeRowArray 31275 / 31277 15.7 63.6 0.8X
+
+
+================================================================================================
+WITH SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter 29241 / 29279 9.0 111.5 1.0X
+ExternalAppendOnlyUnsafeRowArray 14309 / 14313 18.3 54.6 2.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter 11 / 11 14.8 67.4 1.0X
+ExternalAppendOnlyUnsafeRowArray 9 / 9 17.6 56.8 1.2X
+
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 611b2fc..e174dc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
-object ExternalAppendOnlyUnsafeRowArrayBenchmark {
+/**
+ * Benchmark ExternalAppendOnlyUnsafeRowArray.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
+ * 2. build/sbt ";project sql;set javaOptions
+ * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
+ * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions
+ * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this class>"
+ * Results will be written to
+ * "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
+ * }}}
+ */
+object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
- def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
+ private val conf = new SparkConf(false)
+ // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
+ // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
+ .set("spark.serializer.objectStreamReset", "1")
+ .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+
+ private def withFakeTaskContext(f: => Unit): Unit = {
+ val sc = new SparkContext("local", "test", conf)
+ val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+ TaskContext.setTaskContext(taskContext)
+ f
+ sc.stop()
+ }
+
+ private def testRows(numRows: Int): Seq[UnsafeRow] = {
val random = new java.util.Random()
- val rows = (1 to numRows).map(_ => {
+ (1 to numRows).map(_ => {
val row = new UnsafeRow(1)
row.pointTo(new Array[Byte](64), 16)
row.setLong(0, random.nextLong())
row
})
+ }
- val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows)
+ def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, iterations: Int): Unit = {
+ val rows = testRows(numRows)
+
+ val benchmark = new Benchmark(s"Array with $numRows rows", iterations * numRows,
+ output = output)
// Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
// in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
}
}
- val conf = new SparkConf(false)
- // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
- // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
- conf.set("spark.serializer.objectStreamReset", "1")
- conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
- val sc = new SparkContext("local", "test", conf)
- val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
- TaskContext.setTaskContext(taskContext)
- benchmark.run()
- sc.stop()
+ withFakeTaskContext {
+ benchmark.run()
+ }
}
def testAgainstRawUnsafeExternalSorter(
numSpillThreshold: Int,
numRows: Int,
iterations: Int): Unit = {
+ val rows = testRows(numRows)
- val random = new java.util.Random()
- val rows = (1 to numRows).map(_ => {
- val row = new UnsafeRow(1)
- row.pointTo(new Array[Byte](64), 16)
- row.setLong(0, random.nextLong())
- row
- })
-
- val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows)
+ val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * numRows,
+ output = output)
benchmark.addCase("UnsafeExternalSorter") { _: Int =>
var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
}
}
- val conf = new SparkConf(false)
- // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
- // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
- conf.set("spark.serializer.objectStreamReset", "1")
- conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
- val sc = new SparkContext("local", "test", conf)
- val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
- TaskContext.setTaskContext(taskContext)
- benchmark.run()
- sc.stop()
+ withFakeTaskContext {
+ benchmark.run()
+ }
}
- def main(args: Array[String]): Unit = {
-
- // ========================================================================================= //
- // WITHOUT SPILL
- // ========================================================================================= //
-
- val spillThreshold = 100 * 1000
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- ------------------------------------------------------------------------------------------------
- ArrayBuffer 7821 / 7941 33.5 29.8 1.0X
- ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8 33.6 0.9X
- */
- testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- ------------------------------------------------------------------------------------------------
- ArrayBuffer 19200 / 19206 25.6 39.1 1.0X
- ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1 39.8 1.0X
- */
- testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- ------------------------------------------------------------------------------------------------
- ArrayBuffer 5949 / 6028 17.2 58.1 1.0X
- ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8 59.4 1.0X
- */
- testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
-
- // ========================================================================================= //
- // WITH SPILL
- // ========================================================================================= //
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- ------------------------------------------------------------------------------------------------
- UnsafeExternalSorter 9239 / 9470 28.4 35.2 1.0X
- ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6 33.8 1.0X
- */
- testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- ------------------------------------------------------------------------------------------------
- UnsafeExternalSorter 4 / 5 39.3 25.5 1.0X
- ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8 33.5 0.8X
- */
- testAgainstRawUnsafeExternalSorter(
- config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ runBenchmark("WITHOUT SPILL") {
+ val spillThreshold = 100 * 1000
+ testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
+ testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
+ testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
+ }
+
+ runBenchmark("WITH SPILL") {
+ testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
+ testAgainstRawUnsafeExternalSorter(
+ config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 1000, 1 << 4)
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org