You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/03/10 19:05:01 UTC

spark git commit: [SPARK-13636] [SQL] Directly consume UnsafeRow in wholestage codegen plans

Repository: spark
Updated Branches:
  refs/heads/master 74267beb3 -> d24801ad2


[SPARK-13636] [SQL] Directly consume UnsafeRow in wholestage codegen plans

JIRA: https://issues.apache.org/jira/browse/SPARK-13636

## What changes were proposed in this pull request?

As shown in the wholestage codegen version of the Sort operator, when Sort is on top of Exchange (or another operator that produces UnsafeRow), we will create variables from the UnsafeRow, then create another UnsafeRow using these variables. We should avoid unnecessarily unpacking variables from UnsafeRows and packing them back.

## How was this patch tested?

All existing wholestage codegen tests should pass.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #11484 from viirya/direct-consume-unsaferow.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d24801ad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d24801ad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d24801ad

Branch: refs/heads/master
Commit: d24801ad285ac3f2282fe20d1250a010673e2f96
Parents: 74267be
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Mar 10 10:04:56 2016 -0800
Committer: Davies Liu <da...@gmail.com>
Committed: Thu Mar 10 10:04:56 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/execution/Expand.scala |  2 +-
 .../org/apache/spark/sql/execution/Sort.scala   | 28 ++++++++++++--------
 .../spark/sql/execution/WholeStageCodegen.scala | 24 +++++++++++++----
 .../execution/aggregate/TungstenAggregate.scala |  2 +-
 .../spark/sql/execution/basicOperators.scala    |  4 +--
 .../spark/sql/execution/debug/package.scala     |  2 +-
 .../sql/execution/joins/BroadcastHashJoin.scala |  2 +-
 .../org/apache/spark/sql/execution/limit.scala  |  2 +-
 8 files changed, 43 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
index 524285b..a84e180 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
@@ -93,7 +93,7 @@ case class Expand(
     child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
 
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
     /*
      * When the projections list looks like:
      *   expr1A, exprB, expr1C

http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
index 2ea889e..5a67cd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Sort.scala
@@ -105,6 +105,8 @@ case class Sort(
   // Name of sorter variable used in codegen.
   private var sorterVariable: String = _
 
+  override def preferUnsafeRow: Boolean = true
+
   override protected def doProduce(ctx: CodegenContext): String = {
     val needToSort = ctx.freshName("needToSort")
     ctx.addMutableState("boolean", needToSort, s"$needToSort = true;")
@@ -153,18 +155,22 @@ case class Sort(
      """.stripMargin.trim
   }
 
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
-    val colExprs = child.output.zipWithIndex.map { case (attr, i) =>
-      BoundReference(i, attr.dataType, attr.nullable)
-    }
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
+    if (row != null) {
+      s"$sorterVariable.insertRow((UnsafeRow)$row);"
+    } else {
+      val colExprs = child.output.zipWithIndex.map { case (attr, i) =>
+        BoundReference(i, attr.dataType, attr.nullable)
+      }
 
-    ctx.currentVars = input
-    val code = GenerateUnsafeProjection.createCode(ctx, colExprs)
+      ctx.currentVars = input
+      val code = GenerateUnsafeProjection.createCode(ctx, colExprs)
 
-    s"""
-       | // Convert the input attributes to an UnsafeRow and add it to the sorter
-       | ${code.code}
-       | $sorterVariable.insertRow(${code.value});
-     """.stripMargin.trim
+      s"""
+         | // Convert the input attributes to an UnsafeRow and add it to the sorter
+         | ${code.code}
+         | $sorterVariable.insertRow(${code.value});
+       """.stripMargin.trim
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index dd831e6..e8e42d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -65,7 +65,12 @@ trait CodegenSupport extends SparkPlan {
   /**
     * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
     */
-  private var parent: CodegenSupport = null
+  protected var parent: CodegenSupport = null
+
+  /**
+    * Whether this SparkPlan prefers to accept UnsafeRow as input in doConsume.
+    */
+  def preferUnsafeRow: Boolean = false
 
   /**
     * Returns all the RDDs of InternalRow which generates the input rows.
@@ -176,11 +181,20 @@ trait CodegenSupport extends SparkPlan {
       } else {
         input
       }
+
+    val evaluated =
+      if (row != null && preferUnsafeRow) {
+        // Current plan can consume UnsafeRows directly.
+        ""
+      } else {
+        evaluateRequiredVariables(child.output, inputVars, usedInputs)
+      }
+
     s"""
        |
        |/*** CONSUME: ${toCommentSafeString(this.simpleString)} */
-       |${evaluateRequiredVariables(child.output, inputVars, usedInputs)}
-       |${doConsume(ctx, inputVars)}
+       |${evaluated}
+       |${doConsume(ctx, inputVars, row)}
      """.stripMargin
   }
 
@@ -195,7 +209,7 @@ trait CodegenSupport extends SparkPlan {
     *   if (isNull1 || !value2) continue;
     *   # call consume(), which will call parent.doConsume()
     */
-  protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+  protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
     throw new UnsupportedOperationException
   }
 }
@@ -238,7 +252,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport
     s"""
        | while (!shouldStop() && $input.hasNext()) {
        |   InternalRow $row = (InternalRow) $input.next();
-       |   ${consume(ctx, columns).trim}
+       |   ${consume(ctx, columns, row).trim}
        | }
      """.stripMargin
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index f856634..1c4d594 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -139,7 +139,7 @@ case class TungstenAggregate(
     }
   }
 
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
     if (groupingExpressions.isEmpty) {
       doConsumeWithoutKeys(ctx, input)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 4901298..6ebbc8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -49,7 +49,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan)
     references.filter(a => usedMoreThanOnce.contains(a.exprId))
   }
 
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
     val exprs = projectList.map(x =>
       ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output)))
     ctx.currentVars = input
@@ -88,7 +88,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode wit
     child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
 
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
     val numOutput = metricTerm(ctx, "numOutputRows")
     val expr = ExpressionCanonicalizer.execute(
       BindReferences.bindReference(condition, child.output))

http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index fed88b8..034bf15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -136,7 +136,7 @@ package object debug {
       child.asInstanceOf[CodegenSupport].produce(ctx, this)
     }
 
-    override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+    override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
       consume(ctx, input)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index c52662a..4c8f808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -107,7 +107,7 @@ case class BroadcastHashJoin(
     streamedPlan.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
 
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
     if (joinType == Inner) {
       codegenInner(ctx, input)
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/d24801ad/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 5a7516b..ca624a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -65,7 +65,7 @@ trait BaseLimit extends UnaryNode with CodegenSupport {
     child.asInstanceOf[CodegenSupport].produce(ctx, this)
   }
 
-  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
+  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String): String = {
     val stopEarly = ctx.freshName("stopEarly")
     ctx.addMutableState("boolean", stopEarly, s"$stopEarly = false;")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org