Posted to commits@spark.apache.org by we...@apache.org on 2018/09/10 07:11:23 UTC

spark git commit: [SPARK-24999][SQL] Reduce unnecessary 'new' memory operations

Repository: spark
Updated Branches:
  refs/heads/master f8b4d5aaf -> e7853dc10


[SPARK-24999][SQL] Reduce unnecessary 'new' memory operations

## What changes were proposed in this pull request?

This PR improves the code generated by the fast hash map (`RowBasedHashMapGenerator`): there is no need to allocate a new block of memory for every new entry, because the UnsafeRow's memory can be reused. The `UnsafeRowWriter` is now created once in the generated class's constructor and simply reset for each new entry, and `zeroOutNullBytes()` is only emitted when at least one grouping key is nullable.
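
As a rough illustration, here is a minimal Java sketch of the pattern the generated code now follows (the class and method names below are illustrative, not the actual generated identifiers): the writer is allocated once, and each insertion only rewinds the cursor into the same backing buffer.

```java
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;

// Illustrative stand-in for the generated fast hash map class; not the
// actual generated code.
public class ReusedKeyWriterSketch {
  // Previously the generated findOrInsert() did `new UnsafeRowWriter(...)`
  // on every new entry; now one writer lives for the lifetime of the map.
  private final UnsafeRowWriter rowWriter;

  public ReusedKeyWriterSketch(int numKeyFields, int initialVarLenBytes) {
    rowWriter = new UnsafeRowWriter(numKeyFields, initialVarLenBytes);
  }

  public UnsafeRow writeKey(long key) {
    rowWriter.reset();             // rewind into the reused buffer; no new allocation
    rowWriter.zeroOutNullBytes();  // only required when a key field is nullable
    rowWriter.write(0, key);
    return rowWriter.getRow();     // UnsafeRow backed by the writer's buffer
  }

  public static void main(String[] args) {
    ReusedKeyWriterSketch writer = new ReusedKeyWriterSketch(1, 0);
    System.out.println(writer.writeKey(42L).getLong(0)); // prints 42
  }
}
```

The initial buffer size the patch passes to the writer (`numVarLenFields * 32`) is only a starting capacity; the underlying buffer grows on demand if a key does not fit.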

## How was this patch tested?

Existing test cases.

Closes #21968 from heary-cao/updateNewMemory.

Authored-by: caoxuewen <ca...@zte.com.cn>
Signed-off-by: Wenchen Fan <we...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e7853dc1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e7853dc1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e7853dc1

Branch: refs/heads/master
Commit: e7853dc103bf3fd541aa2b498f5f3a223067f812
Parents: f8b4d5a
Author: caoxuewen <ca...@zte.com.cn>
Authored: Mon Sep 10 15:11:14 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Sep 10 15:11:14 2018 +0800

----------------------------------------------------------------------
 .../aggregate/RowBasedHashMapGenerator.scala    | 30 ++++++++++++--------
 1 file changed, 18 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e7853dc1/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
index 3d2443c..56cf78d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala
@@ -48,6 +48,12 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
 
+    val numVarLenFields = groupingKeys.map(_.dataType).count {
+      case dt if UnsafeRow.isFixedLength(dt) => false
+      // TODO: consider large decimal and interval type
+      case _ => true
+    }
+
     s"""
        |  private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
        |  private int[] buckets;
@@ -60,6 +66,7 @@ class RowBasedHashMapGenerator(
        |  private long emptyVOff;
        |  private int emptyVLen;
        |  private boolean isBatchFull = false;
+       |  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter;
        |
        |
        |  public $generatedClassName(
@@ -75,6 +82,9 @@ class RowBasedHashMapGenerator(
        |    emptyVOff = Platform.BYTE_ARRAY_OFFSET;
        |    emptyVLen = emptyBuffer.length;
        |
+       |    agg_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
+       |      ${groupingKeySchema.length}, ${numVarLenFields * 32});
+       |
        |    buckets = new int[numBuckets];
        |    java.util.Arrays.fill(buckets, -1);
        |  }
@@ -112,12 +122,6 @@ class RowBasedHashMapGenerator(
    *
    */
    protected def generateFindOrInsert(): String = {
-    val numVarLenFields = groupingKeys.map(_.dataType).count {
-      case dt if UnsafeRow.isFixedLength(dt) => false
-      // TODO: consider large decimal and interval type
-      case _ => true
-    }
-
     val createUnsafeRowForKey = groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
       key.dataType match {
         case t: DecimalType =>
@@ -130,6 +134,12 @@ class RowBasedHashMapGenerator(
       }
     }.mkString(";\n")
 
+    val resetNullBits = if (groupingKeySchema.map(_.nullable).forall(_ == false)) {
+      ""
+    } else {
+      "agg_rowWriter.zeroOutNullBytes();"
+    }
+
     s"""
        |public org.apache.spark.sql.catalyst.expressions.UnsafeRow findOrInsert(${
             groupingKeySignature}) {
@@ -140,12 +150,8 @@ class RowBasedHashMapGenerator(
        |    // Return bucket index if it's either an empty slot or already contains the key
        |    if (buckets[idx] == -1) {
        |      if (numRows < capacity && !isBatchFull) {
-       |        // creating the unsafe for new entry
-       |        org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
-       |          = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
-       |              ${groupingKeySchema.length}, ${numVarLenFields * 32});
-       |        agg_rowWriter.reset(); //TODO: investigate if reset or zeroout are actually needed
-       |        agg_rowWriter.zeroOutNullBytes();
+       |        agg_rowWriter.reset();
+       |        $resetNullBits
        |        ${createUnsafeRowForKey};
        |        org.apache.spark.sql.catalyst.expressions.UnsafeRow agg_result
        |          = agg_rowWriter.getRow();

