Posted to commits@spark.apache.org by da...@apache.org on 2016/02/04 02:07:36 UTC

spark git commit: [SPARK-13131] [SQL] Use best and average time in benchmark

Repository: spark
Updated Branches:
  refs/heads/master 915a75398 -> de0914522


[SPARK-13131] [SQL] Use best and average time in benchmark

Best time is more stable than average time. This also adds a column for nanoseconds per row, which can be used to estimate the contribution of each component in a query.

Best time and average time are shown together to provide more information (we can get a sense of the variance).

Rate, time per row, and relative speedup are all calculated from the best time.

The result looks like this:
```
Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
rang/filter/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
rang/filter/sum codegen=false          14332 / 16646         36.0          27.8       1.0X
rang/filter/sum codegen=true              845 /  940        620.0           1.6      17.0X
```
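
For reference, a minimal sketch (not part of the patch) of how the printed columns derive from the raw timings, mirroring the formulas in the updated Benchmark.scala below; the concrete numbers are taken from the codegen=true row above:
```scala
val num    = 500L << 20             // rows processed per iteration
val bestNs = 845.0 * 1e6            // best (minimum) iteration time, in ns
val bestMs = bestNs / 1e6           // left half of "Best/Avg Time(ms)"
val rate   = num / (bestNs / 1000)  // "Rate(M/s)": rows per microsecond
val perRow = 1000 / rate            // "Per Row(ns)", ~1.6 here
// "Relative" = best time of the first case / best time of this case,
// e.g. 14332.0 / 845 ~= 17.0X.
```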

Author: Davies Liu <da...@databricks.com>

Closes #11018 from davies/gen_bench.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de091452
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de091452
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de091452

Branch: refs/heads/master
Commit: de0914522fc5b2658959f9e2272b4e3162b14978
Parents: 915a753
Author: Davies Liu <da...@databricks.com>
Authored: Wed Feb 3 17:07:27 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Wed Feb 3 17:07:27 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Benchmark.scala |  38 +++--
 .../execution/BenchmarkWholeStageCodegen.scala  | 154 ++++++++-----------
 2 files changed, 89 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de091452/core/src/main/scala/org/apache/spark/util/Benchmark.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index d484cec..d1699f5 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.SystemUtils
 
@@ -59,17 +60,21 @@ private[spark] class Benchmark(
     }
     println
 
-    val firstRate = results.head.avgRate
+    val firstBest = results.head.bestMs
+    val firstAvg = results.head.avgMs
     // The results are going to be processor specific so it is useful to include that.
     println(Benchmark.getProcessorName())
-    printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
-    println("-------------------------------------------------------------------------------")
-    results.zip(benchmarks).foreach { r =>
-      printf("%-30s %16s %16s %14s\n",
-        r._2.name,
-        "%10.2f" format r._1.avgMs,
-        "%10.2f" format r._1.avgRate,
-        "%6.2f X" format (r._1.avgRate / firstRate))
+    printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+      "Per Row(ns)", "Relative")
+    println("-----------------------------------------------------------------------------------" +
+      "--------")
+    results.zip(benchmarks).foreach { case (result, benchmark) =>
+      printf("%-35s %16s %12s %13s %10s\n",
+        benchmark.name,
+        "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
+        "%10.1f" format result.bestRate,
+        "%6.1f" format (1000 / result.bestRate),
+        "%3.1fX" format (firstBest / result.bestMs))
     }
     println
     // scalastyle:on
@@ -78,7 +83,7 @@ private[spark] class Benchmark(
 
 private[spark] object Benchmark {
   case class Case(name: String, fn: Int => Unit)
-  case class Result(avgMs: Double, avgRate: Double)
+  case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
 
   /**
    * This should return a user helpful processor information. Getting at this depends on the OS.
@@ -99,22 +104,27 @@ private[spark] object Benchmark {
    * the rate of the function.
    */
   def measure(num: Long, iters: Int, outputPerIteration: Boolean)(f: Int => Unit): Result = {
-    var totalTime = 0L
+    val runTimes = ArrayBuffer[Long]()
     for (i <- 0 until iters + 1) {
       val start = System.nanoTime()
 
       f(i)
 
       val end = System.nanoTime()
-      if (i != 0) totalTime += end - start
+      val runTime = end - start
+      if (i > 0) {
+        runTimes += runTime
+      }
 
       if (outputPerIteration) {
         // scalastyle:off
-        println(s"Iteration $i took ${(end - start) / 1000} microseconds")
+        println(s"Iteration $i took ${runTime / 1000} microseconds")
         // scalastyle:on
       }
     }
-    Result(totalTime.toDouble / 1000000 / iters, num * iters / (totalTime.toDouble / 1000))
+    val best = runTimes.min
+    val avg = runTimes.sum / iters
+    Result(avg / 1000000, num / (best / 1000), best / 1000000)
   }
 }
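
As a side note, a minimal sketch of how this API is driven (Benchmark is private[spark], so a driver like this has to live in Spark's own tree; the cases here are illustrative, not from this patch):

```scala
import org.apache.spark.util.Benchmark

val N = 1 << 24
val benchmark = new Benchmark("sum", N)  // name and rows per iteration
benchmark.addCase("while loop") { _ =>   // the Int argument is the iteration index
  var i = 0
  var s = 0L
  while (i < N) { s += i; i += 1 }
}
benchmark.addCase("Range.foreach") { _ =>
  var s = 0L
  (0 until N).foreach(s += _)
}
benchmark.run()  // prints best/avg time, rate, ns/row, and relative speedup
```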
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de091452/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 15ba773..33d4976 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -34,54 +34,47 @@ import org.apache.spark.util.Benchmark
   */
 class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
+    .set("spark.sql.shuffle.partitions", "1")
   lazy val sc = SparkContext.getOrCreate(conf)
   lazy val sqlContext = SQLContext.getOrCreate(sc)
 
-  def testWholeStage(values: Int): Unit = {
-    val benchmark = new Benchmark("rang/filter/aggregate", values)
+  def runBenchmark(name: String, values: Int)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, values)
 
-    benchmark.addCase("Without codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).filter("(id & 1) = 1").count()
-    }
-
-    benchmark.addCase("With codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).filter("(id & 1) = 1").count()
+    Seq(false, true).foreach { enabled =>
+      benchmark.addCase(s"$name codegen=$enabled") { iter =>
+        sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
+        f
+      }
     }
 
-    /*
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      rang/filter/aggregate:            Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-      -------------------------------------------------------------------------------
-      Without codegen             7775.53            26.97         1.00 X
-      With codegen                 342.15           612.94        22.73 X
-    */
     benchmark.run()
   }
 
-  def testStatFunctions(values: Int): Unit = {
-
-    val benchmark = new Benchmark("stat functions", values)
-
-    benchmark.addCase("stddev w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
+  // These benchmarks are skipped in normal builds
+  ignore("range/filter/sum") {
+    val N = 500 << 20
+    runBenchmark("rang/filter/sum", N) {
+      sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
     }
+    /*
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    rang/filter/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    rang/filter/sum codegen=false          14332 / 16646         36.0          27.8       1.0X
+    rang/filter/sum codegen=true              845 /  940        620.0           1.6      17.0X
+    */
+  }
 
-    benchmark.addCase("stddev w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).groupBy().agg("id" -> "stddev").collect()
-    }
+  ignore("stat functions") {
+    val N = 100 << 20
 
-    benchmark.addCase("kurtosis w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+    runBenchmark("stddev", N) {
+      sqlContext.range(N).groupBy().agg("id" -> "stddev").collect()
     }
 
-    benchmark.addCase("kurtosis w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).groupBy().agg("id" -> "kurtosis").collect()
+    runBenchmark("kurtosis", N) {
+      sqlContext.range(N).groupBy().agg("id" -> "kurtosis").collect()
     }
 
 
@@ -99,64 +92,56 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       Using DeclarativeAggregate:
 
       Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      stddev:                            Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-      -------------------------------------------------------------------------------
-      stddev w/o codegen                       989.22            21.20         1.00 X
-      stddev w codegen                         352.35            59.52         2.81 X
-      kurtosis w/o codegen                    3636.91             5.77         0.27 X
-      kurtosis w codegen                       369.25            56.79         2.68 X
+      stddev:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      -------------------------------------------------------------------------------------------
+      stddev codegen=false                     5630 / 5776         18.0          55.6       1.0X
+      stddev codegen=true                      1259 / 1314         83.0          12.0       4.5X
+
+      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+      kurtosis:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+      -------------------------------------------------------------------------------------------
+      kurtosis codegen=false                 14847 / 15084          7.0         142.9       1.0X
+      kurtosis codegen=true                    1652 / 2124         63.0          15.9       9.0X
       */
-    benchmark.run()
   }
 
-  def testAggregateWithKey(values: Int): Unit = {
-    val benchmark = new Benchmark("Aggregate with keys", values)
+  ignore("aggregate with keys") {
+    val N = 20 << 20
 
-    benchmark.addCase("Aggregate w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
-    }
-    benchmark.addCase(s"Aggregate w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+    runBenchmark("Aggregate w keys", N) {
+      sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
     }
 
     /*
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Aggregate with keys:               Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    Aggregate w/o codegen                   4254.38             4.93         1.00 X
-    Aggregate w codegen                     2661.45             7.88         1.60 X
+    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    Aggregate w keys codegen=false           2402 / 2551          8.0         125.0       1.0X
+    Aggregate w keys codegen=true            1620 / 1670         12.0          83.3       1.5X
     */
-    benchmark.run()
   }
 
-  def testBroadcastHashJoin(values: Int): Unit = {
-    val benchmark = new Benchmark("BroadcastHashJoin", values)
-
+  ignore("broadcast hash join") {
+    val N = 20 << 20
     val dim = broadcast(sqlContext.range(1 << 16).selectExpr("id as k", "cast(id as string) as v"))
 
-    benchmark.addCase("BroadcastHashJoin w/o codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
-      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
-    }
-    benchmark.addCase(s"BroadcastHashJoin w codegen") { iter =>
-      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
-      sqlContext.range(values).join(dim, (col("id") % 60000) === col("k")).count()
+    runBenchmark("BroadcastHashJoin", N) {
+      sqlContext.range(N).join(dim, (col("id") % 60000) === col("k")).count()
     }
 
     /*
-      Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-      BroadcastHashJoin:                 Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-      -------------------------------------------------------------------------------
-      BroadcastHashJoin w/o codegen           3053.41             3.43         1.00 X
-      BroadcastHashJoin w codegen             1028.40            10.20         2.97 X
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    BroadcastHashJoin:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    BroadcastHashJoin codegen=false          4405 / 6147          4.0         250.0       1.0X
+    BroadcastHashJoin codegen=true           1857 / 1878         11.0          90.9       2.4X
     */
-    benchmark.run()
   }
 
-  def testBytesToBytesMap(values: Int): Unit = {
-    val benchmark = new Benchmark("BytesToBytesMap", values)
+  ignore("hash and BytesToBytesMap") {
+    val N = 50 << 20
+
+    val benchmark = new Benchmark("BytesToBytesMap", N)
 
     benchmark.addCase("hash") { iter =>
       var i = 0
@@ -167,7 +152,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       val value = new UnsafeRow(2)
       value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
       var s = 0
-      while (i < values) {
+      while (i < N) {
         key.setInt(0, i % 1000)
         val h = Murmur3_x86_32.hashUnsafeWords(
           key.getBaseObject, key.getBaseOffset, key.getSizeInBytes, 0)
@@ -194,7 +179,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
         val value = new UnsafeRow(2)
         value.pointTo(valueBytes, Platform.BYTE_ARRAY_OFFSET, 16)
         var i = 0
-        while (i < values) {
+        while (i < N) {
           key.setInt(0, i % 65536)
           val loc = map.lookup(key.getBaseObject, key.getBaseOffset, key.getSizeInBytes)
           if (loc.isDefined) {
@@ -212,21 +197,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
 
     /**
     Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    Aggregate with keys:               Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    -------------------------------------------------------------------------------
-    hash                                     662.06            79.19         1.00 X
-    BytesToBytesMap (off Heap)              2209.42            23.73         0.30 X
-    BytesToBytesMap (on Heap)               2957.68            17.73         0.22 X
+    BytesToBytesMap:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    hash                                      628 /  661         83.0          12.0       1.0X
+    BytesToBytesMap (off Heap)               3292 / 3408         15.0          66.7       0.2X
+    BytesToBytesMap (on Heap)                3349 / 4267         15.0          66.7       0.2X
       */
     benchmark.run()
   }
-
-  // These benchmark are skipped in normal build
-  ignore("benchmark") {
-    // testWholeStage(200 << 20)
-    // testStatFunctions(20 << 20)
-    // testAggregateWithKey(20 << 20)
-    // testBytesToBytesMap(50 << 20)
-    // testBroadcastHashJoin(10 << 20)
-  }
 }
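
With the refactoring above, adding a new query benchmark is a single runBenchmark call; a hypothetical example (the query and N are illustrative, not part of this patch):

```scala
// Timed twice by runBenchmark: once with codegen=false, once with codegen=true.
ignore("filter/count") {
  val N = 10 << 20
  runBenchmark("filter/count", N) {
    sqlContext.range(N).filter("id % 3 = 0").count()
  }
}
```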

