You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/10/04 17:08:27 UTC

spark git commit: [SPARK-21871][SQL] Check actual bytecode size when compiling generated code

Repository: spark
Updated Branches:
  refs/heads/master 64df08b64 -> 4a779bdac


[SPARK-21871][SQL] Check actual bytecode size when compiling generated code

## What changes were proposed in this pull request?
This PR adds code to check the actual bytecode size when compiling generated code. In #18810, we added code to give up code compilation and use interpreter execution in `SparkPlan` if the number of lines in a generated function exceeds `maxLinesPerFunction`. However, we already have code to collect metrics for the compiled bytecode size in the `CodeGenerator` object, so we can easily reuse that code for this purpose.

## How was this patch tested?
Added tests in `WholeStageCodegenSuite`.

Author: Takeshi Yamamuro <ya...@apache.org>

Closes #19083 from maropu/SPARK-21871.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a779bda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a779bda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a779bda

Branch: refs/heads/master
Commit: 4a779bdac3e75c17b7d36c5a009ba6c948fa9fb6
Parents: 64df08b
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Wed Oct 4 10:08:24 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Oct 4 10:08:24 2017 -0700

----------------------------------------------------------------------
 .../expressions/codegen/CodeFormatter.scala     |  8 ---
 .../expressions/codegen/CodeGenerator.scala     | 59 ++++++++++----------
 .../codegen/GenerateMutableProjection.scala     |  4 +-
 .../expressions/codegen/GenerateOrdering.scala  |  3 +-
 .../expressions/codegen/GeneratePredicate.scala |  3 +-
 .../codegen/GenerateSafeProjection.scala        |  4 +-
 .../codegen/GenerateUnsafeProjection.scala      |  4 +-
 .../codegen/GenerateUnsafeRowJoiner.scala       |  4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala | 15 ++---
 .../codegen/CodeFormatterSuite.scala            | 32 -----------
 .../sql/execution/WholeStageCodegenExec.scala   | 25 +++++----
 .../columnar/GenerateColumnAccessor.scala       |  3 +-
 .../sql/execution/WholeStageCodegenSuite.scala  | 43 ++++----------
 .../benchmark/AggregateBenchmark.scala          | 36 ++++++------
 14 files changed, 94 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
index 7b398f4..60e600d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
@@ -89,14 +89,6 @@ object CodeFormatter {
     }
     new CodeAndComment(code.result().trim(), map)
   }
-
-  def stripExtraNewLinesAndComments(input: String): String = {
-    val commentReg =
-      ("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" +    // strip /*comment*/
-       """([ |\t]*?\/\/[\s\S]*?\n)""").r               // strip //comment
-    val codeWithoutComment = commentReg.replaceAllIn(input, "")
-    codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // strip ExtraNewLines
-  }
 }
 
 private class CodeFormatter {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index f3b4579..f9c5ef8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -374,20 +374,6 @@ class CodegenContext {
   private val placeHolderToComments = new mutable.HashMap[String, String]
 
   /**
-   * It will count the lines of every Java function generated by whole-stage codegen,
-   * if there is a function of length greater than spark.sql.codegen.maxLinesPerFunction,
-   * it will return true.
-   */
-  def isTooLongGeneratedFunction: Boolean = {
-    classFunctions.values.exists { _.values.exists {
-      code =>
-        val codeWithoutComments = CodeFormatter.stripExtraNewLinesAndComments(code)
-        codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction
-      }
-    }
-  }
-
-  /**
    * Returns a term name that is unique within this instance of a `CodegenContext`.
    */
   def freshName(name: String): String = synchronized {
@@ -1020,10 +1006,16 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
 }
 
 object CodeGenerator extends Logging {
+
+  // This is the value of HugeMethodLimit in the OpenJDK JVM settings
+  val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
+
   /**
    * Compile the Java source code into a Java class, using Janino.
+   *
+   * @return a pair of a generated class and the max bytecode size of generated functions.
    */
-  def compile(code: CodeAndComment): GeneratedClass = try {
+  def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
     cache.get(code)
   } catch {
     // Cache.get() may wrap the original exception. See the following URL
@@ -1036,7 +1028,7 @@ object CodeGenerator extends Logging {
   /**
    * Compile the Java source code into a Java class, using Janino.
    */
-  private[this] def doCompile(code: CodeAndComment): GeneratedClass = {
+  private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
     val evaluator = new ClassBodyEvaluator()
 
     // A special classloader used to wrap the actual parent classloader of
@@ -1075,9 +1067,9 @@ object CodeGenerator extends Logging {
       s"\n${CodeFormatter.format(code)}"
     })
 
-    try {
+    val maxCodeSize = try {
       evaluator.cook("generated.java", code.body)
-      recordCompilationStats(evaluator)
+      updateAndGetCompilationStats(evaluator)
     } catch {
       case e: JaninoRuntimeException =>
         val msg = s"failed to compile: $e"
@@ -1092,13 +1084,15 @@ object CodeGenerator extends Logging {
         logInfo(s"\n${CodeFormatter.format(code, maxLines)}")
         throw new CompileException(msg, e.getLocation)
     }
-    evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
+
+    (evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
   }
 
   /**
-   * Records the generated class and method bytecode sizes by inspecting janino private fields.
+   * Returns the max bytecode size of the generated functions by inspecting janino private fields.
+   * Also, this method updates the metrics information.
    */
-  private def recordCompilationStats(evaluator: ClassBodyEvaluator): Unit = {
+  private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
     // First retrieve the generated classes.
     val classes = {
       val resultField = classOf[SimpleCompiler].getDeclaredField("result")
@@ -1113,23 +1107,26 @@ object CodeGenerator extends Logging {
     val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
     val codeAttrField = codeAttr.getDeclaredField("code")
     codeAttrField.setAccessible(true)
-    classes.foreach { case (_, classBytes) =>
+    val codeSizes = classes.flatMap { case (_, classBytes) =>
       CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
       try {
         val cf = new ClassFile(new ByteArrayInputStream(classBytes))
-        cf.methodInfos.asScala.foreach { method =>
-          method.getAttributes().foreach { a =>
-            if (a.getClass.getName == codeAttr.getName) {
-              CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
-                codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
-            }
+        val stats = cf.methodInfos.asScala.flatMap { method =>
+          method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
+            val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
+            CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
+            byteCodeSize
           }
         }
+        Some(stats)
       } catch {
         case NonFatal(e) =>
           logWarning("Error calculating stats of compiled class.", e)
+          None
       }
-    }
+    }.flatten
+
+    codeSizes.max
   }
 
   /**
@@ -1144,8 +1141,8 @@ object CodeGenerator extends Logging {
   private val cache = CacheBuilder.newBuilder()
     .maximumSize(100)
     .build(
-      new CacheLoader[CodeAndComment, GeneratedClass]() {
-        override def load(code: CodeAndComment): GeneratedClass = {
+      new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
+        override def load(code: CodeAndComment): (GeneratedClass, Int) = {
           val startTime = System.nanoTime()
           val result = doCompile(code)
           val endTime = System.nanoTime()

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 3768dcd..b5429fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -142,7 +142,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
 
-    val c = CodeGenerator.compile(code)
-    c.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 4e47895..1639d1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -185,7 +185,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"Generated Ordering by ${ordering.mkString(",")}:\n${CodeFormatter.format(code)}")
 
-    CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index e35b9dd..e0fabad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -78,6 +78,7 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"Generated predicate '$predicate':\n${CodeFormatter.format(code)}")
 
-    CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(ctx.references.toArray).asInstanceOf[Predicate]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index 192701a..1e4ac3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -189,8 +189,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
 
-    val c = CodeGenerator.compile(code)
+    val (clazz, _) = CodeGenerator.compile(code)
     val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
-    c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
+    clazz.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index f2a66ef..4bd50ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -409,7 +409,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
 
-    val c = CodeGenerator.compile(code)
-    c.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
index 4aa5ec8..6bc72a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
@@ -196,7 +196,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
     val code = CodeFormatter.stripOverlappingComments(new CodeAndComment(codeBody, Map.empty))
     logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}")
 
-    val c = CodeGenerator.compile(code)
-    c.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1a73d16..5832374 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -575,15 +576,15 @@ object SQLConf {
       "disable logging or -1 to apply no limit.")
     .createWithDefault(1000)
 
-  val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction")
+  val WHOLESTAGE_HUGE_METHOD_LIMIT = buildConf("spark.sql.codegen.hugeMethodLimit")
     .internal()
-    .doc("The maximum lines of a single Java function generated by whole-stage codegen. " +
-      "When the generated function exceeds this threshold, " +
+    .doc("The maximum bytecode size of a single compiled Java function generated by whole-stage " +
+      "codegen. When the compiled function exceeds this threshold, " +
       "the whole-stage codegen is deactivated for this subtree of the current query plan. " +
-      "The default value 4000 is the max length of byte code JIT supported " +
-      "for a single function(8000) divided by 2.")
+      s"The default value is ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} and " +
+      "this is a limit in the OpenJDK JVM implementation.")
     .intConf
-    .createWithDefault(4000)
+    .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)
 
   val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
     .doc("The maximum number of bytes to pack into a single partition when reading files.")
@@ -1058,7 +1059,7 @@ class SQLConf extends Serializable with Logging {
 
   def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)
 
-  def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION)
+  def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
 
   def tableRelationCacheSize: Int =
     getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
index a0f1a64..9d0a416 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
@@ -53,38 +53,6 @@ class CodeFormatterSuite extends SparkFunSuite {
     assert(reducedCode.body === "/*project_c4*/")
   }
 
-  test("removing extra new lines and comments") {
-    val code =
-      """
-        |/*
-        |  * multi
-        |  * line
-        |  * comments
-        |  */
-        |
-        |public function() {
-        |/*comment*/
-        |  /*comment_with_space*/
-        |code_body
-        |//comment
-        |code_body
-        |  //comment_with_space
-        |
-        |code_body
-        |}
-      """.stripMargin
-
-    val reducedCode = CodeFormatter.stripExtraNewLinesAndComments(code)
-    assert(reducedCode ===
-      """
-        |public function() {
-        |code_body
-        |code_body
-        |code_body
-        |}
-      """.stripMargin)
-  }
-
   testCase("basic example") {
     """
       |class A {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 268ccfa..9073d59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -380,16 +380,8 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
 
   override def doExecute(): RDD[InternalRow] = {
     val (ctx, cleanedSource) = doCodeGen()
-    if (ctx.isTooLongGeneratedFunction) {
-      logWarning("Found too long generated codes and JIT optimization might not work, " +
-        "Whole-stage codegen disabled for this plan, " +
-        "You can change the config spark.sql.codegen.MaxFunctionLength " +
-        "to adjust the function length limit:\n "
-        + s"$treeString")
-      return child.execute()
-    }
     // try to compile and fallback if it failed
-    try {
+    val (_, maxCodeSize) = try {
       CodeGenerator.compile(cleanedSource)
     } catch {
       case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
@@ -397,6 +389,17 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
         logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
         return child.execute()
     }
+
+    // Check if compiled code has a too large function
+    if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
+      logWarning(s"Found too long generated codes and JIT optimization might not work: " +
+        s"the bytecode size was $maxCodeSize, this value went over the limit " +
+        s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
+        s"for this plan. To avoid this, you can raise the limit " +
+        s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
+      return child.execute()
+    }
+
     val references = ctx.references.toArray
 
     val durationMs = longMetric("pipelineTime")
@@ -405,7 +408,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
     assert(rdds.size <= 2, "Up to two input RDDs can be supported")
     if (rdds.length == 1) {
       rdds.head.mapPartitionsWithIndex { (index, iter) =>
-        val clazz = CodeGenerator.compile(cleanedSource)
+        val (clazz, _) = CodeGenerator.compile(cleanedSource)
         val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
         buffer.init(index, Array(iter))
         new Iterator[InternalRow] {
@@ -424,7 +427,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
         // a small hack to obtain the correct partition index
       }.mapPartitionsWithIndex { (index, zippedIter) =>
         val (leftIter, rightIter) = zippedIter.next()
-        val clazz = CodeGenerator.compile(cleanedSource)
+        val (clazz, _) = CodeGenerator.compile(cleanedSource)
         val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
         buffer.init(index, Array(leftIter, rightIter))
         new Iterator[InternalRow] {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index da34643..ae600c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -227,6 +227,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
     logDebug(s"Generated ColumnarIterator:\n${CodeFormatter.format(code)}")
 
-    CodeGenerator.compile(code).generate(Array.empty).asInstanceOf[ColumnarIterator]
+    val (clazz, _) = CodeGenerator.compile(code)
+    clazz.generate(Array.empty).asInstanceOf[ColumnarIterator]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index beeee6a..aaa77b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.{Column, Dataset, Row}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -151,7 +149,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     }
   }
 
-  def genGroupByCodeGenContext(caseNum: Int): CodegenContext = {
+  def genGroupByCode(caseNum: Int): CodeAndComment = {
     val caseExp = (1 to caseNum).map { i =>
       s"case when id > $i and id <= ${i + 1} then 1 else 0 end as v$i"
     }.toList
@@ -176,34 +174,15 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
     })
 
     assert(wholeStageCodeGenExec.isDefined)
-    wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._1
+    wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._2
   }
 
-  test("SPARK-21603 check there is a too long generated function") {
-    withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") {
-      val ctx = genGroupByCodeGenContext(30)
-      assert(ctx.isTooLongGeneratedFunction === true)
-    }
-  }
-
-  test("SPARK-21603 check there is not a too long generated function") {
-    withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") {
-      val ctx = genGroupByCodeGenContext(1)
-      assert(ctx.isTooLongGeneratedFunction === false)
-    }
-  }
-
-  test("SPARK-21603 check there is not a too long generated function when threshold is Int.Max") {
-    withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> Int.MaxValue.toString) {
-      val ctx = genGroupByCodeGenContext(30)
-      assert(ctx.isTooLongGeneratedFunction === false)
-    }
-  }
-
-  test("SPARK-21603 check there is a too long generated function when threshold is 0") {
-    withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "0") {
-      val ctx = genGroupByCodeGenContext(1)
-      assert(ctx.isTooLongGeneratedFunction === true)
-    }
+  test("SPARK-21871 check if we can get large code size when compiling too long functions") {
+    val codeWithShortFunctions = genGroupByCode(3)
+    val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
+    assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
+    val codeWithLongFunctions = genGroupByCode(20)
+    val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
+    assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index 691fa9a..aca1be0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -24,6 +24,7 @@ import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
 import org.apache.spark.sql.execution.vectorized.AggregateHashMap
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.hash.Murmur3_x86_32
@@ -301,10 +302,10 @@ class AggregateBenchmark extends BenchmarkBase {
     */
   }
 
-  ignore("max function length of wholestagecodegen") {
+  ignore("max function bytecode size of wholestagecodegen") {
     val N = 20 << 15
 
-    val benchmark = new Benchmark("max function length of wholestagecodegen", N)
+    val benchmark = new Benchmark("max function bytecode size", N)
     def f(): Unit = sparkSession.range(N)
       .selectExpr(
         "id",
@@ -333,33 +334,34 @@ class AggregateBenchmark extends BenchmarkBase {
       .sum()
       .collect()
 
-    benchmark.addCase(s"codegen = F") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
+    benchmark.addCase("codegen = F") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
       f()
     }
 
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 10000") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "10000")
+    benchmark.addCase("codegen = T hugeMethodLimit = 10000") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "10000")
       f()
     }
 
-    benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter =>
-      sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-      sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500")
+    benchmark.addCase("codegen = T hugeMethodLimit = 1500") { iter =>
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+      sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500")
       f()
     }
 
     benchmark.run()
 
     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1
-    Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
-    max function length of wholestagecodegen: Best/Avg Time(ms)    Rate(M/s)  Per Row(ns)  Relative
-    ----------------------------------------------------------------------------------------------
-    codegen = F                                    462 /  533          1.4       704.4     1.0X
-    codegen = T maxLinesPerFunction = 10000       3444 / 3447          0.2      5255.3     0.1X
-    codegen = T maxLinesPerFunction = 1500         447 /  478          1.5       682.1     1.0X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+    Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+    max function bytecode size:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    codegen = F                                    709 /  803          0.9        1082.1       1.0X
+    codegen = T hugeMethodLimit = 10000           3485 / 3548          0.2        5317.7       0.2X
+    codegen = T hugeMethodLimit = 1500             636 /  701          1.0         969.9       1.1X
      */
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org