You are viewing a plain text version of this content. The canonical link for it is here.
Posted on 2016/04/16 00:55:39 UTC

spark git commit: [SPARK-14620][SQL] Use/benchmark a better hash in VectorizedHashMap

Repository: spark
Updated Branches:
  refs/heads/master 8028a2888 -> 4df65184b

[SPARK-14620][SQL] Use/benchmark a better hash in VectorizedHashMap

## What changes were proposed in this pull request?

This PR uses a better hashing algorithm while probing the AggregateHashMap:

long h = 0;
h = (h ^ (0x9e3779b9)) + key_1 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_2 + (h << 6) + (h >>> 2);
h = (h ^ (0x9e3779b9)) + key_3 + (h << 6) + (h >>> 2);
...
h = (h ^ (0x9e3779b9)) + key_n + (h << 6) + (h >>> 2);
return h;

Depends on:
## How was this patch tested?

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    codegen = F                              2417 / 2457          8.7         115.2       1.0X
    codegen = T hashmap = F                  1554 / 1581         13.5          74.1       1.6X
    codegen = T hashmap = T                   877 /  929         23.9          41.8       2.8X

Author: Sameer Agarwal <>

Closes #12379 from sameeragarwal/hash.


Branch: refs/heads/master
Commit: 4df65184b6b865a26e4d5c99bbfd3c24ab7179dc
Parents: 8028a28
Author: Sameer Agarwal <>
Authored: Fri Apr 15 15:55:31 2016 -0700
Committer: Yin Huai <>
Committed: Fri Apr 15 15:55:31 2016 -0700

 .../aggregate/VectorizedHashMapGenerator.scala  | 48 ++++++++++----------
 .../execution/BenchmarkWholeStageCodegen.scala  | 46 +++++++++++++++++--
 2 files changed, 66 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index 395cc7a..dd9b2f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -86,28 +86,21 @@ class VectorizedHashMapGenerator(
        |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
        |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
        |  private int[] buckets;
-       |  private int numBuckets;
-       |  private int maxSteps;
+       |  private int capacity = 1 << 16;
+       |  private double loadFactor = 0.5;
+       |  private int numBuckets = (int) (capacity / loadFactor);
+       |  private int maxSteps = 2;
        |  private int numRows = 0;
        |  private org.apache.spark.sql.types.StructType schema = $generatedSchema
        |  private org.apache.spark.sql.types.StructType aggregateBufferSchema =
        |    $generatedAggBufferSchema
        |  public $generatedClassName() {
-       |    // TODO: These should be generated based on the schema
-       |    int DEFAULT_CAPACITY = 1 << 16;
-       |    double DEFAULT_LOAD_FACTOR = 0.25;
-       |    int DEFAULT_MAX_STEPS = 2;
-       |    assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0));
-       |    this.maxSteps = DEFAULT_MAX_STEPS;
-       |    numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR);
-       |
        |    batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-       |      org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
-       |
+       |      org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
        |    // TODO: Possibly generate this projection in TungstenAggregate directly
        |    aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-       |      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
+       |      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
        |    for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
        |       aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
        |    }
@@ -130,9 +123,11 @@ class VectorizedHashMapGenerator(
   private def generateHashFunction(): String = {
-       |// TODO: Improve this hash function
        |private long hash($groupingKeySignature) {
-       |  return ${groupingKeys.map(_._2).mkString(" | ")};
+       |  long h = 0;
+       |  ${groupingKeys.map(key => s"h = (h ^ (0x9e3779b9)) + ${key._2} + (h << 6) + (h >>> 2);")
+            .mkString("\n")}
+       |  return h;
@@ -201,15 +196,20 @@ class VectorizedHashMapGenerator(
        |  while (step < maxSteps) {
        |    // Return bucket index if it's either an empty slot or already contains the key
        |    if (buckets[idx] == -1) {
-       |      ${ =>
-                s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
-       |      ${ =>
-                s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
-                .mkString("\n")}
-       |      buckets[idx] = numRows++;
-       |      batch.setNumRows(numRows);
-       |      aggregateBufferBatch.setNumRows(numRows);
-       |      return aggregateBufferBatch.getRow(buckets[idx]);
+       |      if (numRows < capacity) {
+       |        ${ =>
+                  s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
+       |        ${ =>
+                  s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
+                  .mkString("\n")}
+       |        buckets[idx] = numRows++;
+       |        batch.setNumRows(numRows);
+       |        aggregateBufferBatch.setNumRows(numRows);
+       |        return aggregateBufferBatch.getRow(buckets[idx]);
+       |      } else {
+       |        // No more space
+       |        return null;
+       |      }
        |    } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) {
        |      return aggregateBufferBatch.getRow(buckets[idx]);
        |    }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index d23f19c..3fb70f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -150,7 +150,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
-  ignore("aggregate with keys") {
+  ignore("aggregate with linear keys") {
     val N = 20 << 20
     val benchmark = new Benchmark("Aggregate w keys", N)
@@ -180,9 +180,47 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
     Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    codegen = F                              2219 / 2392          9.4         105.8       1.0X
-    codegen = T hashmap = F                  1330 / 1466         15.8          63.4       1.7X
-    codegen = T hashmap = T                   384 /  518         54.7          18.3       5.8X
+    codegen = F                              2067 / 2166         10.1          98.6       1.0X
+    codegen = T hashmap = F                  1149 / 1321         18.3          54.8       1.8X
+    codegen = T hashmap = T                   388 /  475         54.0          18.5       5.3X
+    */
+  }
+  ignore("aggregate with randomized keys") {
+    val N = 20 << 20
+    val benchmark = new Benchmark("Aggregate w keys", N)
+    sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test")
+    def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()
+    benchmark.addCase(s"codegen = F") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+      f()
+    }
+    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      sqlContext.setConf("", "false")
+      f()
+    }
+    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      sqlContext.setConf("", "true")
+      f()
+    }
+    /*
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    -------------------------------------------------------------------------------------------
+    codegen = F                              2517 / 2608          8.3         120.0       1.0X
+    codegen = T hashmap = F                  1484 / 1560         14.1          70.8       1.7X
+    codegen = T hashmap = T                   794 /  908         26.4          37.9       3.2X

To unsubscribe, e-mail:
For additional commands, e-mail: