You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/09/10 07:11:23 UTC
spark git commit: [SPARK-24999][SQL] Reduce unnecessary 'new' memory
operations
Repository: spark
Updated Branches:
refs/heads/master f8b4d5aaf -> e7853dc10
[SPARK-24999][SQL] Reduce unnecessary 'new' memory operations
## What changes were proposed in this pull request?
This PR improves the code generated by the fast hash map (CodeGen) so that it no longer allocates a new block of memory for every new entry: the UnsafeRowWriter's buffer can be created once and reused, since the UnsafeRow's memory is reusable across entries.
## How was this patch tested?
Existing test cases.
Closes #21968 from heary-cao/updateNewMemory.
Authored-by: caoxuewen <ca...@zte.com.cn>
Signed-off-by: Wenchen Fan <we...@databricks.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7853dc1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7853dc1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7853dc1
Branch: refs/heads/master
Commit: e7853dc103bf3fd541aa2b498f5f3a223067f812
Parents: f8b4d5a
Author: caoxuewen <ca...@zte.com.cn>
Authored: Mon Sep 10 15:11:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Sep 10 15:11:14 2018 +0800
----------------------------------------------------------------------
.../aggregate/RowBasedHashMapGenerator.scala | 30 ++++++++++++--------
1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e7853dc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 3d2443c..56cf78d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -48,6 +48,12 @@ class RowBasedHashMapGenerator(
val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+ val numVarLenFields = groupingKeys.map(_.dataType).count {
+ case dt if UnsafeRow.isFixedLength(dt) => false
+ // TODO: consider large decimal and interval type
+ case _ => true
+ }
+
s"""
| private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
| private int[] buckets;
@@ -60,6 +66,7 @@ class RowBasedHashMapGenerator(
| private long emptyVOff;
| private int emptyVLen;
| private boolean isBatchFull = false;
+ | private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
|
|
| public $generatedClassName(
@@ -75,6 +82,9 @@ class RowBasedHashMapGenerator(
| emptyVOff = Platform.BYTE_ARRAY_OFFSET;
| emptyVLen = emptyBuffer.length;
|
+ | agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
+ | ${groupingKeySchema.length}, ${numVarLenFields * 32});
+ |
| buckets = new int[numBuckets];
| java.util.Arrays.fill(buckets, -1);
| }
@@ -112,12 +122,6 @@ class RowBasedHashMapGenerator(
*
*/
protected def generateFindOrInsert(): String = {
- val numVarLenFields = groupingKeys.map(_.dataType).count {
- case dt if UnsafeRow.isFixedLength(dt) => false
- // TODO: consider large decimal and interval type
- case _ => true
- }
-
val createUnsafeRowForKey = groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
key.dataType match {
case t: DecimalType =>
@@ -130,6 +134,12 @@ class RowBasedHashMapGenerator(
}
}.mkString(";\n")
+ val resetNullBits = if (groupingKeySchema.map(_.nullable).forall(_ == false)) {
+ ""
+ } else {
+ "agg_rowWriter.zeroOutNullBytes();"
+ }
+
s"""
|public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(${
groupingKeySignature}) {
@@ -140,12 +150,8 @@ class RowBasedHashMapGenerator(
| // Return bucket index if it's either an empty slot or already contains the key
| if (buckets[idx] == -1) {
| if (numRows < capacity && !isBatchFull) {
- | // creating the unsafe for new entry
- | org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
- | = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
- | ${groupingKeySchema.length}, ${numVarLenFields * 32});
- | agg_rowWriter.reset(); //TODO: investigate if reset or zeroout are actually needed
- | agg_rowWriter.zeroOutNullBytes();
+ | agg_rowWriter.reset();
+ | $resetNullBits
| ${createUnsafeRowForKey};
| org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
| = agg_rowWriter.getRow();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org