Posted to commits@spark.apache.org by da...@apache.org on 2016/03/17 05:46:07 UTC

spark git commit: [SPARK-13873] [SQL] Avoid copy of UnsafeRow when there is no join in whole stage codegen

Repository: spark
Updated Branches:
  refs/heads/master 917f4000b -> c100d31dd


[SPARK-13873] [SQL] Avoid copy of UnsafeRow when there is no join in whole stage codegen

## What changes were proposed in this pull request?

We currently copy each UnsafeRow because a Join could produce multiple output rows from a single input row. We can avoid that copy when there is no join (or the join does not produce multiple rows) inside WholeStageCodegen.

Updated the benchmark for `collect`; we see a 20-30% speedup.
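
For background, a minimal sketch (assuming the buffered-row-iterator style of row buffer that whole-stage codegen uses; the names here are illustrative, not from this patch) of why the copy was needed:

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Illustrative sketch: generated pipelines write every output row into a
// reused buffer. A join may append several outputs from one input before
// control returns to the caller, so those rows must be private copies;
// without a join, at most one row is appended before the caller drains
// the buffer, and skipping the copy is safe.
final class RowBuffer(mayFanOut: Boolean) {
  val currentRows = ArrayBuffer[UnsafeRow]()

  def append(reusedRow: UnsafeRow): Unit = {
    currentRows += (if (mayFanOut) reusedRow.copy() else reusedRow)
  }
}
```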

## How was this patch tested?

Existing unit tests.

Author: Davies Liu <da...@databricks.com>

Closes #11740 from davies/avoid_copy2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c100d31d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c100d31d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c100d31d

Branch: refs/heads/master
Commit: c100d31ddc6db9c03b7a65a20a7dd56dcdc18baf
Parents: 917f400
Author: Davies Liu <da...@databricks.com>
Authored: Wed Mar 16 21:46:04 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Wed Mar 16 21:46:04 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/codegen/CodeGenerator.scala  | 10 ++++++++++
 .../scala/org/apache/spark/sql/execution/Expand.scala     |  1 +
 .../org/apache/spark/sql/execution/GroupedIterator.scala  |  3 ++-
 .../main/scala/org/apache/spark/sql/execution/Sort.scala  |  5 ++++-
 .../apache/spark/sql/execution/WholeStageCodegen.scala    |  9 +++++++--
 .../spark/sql/execution/aggregate/TungstenAggregate.scala |  4 ++++
 .../spark/sql/execution/joins/BroadcastHashJoin.scala     |  2 ++
 .../apache/spark/sql/execution/joins/SortMergeJoin.scala  |  1 +
 .../spark/sql/execution/BenchmarkWholeStageCodegen.scala  |  8 ++++----
 9 files changed, 35 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 3dbe634..dd899d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -78,6 +78,16 @@ class CodegenContext {
   var currentVars: Seq[ExprCode] = null
 
   /**
+   * Whether we should copy the result rows or not.
+   *
+   * If any operator inside WholeStageCodegen generates multiple rows from a single row (for
+   * example, Join), this should be true.
+   *
+   * If an operator starts a new pipeline, this should be reset to false before calling `consume()`.
+   */
+  var copyResult: Boolean = false
+
+  /**
    * Holding expressions' mutable states like `MonotonicallyIncreasingID.count` as a
    * 3-tuple: java type, variable name, code to init it.
    * As an example, ("int", "count", "count = 0;") will produce code:

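A hedged sketch of that contract (simplified signatures; the operator names and bodies below are illustrative, not the exact Spark internals):

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

object CopyResultContract {
  // A fan-out operator (e.g. a join) marks its output as needing a copy,
  // because several buffered output rows may share one reused row buffer.
  def consumeOfJoinLike(ctx: CodegenContext): String = {
    ctx.copyResult = true
    "/* generated loop appending one row per match */"
  }

  // A blocking operator (Sort, TungstenAggregate) has consumed all child
  // rows before emitting any output, so it starts a fresh pipeline and
  // resets the flag before generating its own output path.
  def produceOfBlockingOperator(ctx: CodegenContext): String = {
    ctx.copyResult = false
    "/* generated code emitting rows from the materialized buffer */"
  }
}
```
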
http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index a84e180..05627ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -187,6 +187,7 @@ case class Expand(
     val i = ctx.freshName("i")
     // these columns have to be declared before the loop.
     val evaluate = evaluateVariables(outputColumns)
+    ctx.copyResult = true
     s"""
        |$evaluate
        |for (int $i = 0; $i < ${projections.length}; $i ++) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala
index ef84992..431f021 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GroupedIterator.scala
@@ -115,7 +115,8 @@ class GroupedIterator private(
       false
     } else {
       // Skip to next group.
-      while (input.hasNext && keyOrdering.compare(currentGroup, currentRow) == 0) {
+      // currentRow may be overwritten by `hasNext`, so compare the keys before calling it.
+      while (keyOrdering.compare(currentGroup, currentRow) == 0 && input.hasNext) {
         currentRow = input.next()
       }
 
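The reordering matters because `input` may be an iterator whose `hasNext` eagerly computes the next row into a single reused buffer, which is exactly what a copy-free codegen pipeline now does. A minimal, self-contained sketch of the hazard (plain arrays standing in for rows; illustrative only):

```scala
// Illustrative only: hasNext() stages the next element into one reused
// buffer, so it can clobber the object the last next() handed out.
final class EagerIterator(data: Array[Int]) extends Iterator[Array[Int]] {
  private val row = new Array[Int](1) // single buffer reused for every row
  private var pos = 0
  override def hasNext: Boolean = {
    if (pos >= data.length) return false
    row(0) = data(pos) // overwrites what the previous next() returned
    true
  }
  override def next(): Array[Int] = { pos += 1; row }
}
// If currentRow aliases `row`, evaluating input.hasNext before the key
// comparison overwrites the row being compared; comparing first, as in
// the fix above, reads currentRow while it is still intact.
```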

http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 5a67cd0..b4dd770 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -111,7 +111,6 @@ case class Sort(
     val needToSort = ctx.freshName("needToSort")
     ctx.addMutableState("boolean", needToSort, s"$needToSort = true;")
 
-
     // Initialize the class member variables. This includes the instance of the Sorter and
     // the iterator to return sorted rows.
     val thisPlan = ctx.addReferenceObj("plan", this)
@@ -132,6 +131,10 @@ case class Sort(
         | }
       """.stripMargin.trim)
 
+    // The child could have set `copyResult` to true, but all the rows have already
+    // been consumed at this point, so `copyResult` should be reset to `false`.
+    ctx.copyResult = false
+
     val outputRow = ctx.freshName("outputRow")
     val dataSize = metricTerm(ctx, "dataSize")
     val spillSize = metricTerm(ctx, "spillSize")

http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index a54b772..67aef72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -379,10 +379,15 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
       input: Seq[ExprCode],
       row: String = null): String = {
 
+    val doCopy = if (ctx.copyResult) {
+      ".copy()"
+    } else {
+      ""
+    }
     if (row != null) {
       // There is an UnsafeRow already
       s"""
-         |append($row.copy());
+         |append($row$doCopy);
        """.stripMargin.trim
     } else {
       assert(input != null)
@@ -397,7 +402,7 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
         s"""
            |$evaluateInputs
            |${code.code.trim}
-           |append(${code.value}.copy());
+           |append(${code.value}$doCopy);
          """.stripMargin.trim
       } else {
         // There is no columns

http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 1c4d594..28945a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -465,6 +465,10 @@ case class TungstenAggregate(
     val outputCode = generateResultCode(ctx, keyTerm, bufferTerm, thisPlan)
     val numOutput = metricTerm(ctx, "numOutputRows")
 
+    // The child could have set `copyResult` to true, but all the rows have already
+    // been consumed at this point, so `copyResult` should be reset to `false`.
+    ctx.copyResult = false
+
     s"""
      if (!$initAgg) {
        $initAgg = true;

http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index f84ed41..aa2da28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -230,6 +230,7 @@ case class BroadcastHashJoin(
        """.stripMargin
 
     } else {
+      ctx.copyResult = true
       val matches = ctx.freshName("matches")
       val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
       val i = ctx.freshName("i")
@@ -303,6 +304,7 @@ case class BroadcastHashJoin(
        """.stripMargin
 
     } else {
+      ctx.copyResult = true
       val matches = ctx.freshName("matches")
       val bufferType = classOf[CompactBuffer[UnsafeRow]].getName
       val i = ctx.freshName("i")

http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index d0724ff..807b39a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -404,6 +404,7 @@ case class SortMergeJoin(
   }
 
   override def doProduce(ctx: CodegenContext): String = {
+    ctx.copyResult = true
     val leftInput = ctx.freshName("leftInput")
     ctx.addMutableState("scala.collection.Iterator", leftInput, s"$leftInput = inputs[0];")
     val rightInput = ctx.freshName("rightInput")

http://git-wip-us.apache.org/repos/asf/spark/blob/c100d31d/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index cb67264..b6051b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -457,12 +457,12 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     benchmark.run()
 
     /**
-     * Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
+    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
     collect:                            Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
     -------------------------------------------------------------------------------------------
-    collect 1 million                          775 / 1170          1.4         738.9       1.0X
-    collect 2 millions                        1153 / 1758          0.9        1099.3       0.7X
-    collect 4 millions                        4451 / 5124          0.2        4244.9       0.2X
+    collect 1 million                         439 /  654          2.4         418.7       1.0X
+    collect 2 millions                        961 / 1907          1.1         916.4       0.5X
+    collect 4 millions                       3193 / 3895          0.3        3044.7       0.1X
      */
   }
 }

