You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/02/26 05:17:51 UTC

spark git commit: [SPARK-13361][SQL] Add benchmark codes for Encoder#compress() in CompressionSchemeBenchmark

Repository: spark
Updated Branches:
  refs/heads/master 633d63a48 -> 1b39fafa7


[SPARK-13361][SQL] Add benchmark codes for Encoder#compress() in CompressionSchemeBenchmark

This pr added benchmark codes for Encoder#compress().
Also, it replaced the benchmark results with new ones because the output format of `Benchmark` changed.

Author: Takeshi YAMAMURO <li...@gmail.com>

Closes #11236 from maropu/CompressionSpike.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b39fafa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b39fafa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b39fafa

Branch: refs/heads/master
Commit: 1b39fafa75a162f183824ff2daa61d73b05ebc83
Parents: 633d63a
Author: Takeshi YAMAMURO <li...@gmail.com>
Authored: Thu Feb 25 20:17:48 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Feb 25 20:17:48 2016 -0800

----------------------------------------------------------------------
 .../CompressionSchemeBenchmark.scala            | 282 +++++++++++++------
 1 file changed, 193 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b39fafa/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
index 95eb5cf..0000a5d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/CompressionSchemeBenchmark.scala
@@ -17,19 +17,13 @@
 
 package org.apache.spark.sql.execution.columnar.compression
 
-import java.nio.ByteBuffer
-import java.nio.ByteOrder
+import java.nio.{ByteBuffer, ByteOrder}
 
 import org.apache.commons.lang3.RandomStringUtils
 import org.apache.commons.math3.distribution.LogNormalDistribution
 
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericMutableRow}
-import org.apache.spark.sql.execution.columnar.BOOLEAN
-import org.apache.spark.sql.execution.columnar.INT
-import org.apache.spark.sql.execution.columnar.LONG
-import org.apache.spark.sql.execution.columnar.NativeColumnType
-import org.apache.spark.sql.execution.columnar.SHORT
-import org.apache.spark.sql.execution.columnar.STRING
+import org.apache.spark.sql.execution.columnar.{BOOLEAN, INT, LONG, NativeColumnType, SHORT, STRING}
 import org.apache.spark.sql.types.AtomicType
 import org.apache.spark.util.Benchmark
 import org.apache.spark.util.Utils._
@@ -53,35 +47,70 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
     () => rng.sample
   }
 
-  private[this] def runBenchmark[T <: AtomicType](
+  private[this] def prepareEncodeInternal[T <: AtomicType](
+    count: Int,
+    tpe: NativeColumnType[T],
+    supportedScheme: CompressionScheme,
+    input: ByteBuffer): ((ByteBuffer, ByteBuffer) => ByteBuffer, Double, ByteBuffer) = {
+    assert(supportedScheme.supports(tpe))
+
+    def toRow(d: Any) = new GenericInternalRow(Array[Any](d))
+    val encoder = supportedScheme.encoder(tpe)
+    for (i <- 0 until count) {
+      encoder.gatherCompressibilityStats(toRow(tpe.extract(input)), 0)
+    }
+    input.rewind()
+
+    val compressedSize = if (encoder.compressedSize == 0) {
+      input.remaining()
+    } else {
+      encoder.compressedSize
+    }
+
+    (encoder.compress, encoder.compressionRatio, allocateLocal(4 + compressedSize))
+  }
+
+  private[this] def runEncodeBenchmark[T <: AtomicType](
       name: String,
       iters: Int,
       count: Int,
       tpe: NativeColumnType[T],
       input: ByteBuffer): Unit = {
-
     val benchmark = new Benchmark(name, iters * count)
 
     schemes.filter(_.supports(tpe)).map { scheme =>
-      def toRow(d: Any) = new GenericInternalRow(Array[Any](d))
-      val encoder = scheme.encoder(tpe)
-      for (i <- 0 until count) {
-        encoder.gatherCompressibilityStats(toRow(tpe.extract(input)), 0)
-      }
-      input.rewind()
+      val (compressFunc, compressionRatio, buf) = prepareEncodeInternal(count, tpe, scheme, input)
+      val label = s"${getFormattedClassName(scheme)}(${compressionRatio.formatted("%.3f")})"
 
-      val label = s"${getFormattedClassName(scheme)}(${encoder.compressionRatio.formatted("%.3f")})"
       benchmark.addCase(label)({ i: Int =>
-        val compressedSize = if (encoder.compressedSize == 0) {
-          input.remaining()
-        } else {
-          encoder.compressedSize
+        for (n <- 0L until iters) {
+          compressFunc(input, buf)
+          input.rewind()
+          buf.rewind()
         }
+      })
+    }
+
+    benchmark.run()
+  }
 
-        val buf = allocateLocal(4 + compressedSize)
+  private[this] def runDecodeBenchmark[T <: AtomicType](
+      name: String,
+      iters: Int,
+      count: Int,
+      tpe: NativeColumnType[T],
+      input: ByteBuffer): Unit = {
+    val benchmark = new Benchmark(name, iters * count)
+
+    schemes.filter(_.supports(tpe)).map { scheme =>
+      val (compressFunc, _, buf) = prepareEncodeInternal(count, tpe, scheme, input)
+      val compressedBuf = compressFunc(input, buf)
+      val label = s"${getFormattedClassName(scheme)}"
+
+      input.rewind()
+
+      benchmark.addCase(label)({ i: Int =>
         val rowBuf = new GenericMutableRow(1)
-        val compressedBuf = encoder.compress(input, buf)
-        input.rewind()
 
         for (n <- 0L until iters) {
           compressedBuf.rewind.position(4)
@@ -96,16 +125,10 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
     benchmark.run()
   }
 
-  def bitDecode(iters: Int): Unit = {
+  def bitEncodingBenchmark(iters: Int): Unit = {
     val count = 65536
     val testData = allocateLocal(count * BOOLEAN.defaultSize)
 
-    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-    // BOOLEAN Decode:                    Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    // -------------------------------------------------------------------------------
-    // PassThrough(1.000)                       124.98           536.96         1.00 X
-    // RunLengthEncoding(2.494)                 631.37           106.29         0.20 X
-    // BooleanBitSet(0.125)                    1200.36            55.91         0.10 X
     val g = {
       val rng = genLowerSkewData()
       () => (rng().toInt % 2).toByte
@@ -113,110 +136,176 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
     for (i <- 0 until count) {
       testData.put(i * BOOLEAN.defaultSize, g())
     }
-    runBenchmark("BOOLEAN Decode", iters, count, BOOLEAN, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // BOOLEAN Encode:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough(1.000)                          3 /    4      19300.2           0.1       1.0X
+    // RunLengthEncoding(2.491)                  923 /  939         72.7          13.8       0.0X
+    // BooleanBitSet(0.125)                      359 /  363        187.1           5.3       0.0X
+    runEncodeBenchmark("BOOLEAN Encode", iters, count, BOOLEAN, testData)
+
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // BOOLEAN Decode:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough                               129 /  136        519.8           1.9       1.0X
+    // RunLengthEncoding                         613 /  623        109.4           9.1       0.2X
+    // BooleanBitSet                            1196 / 1222         56.1          17.8       0.1X
+    runDecodeBenchmark("BOOLEAN Decode", iters, count, BOOLEAN, testData)
   }
 
-  def shortDecode(iters: Int): Unit = {
+  def shortEncodingBenchmark(iters: Int): Unit = {
     val count = 65536
     val testData = allocateLocal(count * SHORT.defaultSize)
 
-    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-    // SHORT Decode (Lower Skew):         Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    // -------------------------------------------------------------------------------
-    // PassThrough(1.000)                       376.87           178.07         1.00 X
-    // RunLengthEncoding(1.498)                 831.59            80.70         0.45 X
     val g1 = genLowerSkewData()
     for (i <- 0 until count) {
       testData.putShort(i * SHORT.defaultSize, g1().toShort)
     }
-    runBenchmark("SHORT Decode (Lower Skew)", iters, count, SHORT, testData)
 
     // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-    // SHORT Decode (Higher Skew):        Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    // -------------------------------------------------------------------------------
-    // PassThrough(1.000)                       426.83           157.23         1.00 X
-    // RunLengthEncoding(1.996)                 845.56            79.37         0.50 X
+    // SHORT Encode (Lower Skew):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough(1.000)                          6 /    7      10971.4           0.1       1.0X
+    // RunLengthEncoding(1.510)                 1526 / 1542         44.0          22.7       0.0X
+    runEncodeBenchmark("SHORT Encode (Lower Skew)", iters, count, SHORT, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // SHORT Decode (Lower Skew):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough                               811 /  837         82.8          12.1       1.0X
+    // RunLengthEncoding                        1219 / 1266         55.1          18.2       0.7X
+    runDecodeBenchmark("SHORT Decode (Lower Skew)", iters, count, SHORT, testData)
+
     val g2 = genHigherSkewData()
     for (i <- 0 until count) {
       testData.putShort(i * SHORT.defaultSize, g2().toShort)
     }
-    runBenchmark("SHORT Decode (Higher Skew)", iters, count, SHORT, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // SHORT Encode (Higher Skew):         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough(1.000)                          7 /    7      10112.4           0.1       1.0X
+    // RunLengthEncoding(2.009)                 1623 / 1661         41.4          24.2       0.0X
+    runEncodeBenchmark("SHORT Encode (Higher Skew)", iters, count, SHORT, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // SHORT Decode (Higher Skew):         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough                               818 /  827         82.0          12.2       1.0X
+    // RunLengthEncoding                        1202 / 1237         55.8          17.9       0.7X
+    runDecodeBenchmark("SHORT Decode (Higher Skew)", iters, count, SHORT, testData)
   }
 
-  def intDecode(iters: Int): Unit = {
+  def intEncodingBenchmark(iters: Int): Unit = {
     val count = 65536
     val testData = allocateLocal(count * INT.defaultSize)
 
-    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-    // INT Decode(Lower Skew):            Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    // -------------------------------------------------------------------------------
-    // PassThrough(1.000)                       325.16           206.39         1.00 X
-    // RunLengthEncoding(0.997)                1219.44            55.03         0.27 X
-    // DictionaryEncoding(0.500)                955.51            70.23         0.34 X
-    // IntDelta(0.250)                         1146.02            58.56         0.28 X
     val g1 = genLowerSkewData()
     for (i <- 0 until count) {
       testData.putInt(i * INT.defaultSize, g1().toInt)
     }
-    runBenchmark("INT Decode(Lower Skew)", iters, count, INT, testData)
 
     // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-    //   INT Decode(Higher Skew):           Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    // -------------------------------------------------------------------------------
-    // PassThrough(1.000)                      1133.45            59.21         1.00 X
-    // RunLengthEncoding(1.334)                1399.00            47.97         0.81 X
-    // DictionaryEncoding(0.501)               1032.87            64.97         1.10 X
-    // IntDelta(0.250)                          948.02            70.79         1.20 X
+    // INT Encode (Lower Skew):            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough(1.000)                         18 /   19       3716.4           0.3       1.0X
+    // RunLengthEncoding(1.001)                 1992 / 2056         33.7          29.7       0.0X
+    // DictionaryEncoding(0.500)                 723 /  739         92.8          10.8       0.0X
+    // IntDelta(0.250)                           368 /  377        182.2           5.5       0.0X
+    runEncodeBenchmark("INT Encode (Lower Skew)", iters, count, INT, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // INT Decode (Lower Skew):            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough                               821 /  845         81.8          12.2       1.0X
+    // RunLengthEncoding                        1246 / 1256         53.9          18.6       0.7X
+    // DictionaryEncoding                        757 /  766         88.6          11.3       1.1X
+    // IntDelta                                  680 /  689         98.7          10.1       1.2X
+    runDecodeBenchmark("INT Decode (Lower Skew)", iters, count, INT, testData)
+
     val g2 = genHigherSkewData()
     for (i <- 0 until count) {
       testData.putInt(i * INT.defaultSize, g2().toInt)
     }
-    runBenchmark("INT Decode(Higher Skew)", iters, count, INT, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // INT Encode (Higher Skew):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough(1.000)                         17 /   19       3888.4           0.3       1.0X
+    // RunLengthEncoding(1.339)                 2127 / 2148         31.5          31.7       0.0X
+    // DictionaryEncoding(0.501)                 960 /  972         69.9          14.3       0.0X
+    // IntDelta(0.250)                           362 /  366        185.5           5.4       0.0X
+    runEncodeBenchmark("INT Encode (Higher Skew)", iters, count, INT, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // INT Decode (Higher Skew):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough                               838 /  884         80.1          12.5       1.0X
+    // RunLengthEncoding                        1287 / 1311         52.1          19.2       0.7X
+    // DictionaryEncoding                        844 /  859         79.5          12.6       1.0X
+    // IntDelta                                  764 /  784         87.8          11.4       1.1X
+    runDecodeBenchmark("INT Decode (Higher Skew)", iters, count, INT, testData)
   }
 
-  def longDecode(iters: Int): Unit = {
+  def longEncodingBenchmark(iters: Int): Unit = {
     val count = 65536
     val testData = allocateLocal(count * LONG.defaultSize)
 
-    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-    // LONG Decode(Lower Skew):           Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    // -------------------------------------------------------------------------------
-    // PassThrough(1.000)                      1101.07            60.95         1.00 X
-    // RunLengthEncoding(0.756)                1372.57            48.89         0.80 X
-    // DictionaryEncoding(0.250)                947.80            70.81         1.16 X
-    // LongDelta(0.125)                         721.51            93.01         1.53 X
     val g1 = genLowerSkewData()
     for (i <- 0 until count) {
       testData.putLong(i * LONG.defaultSize, g1().toLong)
     }
-    runBenchmark("LONG Decode(Lower Skew)", iters, count, LONG, testData)
 
     // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-    // LONG Decode(Higher Skew):          Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    // -------------------------------------------------------------------------------
-    // PassThrough(1.000)                       986.71            68.01         1.00 X
-    // RunLengthEncoding(1.013)                1348.69            49.76         0.73 X
-    // DictionaryEncoding(0.251)                865.48            77.54         1.14 X
-    // LongDelta(0.125)                         816.90            82.15         1.21 X
+    // LONG Encode (Lower Skew):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough(1.000)                         37 /   38       1804.8           0.6       1.0X
+    // RunLengthEncoding(0.748)                 2065 / 2094         32.5          30.8       0.0X
+    // DictionaryEncoding(0.250)                 950 /  962         70.6          14.2       0.0X
+    // LongDelta(0.125)                          475 /  482        141.2           7.1       0.1X
+    runEncodeBenchmark("LONG Encode (Lower Skew)", iters, count, LONG, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // LONG Decode (Lower Skew):           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough                               888 /  894         75.5          13.2       1.0X
+    // RunLengthEncoding                        1301 / 1311         51.6          19.4       0.7X
+    // DictionaryEncoding                        887 /  904         75.7          13.2       1.0X
+    // LongDelta                                 693 /  735         96.8          10.3       1.3X
+    runDecodeBenchmark("LONG Decode (Lower Skew)", iters, count, LONG, testData)
+
     val g2 = genHigherSkewData()
     for (i <- 0 until count) {
       testData.putLong(i * LONG.defaultSize, g2().toLong)
     }
-    runBenchmark("LONG Decode(Higher Skew)", iters, count, LONG, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // LONG Encode (Higher Skew):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough(1.000)                         34 /   35       1963.9           0.5       1.0X
+    // RunLengthEncoding(0.999)                 2260 / 3021         29.7          33.7       0.0X
+    // DictionaryEncoding(0.251)                1270 / 1438         52.8          18.9       0.0X
+    // LongDelta(0.125)                          496 /  509        135.3           7.4       0.1X
+    runEncodeBenchmark("LONG Encode (Higher Skew)", iters, count, LONG, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // LONG Decode (Higher Skew):          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough                               965 / 1494         69.5          14.4       1.0X
+    // RunLengthEncoding                        1350 / 1378         49.7          20.1       0.7X
+    // DictionaryEncoding                        892 /  924         75.2          13.3       1.1X
+    // LongDelta                                 817 /  847         82.2          12.2       1.2X
+    runDecodeBenchmark("LONG Decode (Higher Skew)", iters, count, LONG, testData)
   }
 
-  def stringDecode(iters: Int): Unit = {
+  def stringEncodingBenchmark(iters: Int): Unit = {
     val count = 65536
     val strLen = 8
     val tableSize = 16
     val testData = allocateLocal(count * (4 + strLen))
 
-    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
-    // STRING Decode:                     Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-    // -------------------------------------------------------------------------------
-    // PassThrough(1.000)                      2277.05            29.47         1.00 X
-    // RunLengthEncoding(0.893)                2624.35            25.57         0.87 X
-    // DictionaryEncoding(0.167)               2672.28            25.11         0.85 X
     val g = {
       val dataTable = (0 until tableSize).map(_ => RandomStringUtils.randomAlphabetic(strLen))
       val rng = genHigherSkewData()
@@ -227,14 +316,29 @@ object CompressionSchemeBenchmark extends AllCompressionSchemes {
       testData.put(g().getBytes)
     }
     testData.rewind()
-    runBenchmark("STRING Decode", iters, count, STRING, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // STRING Encode:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough(1.000)                         56 /   57       1197.9           0.8       1.0X
+    // RunLengthEncoding(0.893)                 4892 / 4937         13.7          72.9       0.0X
+    // DictionaryEncoding(0.167)                2968 / 2992         22.6          44.2       0.0X
+    runEncodeBenchmark("STRING Encode", iters, count, STRING, testData)
+
+    // Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+    // STRING Decode:                      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    // -------------------------------------------------------------------------------------------
+    // PassThrough                              2422 / 2449         27.7          36.1       1.0X
+    // RunLengthEncoding                        2885 / 3018         23.3          43.0       0.8X
+    // DictionaryEncoding                       2716 / 2752         24.7          40.5       0.9X
+    runDecodeBenchmark("STRING Decode", iters, count, STRING, testData)
   }
 
   def main(args: Array[String]): Unit = {
-    bitDecode(1024)
-    shortDecode(1024)
-    intDecode(1024)
-    longDecode(1024)
-    stringDecode(1024)
+    bitEncodingBenchmark(1024)
+    shortEncodingBenchmark(1024)
+    intEncodingBenchmark(1024)
+    longEncodingBenchmark(1024)
+    stringEncodingBenchmark(1024)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org