Posted to commits@spark.apache.org by li...@apache.org on 2017/11/22 18:05:49 UTC

spark git commit: [SPARK-22543][SQL] fix java 64kb compile error for deeply nested expressions

Repository: spark
Updated Branches:
  refs/heads/master 327d25fe1 -> 0605ad761


[SPARK-22543][SQL] fix java 64kb compile error for deeply nested expressions

## What changes were proposed in this pull request?

A frequently reported Spark issue is the Java 64KB compile error, which occurs because the JVM limits a single method to 64KB of bytecode and Spark sometimes generates a method that exceeds it. A generated method usually grows too big for one of three reasons:

1. a deep expression tree, e.g. a very complex filter condition (see the sketch after this list)
2. many individual expressions, e.g. an expression can have many children, and an operator can have many expressions
3. a deep query plan tree (with whole-stage codegen)
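
Below is a minimal, hypothetical repro sketch of case 1 (not part of this patch; the session setup and object name are assumptions for illustration). Chaining `encode`/`decode` 150 times, mirroring the pattern used in `CodeGenerationSuite`, builds an expression tree deep enough that the code generated for a single evaluation method has historically exceeded the 64KB limit:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object DeepExprRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("deep-expr").getOrCreate()
    import spark.implicits._

    val df = Seq("abc").toDF("s")
    // Chain decode(encode(...)) 150 times to build a very deep expression tree,
    // the same pattern exercised by the CodeGenerationSuite test updated below.
    val deep = (1 to 150).foldLeft(col("s")) { (e, _) => decode(encode(e, "utf-8"), "utf-8") }
    df.select(deep.as("out")).show()
    spark.stop()
  }
}
```

The code generated for such a plan can be inspected with `debugCodegen()` (after `import org.apache.spark.sql.execution.debug._`) to see how large the emitted methods get.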

This PR focuses on reason 1. Several patches (#15620, #18972, #18641) have already tried to fix this issue, and some of them are merged, but fixing it expression by expression is an endless job because every non-leaf expression has this problem.

This PR proposes to fix the issue centrally in `Expression.genCode`, making sure the code generated for a single expression never grows too big.
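
A minimal sketch of the approach (simplified and paraphrased from the `reduceCodeSize` method added in this patch; the helper name, standalone shape, and `_out` suffix are assumptions for illustration): when the code generated for one expression exceeds a threshold and the expression is evaluated from an input row (i.e., not under whole-stage codegen), wrap that code in its own private method so the enclosing method only grows by a single call.

```scala
object SplitSketch {
  // Simplified sketch of the splitting decision: returns the (optional)
  // private-method definition plus the code to emit at the call site.
  // The real patch registers the method via CodegenContext.addNewFunction,
  // allocates fresh names with ctx.freshName, and keeps the null flag in a
  // global mutable field.
  def wrapLongExprCode(
      javaType: String,       // Java type of the expression's result
      funcName: String,       // fresh method name (ctx.freshName in the patch)
      exprCode: String,       // generated evaluation code for this expression
      resultVar: String,      // variable holding the expression's value
      threshold: Int = 1024): (Option[String], String) = {
    if (exprCode.trim.length > threshold) {
      val funcDef =
        s"""
           |private $javaType $funcName(InternalRow i) {
           |  ${exprCode.trim}
           |  return $resultVar;
           |}
         """.stripMargin
      // The caller now pays for one method call instead of the full body.
      (Some(funcDef), s"$javaType ${resultVar}_out = $funcName(i);")
    } else {
      (None, exprCode)
    }
  }
}
```

With this in place, the per-expression splitting previously hand-rolled in `If`, `And`, and `Or` (removed in the diff below) is no longer needed.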

According to maropu's benchmark, no regression was found with TPCDS (thanks maropu!): https://docs.google.com/spreadsheets/d/1K3_7lX05-ZgxDXi9X_GleNnDjcnJIfoSlSCDZcL4gdg/edit?usp=sharing

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>
Author: Wenchen Fan <cl...@gmail.com>

Closes #19767 from cloud-fan/codegen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0605ad76
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0605ad76
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0605ad76

Branch: refs/heads/master
Commit: 0605ad761438b202ab077a6af342f48cab2825d8
Parents: 327d25f
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Nov 22 10:05:46 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Nov 22 10:05:46 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/Expression.scala   | 40 +++++++++-
 .../expressions/codegen/CodeGenerator.scala     | 33 +-------
 .../expressions/conditionalExpressions.scala    | 60 ++++----------
 .../catalyst/expressions/namedExpressions.scala |  4 +-
 .../sql/catalyst/expressions/predicates.scala   | 82 +-------------------
 .../expressions/CodeGenerationSuite.scala       |  4 +-
 .../execution/aggregate/HashAggregateExec.scala |  2 +
 7 files changed, 62 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0605ad76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index a3b722a..743782a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -104,16 +104,48 @@ abstract class Expression extends TreeNode[Expression] {
     }.getOrElse {
       val isNull = ctx.freshName("isNull")
       val value = ctx.freshName("value")
-      val ve = doGenCode(ctx, ExprCode("", isNull, value))
-      if (ve.code.nonEmpty) {
+      val eval = doGenCode(ctx, ExprCode("", isNull, value))
+      reduceCodeSize(ctx, eval)
+      if (eval.code.nonEmpty) {
         // Add `this` in the comment.
-        ve.copy(code = s"${ctx.registerComment(this.toString)}\n" + ve.code.trim)
+        eval.copy(code = s"${ctx.registerComment(this.toString)}\n" + eval.code.trim)
       } else {
-        ve
+        eval
       }
     }
   }
 
+  private def reduceCodeSize(ctx: CodegenContext, eval: ExprCode): Unit = {
+    // TODO: support whole stage codegen too
+    if (eval.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) {
+      val setIsNull = if (eval.isNull != "false" && eval.isNull != "true") {
+        val globalIsNull = ctx.freshName("globalIsNull")
+        ctx.addMutableState(ctx.JAVA_BOOLEAN, globalIsNull)
+        val localIsNull = eval.isNull
+        eval.isNull = globalIsNull
+        s"$globalIsNull = $localIsNull;"
+      } else {
+        ""
+      }
+
+      val javaType = ctx.javaType(dataType)
+      val newValue = ctx.freshName("value")
+
+      val funcName = ctx.freshName(nodeName)
+      val funcFullName = ctx.addNewFunction(funcName,
+        s"""
+           |private $javaType $funcName(InternalRow ${ctx.INPUT_ROW}) {
+           |  ${eval.code.trim}
+           |  $setIsNull
+           |  return ${eval.value};
+           |}
+           """.stripMargin)
+
+      eval.value = newValue
+      eval.code = s"$javaType $newValue = $funcFullName(${ctx.INPUT_ROW});"
+    }
+  }
+
   /**
    * Returns Java source code that can be compiled to evaluate this expression.
    * The default behavior is to call the eval method of the expression. Concrete expression

http://git-wip-us.apache.org/repos/asf/spark/blob/0605ad76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 7861719..9df8a8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -931,36 +931,6 @@ class CodegenContext {
   }
 
   /**
-   * Wrap the generated code of expression, which was created from a row object in INPUT_ROW,
-   * by a function. ev.isNull and ev.value are passed by global variables
-   *
-   * @param ev the code to evaluate expressions.
-   * @param dataType the data type of ev.value.
-   * @param baseFuncName the split function name base.
-   */
-  def createAndAddFunction(
-      ev: ExprCode,
-      dataType: DataType,
-      baseFuncName: String): (String, String, String) = {
-    val globalIsNull = freshName("isNull")
-    addMutableState(JAVA_BOOLEAN, globalIsNull, s"$globalIsNull = false;")
-    val globalValue = freshName("value")
-    addMutableState(javaType(dataType), globalValue,
-      s"$globalValue = ${defaultValue(dataType)};")
-    val funcName = freshName(baseFuncName)
-    val funcBody =
-      s"""
-         |private void $funcName(InternalRow ${INPUT_ROW}) {
-         |  ${ev.code.trim}
-         |  $globalIsNull = ${ev.isNull};
-         |  $globalValue = ${ev.value};
-         |}
-         """.stripMargin
-    val fullFuncName = addNewFunction(funcName, funcBody)
-    (fullFuncName, globalIsNull, globalValue)
-  }
-
-  /**
    * Perform a function which generates a sequence of ExprCodes with a given mapping between
    * expressions and common expressions, instead of using the mapping in current context.
    */
@@ -1065,7 +1035,8 @@ class CodegenContext {
    * elimination will be performed. Subexpression elimination assumes that the code for each
    * expression will be combined in the `expressions` order.
    */
-  def generateExpressions(expressions: Seq[Expression],
+  def generateExpressions(
+      expressions: Seq[Expression],
       doSubexpressionElimination: Boolean = false): Seq[ExprCode] = {
     if (doSubexpressionElimination) subexpressionElimination(expressions)
     expressions.map(e => e.genCode(this))

http://git-wip-us.apache.org/repos/asf/spark/blob/0605ad76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index c41a10c..6195be3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -64,52 +64,22 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
     val trueEval = trueValue.genCode(ctx)
     val falseEval = falseValue.genCode(ctx)
 
-    // place generated code of condition, true value and false value in separate methods if
-    // their code combined is large
-    val combinedLength = condEval.code.length + trueEval.code.length + falseEval.code.length
-    val generatedCode = if (combinedLength > 1024 &&
-      // Split these expressions only if they are created from a row object
-      (ctx.INPUT_ROW != null && ctx.currentVars == null)) {
-
-      val (condFuncName, condGlobalIsNull, condGlobalValue) =
-        ctx.createAndAddFunction(condEval, predicate.dataType, "evalIfCondExpr")
-      val (trueFuncName, trueGlobalIsNull, trueGlobalValue) =
-        ctx.createAndAddFunction(trueEval, trueValue.dataType, "evalIfTrueExpr")
-      val (falseFuncName, falseGlobalIsNull, falseGlobalValue) =
-        ctx.createAndAddFunction(falseEval, falseValue.dataType, "evalIfFalseExpr")
+    val code =
       s"""
-        $condFuncName(${ctx.INPUT_ROW});
-        boolean ${ev.isNull} = false;
-        ${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)};
-        if (!$condGlobalIsNull && $condGlobalValue) {
-          $trueFuncName(${ctx.INPUT_ROW});
-          ${ev.isNull} = $trueGlobalIsNull;
-          ${ev.value} = $trueGlobalValue;
-        } else {
-          $falseFuncName(${ctx.INPUT_ROW});
-          ${ev.isNull} = $falseGlobalIsNull;
-          ${ev.value} = $falseGlobalValue;
-        }
-      """
-    }
-    else {
-      s"""
-        ${condEval.code}
-        boolean ${ev.isNull} = false;
-        ${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)};
-        if (!${condEval.isNull} && ${condEval.value}) {
-          ${trueEval.code}
-          ${ev.isNull} = ${trueEval.isNull};
-          ${ev.value} = ${trueEval.value};
-        } else {
-          ${falseEval.code}
-          ${ev.isNull} = ${falseEval.isNull};
-          ${ev.value} = ${falseEval.value};
-        }
-      """
-    }
-
-    ev.copy(code = generatedCode)
+         |${condEval.code}
+         |boolean ${ev.isNull} = false;
+         |${ctx.javaType(dataType)} ${ev.value} = ${ctx.defaultValue(dataType)};
+         |if (!${condEval.isNull} && ${condEval.value}) {
+         |  ${trueEval.code}
+         |  ${ev.isNull} = ${trueEval.isNull};
+         |  ${ev.value} = ${trueEval.value};
+         |} else {
+         |  ${falseEval.code}
+         |  ${ev.isNull} = ${falseEval.isNull};
+         |  ${ev.value} = ${falseEval.value};
+         |}
+       """.stripMargin
+    ev.copy(code = code)
   }
 
   override def toString: String = s"if ($predicate) $trueValue else $falseValue"

http://git-wip-us.apache.org/repos/asf/spark/blob/0605ad76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index e518e73..8df8704 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -140,7 +140,9 @@ case class Alias(child: Expression, name: String)(
 
   /** Just a simple passthrough for code generation. */
   override def genCode(ctx: CodegenContext): ExprCode = child.genCode(ctx)
-  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev.copy("")
+  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    throw new IllegalStateException("Alias.doGenCode should not be called.")
+  }
 
   override def dataType: DataType = child.dataType
   override def nullable: Boolean = child.nullable

http://git-wip-us.apache.org/repos/asf/spark/blob/0605ad76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index c0084af..eb74753 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -378,46 +378,7 @@ case class And(left: Expression, right: Expression) extends BinaryOperator with
     val eval2 = right.genCode(ctx)
 
     // The result should be `false`, if any of them is `false` whenever the other is null or not.
-
-    // place generated code of eval1 and eval2 in separate methods if their code combined is large
-    val combinedLength = eval1.code.length + eval2.code.length
-    if (combinedLength > 1024 &&
-      // Split these expressions only if they are created from a row object
-      (ctx.INPUT_ROW != null && ctx.currentVars == null)) {
-
-      val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) =
-        ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr")
-      val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) =
-        ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr")
-      if (!left.nullable && !right.nullable) {
-        val generatedCode = s"""
-         $eval1FuncName(${ctx.INPUT_ROW});
-         boolean ${ev.value} = false;
-         if (${eval1GlobalValue}) {
-           $eval2FuncName(${ctx.INPUT_ROW});
-           ${ev.value} = ${eval2GlobalValue};
-         }
-       """
-        ev.copy(code = generatedCode, isNull = "false")
-      } else {
-        val generatedCode = s"""
-         $eval1FuncName(${ctx.INPUT_ROW});
-         boolean ${ev.isNull} = false;
-         boolean ${ev.value} = false;
-         if (!${eval1GlobalIsNull} && !${eval1GlobalValue}) {
-         } else {
-           $eval2FuncName(${ctx.INPUT_ROW});
-           if (!${eval2GlobalIsNull} && !${eval2GlobalValue}) {
-           } else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) {
-             ${ev.value} = true;
-           } else {
-             ${ev.isNull} = true;
-           }
-         }
-       """
-        ev.copy(code = generatedCode)
-      }
-    } else if (!left.nullable && !right.nullable) {
+    if (!left.nullable && !right.nullable) {
       ev.copy(code = s"""
         ${eval1.code}
         boolean ${ev.value} = false;
@@ -480,46 +441,7 @@ case class Or(left: Expression, right: Expression) extends BinaryOperator with P
     val eval2 = right.genCode(ctx)
 
     // The result should be `true`, if any of them is `true` whenever the other is null or not.
-
-    // place generated code of eval1 and eval2 in separate methods if their code combined is large
-    val combinedLength = eval1.code.length + eval2.code.length
-    if (combinedLength > 1024 &&
-      // Split these expressions only if they are created from a row object
-      (ctx.INPUT_ROW != null && ctx.currentVars == null)) {
-
-      val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) =
-        ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr")
-      val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) =
-        ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr")
-      if (!left.nullable && !right.nullable) {
-        val generatedCode = s"""
-         $eval1FuncName(${ctx.INPUT_ROW});
-         boolean ${ev.value} = true;
-         if (!${eval1GlobalValue}) {
-           $eval2FuncName(${ctx.INPUT_ROW});
-           ${ev.value} = ${eval2GlobalValue};
-         }
-       """
-        ev.copy(code = generatedCode, isNull = "false")
-      } else {
-        val generatedCode = s"""
-         $eval1FuncName(${ctx.INPUT_ROW});
-         boolean ${ev.isNull} = false;
-         boolean ${ev.value} = true;
-         if (!${eval1GlobalIsNull} && ${eval1GlobalValue}) {
-         } else {
-           $eval2FuncName(${ctx.INPUT_ROW});
-           if (!${eval2GlobalIsNull} && ${eval2GlobalValue}) {
-           } else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) {
-             ${ev.value} = false;
-           } else {
-             ${ev.isNull} = true;
-           }
-         }
-       """
-        ev.copy(code = generatedCode)
-      }
-    } else if (!left.nullable && !right.nullable) {
+    if (!left.nullable && !right.nullable) {
       ev.isNull = "false"
       ev.copy(code = s"""
         ${eval1.code}

http://git-wip-us.apache.org/repos/asf/spark/blob/0605ad76/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 8f6289f..6e33087 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -97,7 +97,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     assert(actual(0) == cases)
   }
 
-  test("SPARK-18091: split large if expressions into blocks due to JVM code size limit") {
+  test("SPARK-22543: split large if expressions into blocks due to JVM code size limit") {
     var strExpr: Expression = Literal("abc")
     for (_ <- 1 to 150) {
       strExpr = Decode(Encode(strExpr, "utf-8"), "utf-8")
@@ -342,7 +342,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     projection(row)
   }
 
-  test("SPARK-21720: split large predications into blocks due to JVM code size limit") {
+  test("SPARK-22543: split large predicates into blocks due to JVM code size limit") {
     val length = 600
 
     val input = new GenericInternalRow(length)

http://git-wip-us.apache.org/repos/asf/spark/blob/0605ad76/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 19c793e..dc8aecf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -179,6 +179,8 @@ case class HashAggregateExec(
   private def doProduceWithoutKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.freshName("initAgg")
     ctx.addMutableState(ctx.JAVA_BOOLEAN, initAgg, s"$initAgg = false;")
+    // The generated function doesn't have input row in the code context.
+    ctx.INPUT_ROW = null
 
     // generate variables for aggregation buffer
     val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])

