You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/10/04 17:08:27 UTC
spark git commit: [SPARK-21871][SQL] Check actual bytecode size when
compiling generated code
Repository: spark
Updated Branches:
refs/heads/master 64df08b64 -> 4a779bdac
[SPARK-21871][SQL] Check actual bytecode size when compiling generated code
## What changes were proposed in this pull request?
This PR adds code to check the actual bytecode size when compiling generated code. In #18810, we added code to give up code compilation and fall back to interpreted execution in `SparkPlan` if the number of lines in a generated function exceeds `maxLinesPerFunction`. However, we already have code in the `CodeGenerator` object that collects metrics on the compiled bytecode size, so we can easily reuse that code for this purpose.
## How was this patch tested?
Added tests in `WholeStageCodegenSuite`.
Author: Takeshi Yamamuro <ya...@apache.org>
Closes #19083 from maropu/SPARK-21871.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a779bda
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a779bda
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a779bda
Branch: refs/heads/master
Commit: 4a779bdac3e75c17b7d36c5a009ba6c948fa9fb6
Parents: 64df08b
Author: Takeshi Yamamuro <ya...@apache.org>
Authored: Wed Oct 4 10:08:24 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Oct 4 10:08:24 2017 -0700
----------------------------------------------------------------------
.../expressions/codegen/CodeFormatter.scala | 8 ---
.../expressions/codegen/CodeGenerator.scala | 59 ++++++++++----------
.../codegen/GenerateMutableProjection.scala | 4 +-
.../expressions/codegen/GenerateOrdering.scala | 3 +-
.../expressions/codegen/GeneratePredicate.scala | 3 +-
.../codegen/GenerateSafeProjection.scala | 4 +-
.../codegen/GenerateUnsafeProjection.scala | 4 +-
.../codegen/GenerateUnsafeRowJoiner.scala | 4 +-
.../org/apache/spark/sql/internal/SQLConf.scala | 15 ++---
.../codegen/CodeFormatterSuite.scala | 32 -----------
.../sql/execution/WholeStageCodegenExec.scala | 25 +++++----
.../columnar/GenerateColumnAccessor.scala | 3 +-
.../sql/execution/WholeStageCodegenSuite.scala | 43 ++++----------
.../benchmark/AggregateBenchmark.scala | 36 ++++++------
14 files changed, 94 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
index 7b398f4..60e600d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatter.scala
@@ -89,14 +89,6 @@ object CodeFormatter {
}
new CodeAndComment(code.result().trim(), map)
}
-
- def stripExtraNewLinesAndComments(input: String): String = {
- val commentReg =
- ("""([ |\t]*?\/\*[\s|\S]*?\*\/[ |\t]*?)|""" + // strip /*comment*/
- """([ |\t]*?\/\/[\s\S]*?\n)""").r // strip //comment
- val codeWithoutComment = commentReg.replaceAllIn(input, "")
- codeWithoutComment.replaceAll("""\n\s*\n""", "\n") // strip ExtraNewLines
- }
}
private class CodeFormatter {
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index f3b4579..f9c5ef8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -374,20 +374,6 @@ class CodegenContext {
private val placeHolderToComments = new mutable.HashMap[String, String]
/**
- * It will count the lines of every Java function generated by whole-stage codegen,
- * if there is a function of length greater than spark.sql.codegen.maxLinesPerFunction,
- * it will return true.
- */
- def isTooLongGeneratedFunction: Boolean = {
- classFunctions.values.exists { _.values.exists {
- code =>
- val codeWithoutComments = CodeFormatter.stripExtraNewLinesAndComments(code)
- codeWithoutComments.count(_ == '\n') > SQLConf.get.maxLinesPerFunction
- }
- }
- }
-
- /**
* Returns a term name that is unique within this instance of a `CodegenContext`.
*/
def freshName(name: String): String = synchronized {
@@ -1020,10 +1006,16 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
}
object CodeGenerator extends Logging {
+
+ // This is the value of HugeMethodLimit in the OpenJDK JVM settings
+ val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
+
/**
* Compile the Java source code into a Java class, using Janino.
+ *
+ * @return a pair of a generated class and the max bytecode size of generated functions.
*/
- def compile(code: CodeAndComment): GeneratedClass = try {
+ def compile(code: CodeAndComment): (GeneratedClass, Int) = try {
cache.get(code)
} catch {
// Cache.get() may wrap the original exception. See the following URL
@@ -1036,7 +1028,7 @@ object CodeGenerator extends Logging {
/**
* Compile the Java source code into a Java class, using Janino.
*/
- private[this] def doCompile(code: CodeAndComment): GeneratedClass = {
+ private[this] def doCompile(code: CodeAndComment): (GeneratedClass, Int) = {
val evaluator = new ClassBodyEvaluator()
// A special classloader used to wrap the actual parent classloader of
@@ -1075,9 +1067,9 @@ object CodeGenerator extends Logging {
s"\n${CodeFormatter.format(code)}"
})
- try {
+ val maxCodeSize = try {
evaluator.cook("generated.java", code.body)
- recordCompilationStats(evaluator)
+ updateAndGetCompilationStats(evaluator)
} catch {
case e: JaninoRuntimeException =>
val msg = s"failed to compile: $e"
@@ -1092,13 +1084,15 @@ object CodeGenerator extends Logging {
logInfo(s"\n${CodeFormatter.format(code, maxLines)}")
throw new CompileException(msg, e.getLocation)
}
- evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
+
+ (evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass], maxCodeSize)
}
/**
- * Records the generated class and method bytecode sizes by inspecting janino private fields.
+ * Returns the max bytecode size of the generated functions by inspecting janino private fields.
+ * Also, this method updates the metrics information.
*/
- private def recordCompilationStats(evaluator: ClassBodyEvaluator): Unit = {
+ private def updateAndGetCompilationStats(evaluator: ClassBodyEvaluator): Int = {
// First retrieve the generated classes.
val classes = {
val resultField = classOf[SimpleCompiler].getDeclaredField("result")
@@ -1113,23 +1107,26 @@ object CodeGenerator extends Logging {
val codeAttr = Utils.classForName("org.codehaus.janino.util.ClassFile$CodeAttribute")
val codeAttrField = codeAttr.getDeclaredField("code")
codeAttrField.setAccessible(true)
- classes.foreach { case (_, classBytes) =>
+ val codeSizes = classes.flatMap { case (_, classBytes) =>
CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
try {
val cf = new ClassFile(new ByteArrayInputStream(classBytes))
- cf.methodInfos.asScala.foreach { method =>
- method.getAttributes().foreach { a =>
- if (a.getClass.getName == codeAttr.getName) {
- CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
- codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
- }
+ val stats = cf.methodInfos.asScala.flatMap { method =>
+ method.getAttributes().filter(_.getClass.getName == codeAttr.getName).map { a =>
+ val byteCodeSize = codeAttrField.get(a).asInstanceOf[Array[Byte]].length
+ CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(byteCodeSize)
+ byteCodeSize
}
}
+ Some(stats)
} catch {
case NonFatal(e) =>
logWarning("Error calculating stats of compiled class.", e)
+ None
}
- }
+ }.flatten
+
+ codeSizes.max
}
/**
@@ -1144,8 +1141,8 @@ object CodeGenerator extends Logging {
private val cache = CacheBuilder.newBuilder()
.maximumSize(100)
.build(
- new CacheLoader[CodeAndComment, GeneratedClass]() {
- override def load(code: CodeAndComment): GeneratedClass = {
+ new CacheLoader[CodeAndComment, (GeneratedClass, Int)]() {
+ override def load(code: CodeAndComment): (GeneratedClass, Int) = {
val startTime = System.nanoTime()
val result = doCompile(code)
val endTime = System.nanoTime()
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 3768dcd..b5429fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -142,7 +142,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
- val c = CodeGenerator.compile(code)
- c.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
+ val (clazz, _) = CodeGenerator.compile(code)
+ clazz.generate(ctx.references.toArray).asInstanceOf[MutableProjection]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 4e47895..1639d1b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -185,7 +185,8 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated Ordering by ${ordering.mkString(",")}:\n${CodeFormatter.format(code)}")
- CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
+ val (clazz, _) = CodeGenerator.compile(code)
+ clazz.generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index e35b9dd..e0fabad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -78,6 +78,7 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated predicate '$predicate':\n${CodeFormatter.format(code)}")
- CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate]
+ val (clazz, _) = CodeGenerator.compile(code)
+ clazz.generate(ctx.references.toArray).asInstanceOf[Predicate]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index 192701a..1e4ac3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -189,8 +189,8 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
- val c = CodeGenerator.compile(code)
+ val (clazz, _) = CodeGenerator.compile(code)
val resultRow = new SpecificInternalRow(expressions.map(_.dataType))
- c.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
+ clazz.generate(ctx.references.toArray :+ resultRow).asInstanceOf[Projection]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index f2a66ef..4bd50ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -409,7 +409,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"code for ${expressions.mkString(",")}:\n${CodeFormatter.format(code)}")
- val c = CodeGenerator.compile(code)
- c.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
+ val (clazz, _) = CodeGenerator.compile(code)
+ clazz.generate(ctx.references.toArray).asInstanceOf[UnsafeProjection]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
index 4aa5ec8..6bc72a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoiner.scala
@@ -196,7 +196,7 @@ object GenerateUnsafeRowJoiner extends CodeGenerator[(StructType, StructType), U
val code = CodeFormatter.stripOverlappingComments(new CodeAndComment(codeBody, Map.empty))
logDebug(s"SpecificUnsafeRowJoiner($schema1, $schema2):\n${CodeFormatter.format(code)}")
- val c = CodeGenerator.compile(code)
- c.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
+ val (clazz, _) = CodeGenerator.compile(code)
+ clazz.generate(Array.empty).asInstanceOf[UnsafeRowJoiner]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1a73d16..5832374 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -30,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -575,15 +576,15 @@ object SQLConf {
"disable logging or -1 to apply no limit.")
.createWithDefault(1000)
- val WHOLESTAGE_MAX_LINES_PER_FUNCTION = buildConf("spark.sql.codegen.maxLinesPerFunction")
+ val WHOLESTAGE_HUGE_METHOD_LIMIT = buildConf("spark.sql.codegen.hugeMethodLimit")
.internal()
- .doc("The maximum lines of a single Java function generated by whole-stage codegen. " +
- "When the generated function exceeds this threshold, " +
+ .doc("The maximum bytecode size of a single compiled Java function generated by whole-stage " +
+ "codegen. When the compiled function exceeds this threshold, " +
"the whole-stage codegen is deactivated for this subtree of the current query plan. " +
- "The default value 4000 is the max length of byte code JIT supported " +
- "for a single function(8000) divided by 2.")
+ s"The default value is ${CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT} and " +
+ "this is a limit in the OpenJDK JVM implementation.")
.intConf
- .createWithDefault(4000)
+ .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)
val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
@@ -1058,7 +1059,7 @@ class SQLConf extends Serializable with Logging {
def loggingMaxLinesForCodegen: Int = getConf(CODEGEN_LOGGING_MAX_LINES)
- def maxLinesPerFunction: Int = getConf(WHOLESTAGE_MAX_LINES_PER_FUNCTION)
+ def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)
def tableRelationCacheSize: Int =
getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
index a0f1a64..9d0a416 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeFormatterSuite.scala
@@ -53,38 +53,6 @@ class CodeFormatterSuite extends SparkFunSuite {
assert(reducedCode.body === "/*project_c4*/")
}
- test("removing extra new lines and comments") {
- val code =
- """
- |/*
- | * multi
- | * line
- | * comments
- | */
- |
- |public function() {
- |/*comment*/
- | /*comment_with_space*/
- |code_body
- |//comment
- |code_body
- | //comment_with_space
- |
- |code_body
- |}
- """.stripMargin
-
- val reducedCode = CodeFormatter.stripExtraNewLinesAndComments(code)
- assert(reducedCode ===
- """
- |public function() {
- |code_body
- |code_body
- |code_body
- |}
- """.stripMargin)
- }
-
testCase("basic example") {
"""
|class A {
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 268ccfa..9073d59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -380,16 +380,8 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
- if (ctx.isTooLongGeneratedFunction) {
- logWarning("Found too long generated codes and JIT optimization might not work, " +
- "Whole-stage codegen disabled for this plan, " +
- "You can change the config spark.sql.codegen.MaxFunctionLength " +
- "to adjust the function length limit:\n "
- + s"$treeString")
- return child.execute()
- }
// try to compile and fallback if it failed
- try {
+ val (_, maxCodeSize) = try {
CodeGenerator.compile(cleanedSource)
} catch {
case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
@@ -397,6 +389,17 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
return child.execute()
}
+
+ // Check if compiled code has a too large function
+ if (maxCodeSize > sqlContext.conf.hugeMethodLimit) {
+ logWarning(s"Found too long generated codes and JIT optimization might not work: " +
+ s"the bytecode size was $maxCodeSize, this value went over the limit " +
+ s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
+ s"for this plan. To avoid this, you can raise the limit " +
+ s"${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}:\n$treeString")
+ return child.execute()
+ }
+
val references = ctx.references.toArray
val durationMs = longMetric("pipelineTime")
@@ -405,7 +408,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
assert(rdds.size <= 2, "Up to two input RDDs can be supported")
if (rdds.length == 1) {
rdds.head.mapPartitionsWithIndex { (index, iter) =>
- val clazz = CodeGenerator.compile(cleanedSource)
+ val (clazz, _) = CodeGenerator.compile(cleanedSource)
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(iter))
new Iterator[InternalRow] {
@@ -424,7 +427,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
// a small hack to obtain the correct partition index
}.mapPartitionsWithIndex { (index, zippedIter) =>
val (leftIter, rightIter) = zippedIter.next()
- val clazz = CodeGenerator.compile(cleanedSource)
+ val (clazz, _) = CodeGenerator.compile(cleanedSource)
val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
buffer.init(index, Array(leftIter, rightIter))
new Iterator[InternalRow] {
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index da34643..ae600c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -227,6 +227,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated ColumnarIterator:\n${CodeFormatter.format(code)}")
- CodeGenerator.compile(code).generate(Array.empty).asInstanceOf[ColumnarIterator]
+ val (clazz, _) = CodeGenerator.compile(code)
+ clazz.generate(Array.empty).asInstanceOf[ColumnarIterator]
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index beeee6a..aaa77b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -17,10 +17,8 @@
package org.apache.spark.sql.execution
-import org.apache.spark.sql.{Column, Dataset, Row}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions.{Add, Literal, Stack}
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -151,7 +149,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
}
}
- def genGroupByCodeGenContext(caseNum: Int): CodegenContext = {
+ def genGroupByCode(caseNum: Int): CodeAndComment = {
val caseExp = (1 to caseNum).map { i =>
s"case when id > $i and id <= ${i + 1} then 1 else 0 end as v$i"
}.toList
@@ -176,34 +174,15 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
})
assert(wholeStageCodeGenExec.isDefined)
- wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._1
+ wholeStageCodeGenExec.get.asInstanceOf[WholeStageCodegenExec].doCodeGen()._2
}
- test("SPARK-21603 check there is a too long generated function") {
- withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") {
- val ctx = genGroupByCodeGenContext(30)
- assert(ctx.isTooLongGeneratedFunction === true)
- }
- }
-
- test("SPARK-21603 check there is not a too long generated function") {
- withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "1500") {
- val ctx = genGroupByCodeGenContext(1)
- assert(ctx.isTooLongGeneratedFunction === false)
- }
- }
-
- test("SPARK-21603 check there is not a too long generated function when threshold is Int.Max") {
- withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> Int.MaxValue.toString) {
- val ctx = genGroupByCodeGenContext(30)
- assert(ctx.isTooLongGeneratedFunction === false)
- }
- }
-
- test("SPARK-21603 check there is a too long generated function when threshold is 0") {
- withSQLConf(SQLConf.WHOLESTAGE_MAX_LINES_PER_FUNCTION.key -> "0") {
- val ctx = genGroupByCodeGenContext(1)
- assert(ctx.isTooLongGeneratedFunction === true)
- }
+ test("SPARK-21871 check if we can get large code size when compiling too long functions") {
+ val codeWithShortFunctions = genGroupByCode(3)
+ val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
+ assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
+ val codeWithLongFunctions = genGroupByCode(20)
+ val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
+ assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/4a779bda/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
index 691fa9a..aca1be0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AggregateBenchmark.scala
@@ -24,6 +24,7 @@ import org.apache.spark.memory.{StaticMemoryManager, TaskMemoryManager}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.joins.LongToUnsafeRowMap
import org.apache.spark.sql.execution.vectorized.AggregateHashMap
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{LongType, StructType}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32
@@ -301,10 +302,10 @@ class AggregateBenchmark extends BenchmarkBase {
*/
}
- ignore("max function length of wholestagecodegen") {
+ ignore("max function bytecode size of wholestagecodegen") {
val N = 20 << 15
- val benchmark = new Benchmark("max function length of wholestagecodegen", N)
+ val benchmark = new Benchmark("max function bytecode size", N)
def f(): Unit = sparkSession.range(N)
.selectExpr(
"id",
@@ -333,33 +334,34 @@ class AggregateBenchmark extends BenchmarkBase {
.sum()
.collect()
- benchmark.addCase(s"codegen = F") { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
+ benchmark.addCase("codegen = F") { iter =>
+ sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "false")
f()
}
- benchmark.addCase(s"codegen = T maxLinesPerFunction = 10000") { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
- sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "10000")
+ benchmark.addCase("codegen = T hugeMethodLimit = 10000") { iter =>
+ sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+ sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "10000")
f()
}
- benchmark.addCase(s"codegen = T maxLinesPerFunction = 1500") { iter =>
- sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
- sparkSession.conf.set("spark.sql.codegen.maxLinesPerFunction", "1500")
+ benchmark.addCase("codegen = T hugeMethodLimit = 1500") { iter =>
+ sparkSession.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+ sparkSession.conf.set(SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key, "1500")
f()
}
benchmark.run()
/*
- Java HotSpot(TM) 64-Bit Server VM 1.8.0_111-b14 on Windows 7 6.1
- Intel64 Family 6 Model 58 Stepping 9, GenuineIntel
- max function length of wholestagecodegen: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- ----------------------------------------------------------------------------------------------
- codegen = F 462 / 533 1.4 704.4 1.0X
- codegen = T maxLinesPerFunction = 10000 3444 / 3447 0.2 5255.3 0.1X
- codegen = T maxLinesPerFunction = 1500 447 / 478 1.5 682.1 1.0X
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_31-b13 on Mac OS X 10.10.2
+ Intel(R) Core(TM) i7-4578U CPU @ 3.00GHz
+
+ max function bytecode size: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ ------------------------------------------------------------------------------------------------
+ codegen = F 709 / 803 0.9 1082.1 1.0X
+ codegen = T hugeMethodLimit = 10000 3485 / 3548 0.2 5317.7 0.2X
+ codegen = T hugeMethodLimit = 1500 636 / 701 1.0 969.9 1.1X
*/
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org