You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/02/19 05:07:15 UTC
[spark] branch master updated: [SPARK-26909][FOLLOWUP][SQL] use
unsafeRow.hashCode() as hash value in HashAggregate
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 743b73d [SPARK-26909][FOLLOWUP][SQL] use unsafeRow.hashCode() as hash value in HashAggregate
743b73d is described below
commit 743b73daf7fbbb6cd0f763955ed331ac3889ba6f
Author: yucai <yy...@ebay.com>
AuthorDate: Tue Feb 19 13:01:10 2019 +0800
[SPARK-26909][FOLLOWUP][SQL] use unsafeRow.hashCode() as hash value in HashAggregate
## What changes were proposed in this pull request?
This is a followup PR for #21149.
The new approach uses unsafeRow.hashCode() as the hash value in HashAggregate.
The unsafe row contains the null-bit set and other layout metadata, so its hash already differs from the shuffle hash, and we no longer need a special seed.
## How was this patch tested?
UTs.
Closes #23821 from yucai/unsafe_hash.
Authored-by: yucai <yy...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/execution/aggregate/HashAggregateExec.scala | 14 ++++----------
1 file changed, 4 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 17cc7fd..23ae1f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -742,6 +742,7 @@ case class HashAggregateExec(
val fastRowKeys = ctx.generateExpressions(
bindReferences[Expression](groupingExpressions, child.output))
val unsafeRowKeys = unsafeRowKeyCode.value
+ val unsafeRowKeyHash = ctx.freshName("unsafeRowKeyHash")
val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer")
val fastRowBuffer = ctx.freshName("fastAggBuffer")
@@ -755,13 +756,6 @@ case class HashAggregateExec(
}
}
- // generate hash code for key
- // SPARK-24076: HashAggregate uses the same hash algorithm on the same expressions
- // as ShuffleExchange, it may lead to bad hash conflict when shuffle.partitions=8192*n,
- // pick a different seed to avoid this conflict
- val hashExpr = Murmur3Hash(groupingExpressions, 48)
- val hashEval = BindReferences.bindReference(hashExpr, child.output).genCode(ctx)
-
val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
incCounter) = if (testFallbackStartsAt.isDefined) {
val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "fallbackCounter")
@@ -777,11 +771,11 @@ case class HashAggregateExec(
s"""
|// generate grouping key
|${unsafeRowKeyCode.code}
- |${hashEval.code}
+ |int $unsafeRowKeyHash = ${unsafeRowKeyCode.value}.hashCode();
|if ($checkFallbackForBytesToBytesMap) {
| // try to get the buffer from hash map
| $unsafeRowBuffer =
- | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
+ | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, $unsafeRowKeyHash);
|}
|// Can't allocate buffer from the hash map. Spill the map and fallback to sort-based
|// aggregation after processing all input rows.
@@ -795,7 +789,7 @@ case class HashAggregateExec(
| // the hash map had be spilled, it should have enough memory now,
| // try to allocate buffer again.
| $unsafeRowBuffer = $hashMapTerm.getAggregationBufferFromUnsafeRow(
- | $unsafeRowKeys, ${hashEval.value});
+ | $unsafeRowKeys, $unsafeRowKeyHash);
| if ($unsafeRowBuffer == null) {
| // failed to allocate the first page
| throw new $oomeClassName("No enough memory for aggregation");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org