Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/31 20:53:18 UTC

spark git commit: [SPARK-14263][SQL] Benchmark Vectorized HashMap for GroupBy Aggregates

Repository: spark
Updated Branches:
  refs/heads/master 8b207f3b6 -> 8d6207206


[SPARK-14263][SQL] Benchmark Vectorized HashMap for GroupBy Aggregates

## What changes were proposed in this pull request?

This PR proposes a new data structure, based on a vectorized hashmap, that could potentially be _codegened_ in `TungstenAggregate` to speed up aggregates with group by. Micro-benchmarks show roughly a 10x improvement over the current `BytesToBytesMap` aggregation map (2.0X vs. 0.2X relative to the `hash` baseline in the results below).
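
The lookup scheme can be illustrated with a minimal, Spark-free sketch. This is not the code in the patch: the class name `LongCacheSketch` is hypothetical, plain `long[]` arrays stand in for the columnar batch, a `java.util.HashMap` stands in for the `BytesToBytesMap` fallback, and `findOrInsert` returns a row index here (the patch returns a bucket index). It only shows the idea of bounded linear probing with an identity hash and a fallback when probing gives up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the proposed map; not the class added by this patch.
final class LongCacheSketch {
  private final long[] keys;    // row -> key (stands in for column 0 of the batch)
  private final long[] values;  // row -> aggregate value (stands in for column 1)
  private final int[] buckets;  // bucket -> row index, or -1 if the bucket is empty
  private final int maxSteps;   // maximum number of buckets probed per lookup
  private int numRows = 0;

  LongCacheSketch(int capacity, int numBuckets, int maxSteps) {
    // A power-of-2 bucket count lets us mask instead of using modulo.
    if (numBuckets <= 0 || (numBuckets & (numBuckets - 1)) != 0) {
      throw new IllegalArgumentException("numBuckets must be a power of 2");
    }
    this.keys = new long[capacity];
    this.values = new long[capacity];
    this.buckets = new int[numBuckets];
    this.maxSteps = maxSteps;
    Arrays.fill(buckets, -1);
  }

  // Returns the row holding `key`, inserting it if an empty bucket is found.
  // Returns -1 when probing is exhausted or storage is full; the caller must
  // then fall back to a slower, fully general map.
  int findOrInsert(long key) {
    int mask = buckets.length - 1;
    int idx = (int) key & mask;  // identity hash, truncated and masked
    for (int step = 0; step < maxSteps; step++) {
      int row = buckets[idx];
      if (row == -1) {
        if (numRows == keys.length) {
          return -1;  // no room left for a new row
        }
        keys[numRows] = key;
        values[numRows] = 0L;
        buckets[idx] = numRows;
        return numRows++;
      }
      if (keys[row] == key) {
        return row;
      }
      idx = (idx + 1) & mask;  // linear probing
    }
    return -1;
  }

  void add(int row, long delta) { values[row] += delta; }

  long value(int row) { return values[row]; }

  public static void main(String[] args) {
    LongCacheSketch cache = new LongCacheSketch(4, 8, 2);
    Map<Long, Long> fallback = new HashMap<>();  // stand-in for the slow path
    long[] input = {1, 9, 17, 1, 9, 17, 2};
    for (long k : input) {
      int row = cache.findOrInsert(k);
      if (row != -1) {
        cache.add(row, 1L);                  // fast path: count in the cache
      } else {
        fallback.merge(k, 1L, Long::sum);    // slow path: count in the fallback
      }
    }
    System.out.println("count for key 1 in cache: " + cache.value(cache.findOrInsert(1L)));
    System.out.println("fallback map: " + fallback);
  }
}
```

In this run, keys 1, 9 and 17 all map to the same starting bucket; with only two probes allowed, key 17 cannot be placed and is counted in the fallback map instead. That is the trade-off the proposal makes: cheap lookups for most keys, with correctness preserved by the fallback.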

## How was this patch tested?

    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
    BytesToBytesMap:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    hash                                      108 /  119         96.9          10.3       1.0X
    fast hash                                  63 /   70        166.2           6.0       1.7X
    arrayEqual                                 70 /   73        150.8           6.6       1.6X
    Java HashMap (Long)                       141 /  200         74.3          13.5       0.8X
    Java HashMap (two ints)                   145 /  185         72.3          13.8       0.7X
    Java HashMap (UnsafeRow)                  499 /  524         21.0          47.6       0.2X
    BytesToBytesMap (off Heap)                483 /  548         21.7          46.0       0.2X
    BytesToBytesMap (on Heap)                 485 /  562         21.6          46.2       0.2X
    Vectorized Hashmap                         54 /   60        193.7           5.2       2.0X

Author: Sameer Agarwal <sa...@databricks.com>

Closes #12055 from sameeragarwal/vectorized-hashmap.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d620720
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d620720
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d620720

Branch: refs/heads/master
Commit: 8d6207206c9fd71178417c12cdacf368362df4d8
Parents: 8b207f3
Author: Sameer Agarwal <sa...@databricks.com>
Authored: Thu Mar 31 11:53:13 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Mar 31 11:53:13 2016 -0700

----------------------------------------------------------------------
 .../execution/vectorized/AggregateHashMap.java  | 107 +++++++++++++++++++
 .../execution/BenchmarkWholeStageCodegen.scala  |  45 ++++++--
 2 files changed, 142 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d620720/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
new file mode 100644
index 0000000..abe8db5
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.StructType;
+
+import static org.apache.spark.sql.types.DataTypes.LongType;
+
+/**
+ * This is an illustrative implementation of an append-only single-key/single value aggregate hash
+ * map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates
+ * (and fall back to the `BytesToBytesMap` if a given key isn't found). This can be potentially
+ * 'codegened' in TungstenAggregate to speed up aggregates w/ key.
+ *
+ * It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the
+ * key-value pairs. The index lookups in the array rely on linear probing (with a small number of
+ * maximum tries) and use an inexpensive hash function which makes it really efficient for a
+ * majority of lookups. However, using linear probing and an inexpensive hash function also makes it
+ * less robust as compared to the `BytesToBytesMap` (especially for a large number of keys or even
+ * for certain distribution of keys) and requires us to fall back on the latter for correctness.
+ */
+public class AggregateHashMap {
+  public ColumnarBatch batch;
+  public int[] buckets;
+
+  private int numBuckets;
+  private int numRows = 0;
+  private int maxSteps = 3;
+
+  private static int DEFAULT_CAPACITY = 1 << 16;
+  private static double DEFAULT_LOAD_FACTOR = 0.25;
+  private static int DEFAULT_MAX_STEPS = 3;
+
+  public AggregateHashMap(StructType schema, int capacity, double loadFactor, int maxSteps) {
+
+    // We currently only support single key-value pair that are both longs
+    assert (schema.size() == 2 && schema.fields()[0].dataType() == LongType &&
+        schema.fields()[1].dataType() == LongType);
+
+    // capacity should be a power of 2
+    assert (capacity > 0 && ((capacity & (capacity - 1)) == 0));
+
+    this.maxSteps = maxSteps;
+    numBuckets = (int) (capacity / loadFactor);
+    batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, capacity);
+    buckets = new int[numBuckets];
+    Arrays.fill(buckets, -1);
+  }
+
+  public AggregateHashMap(StructType schema) {
+    this(schema, DEFAULT_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_MAX_STEPS);
+  }
+
+  public int findOrInsert(long key) {
+    int idx = find(key);
+    if (idx != -1 && buckets[idx] == -1) {
+      batch.column(0).putLong(numRows, key);
+      batch.column(1).putLong(numRows, 0);
+      buckets[idx] = numRows++;
+    }
+    return idx;
+  }
+
+  public int find(long key) {
+    long h = hash(key);
+    int step = 0;
+    int idx = (int) h & (numBuckets - 1);
+    while (step < maxSteps) {
+      // Return bucket index if it's either an empty slot or already contains the key
+      if (buckets[idx] == -1) {
+        return idx;
+      } else if (equals(idx, key)) {
+        return idx;
+      }
+      idx = (idx + 1) & (numBuckets - 1);
+      step++;
+    }
+    // Didn't find it
+    return -1;
+  }
+
+  private long hash(long key) {
+    return key;
+  }
+
+  private boolean equals(int idx, long key1) {
+    return batch.column(0).getLong(buckets[idx]) == key1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8d620720/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index a16092e..003d3e0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -23,8 +23,9 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.vectorized.AggregateHashMap
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
 import org.apache.spark.unsafe.map.BytesToBytesMap
@@ -463,18 +464,42 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
       }
     }
 
+    benchmark.addCase("Aggregate HashMap") { iter =>
+      var i = 0
+      val numKeys = 65536
+      val schema = new StructType()
+        .add("key", LongType)
+        .add("value", LongType)
+      val map = new AggregateHashMap(schema)
+      while (i < numKeys) {
+        val idx = map.findOrInsert(i.toLong)
+        map.batch.column(1).putLong(map.buckets(idx),
+          map.batch.column(1).getLong(map.buckets(idx)) + 1)
+        i += 1
+      }
+      var s = 0
+      i = 0
+      while (i < N) {
+        if (map.find(i % 100000) != -1) {
+          s += 1
+        }
+        i += 1
+      }
+    }
+
     /**
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
     BytesToBytesMap:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    hash                                      651 /  678         80.0          12.5       1.0X
-    fast hash                                 336 /  343        155.9           6.4       1.9X
-    arrayEqual                                417 /  428        125.0           8.0       1.6X
-    Java HashMap (Long)                       145 /  168         72.2          13.8       0.8X
-    Java HashMap (two ints)                   157 /  164         66.8          15.0       0.8X
-    Java HashMap (UnsafeRow)                  538 /  573         19.5          51.3       0.2X
-    BytesToBytesMap (off Heap)               2594 / 2664         20.2          49.5       0.2X
-    BytesToBytesMap (on Heap)                2693 / 2989         19.5          51.4       0.2X
+    hash                                      112 /  116         93.2          10.7       1.0X
+    fast hash                                  65 /   69        160.9           6.2       1.7X
+    arrayEqual                                 66 /   69        159.1           6.3       1.7X
+    Java HashMap (Long)                       137 /  182         76.3          13.1       0.8X
+    Java HashMap (two ints)                   182 /  230         57.8          17.3       0.6X
+    Java HashMap (UnsafeRow)                  511 /  565         20.5          48.8       0.2X
+    BytesToBytesMap (off Heap)                481 /  515         21.8          45.9       0.2X
+    BytesToBytesMap (on Heap)                 529 /  600         19.8          50.5       0.2X
+    Aggregate HashMap                          56 /   62        187.9           5.3       2.0X
       */
     benchmark.run()
   }

