You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/06/15 05:45:16 UTC
spark git commit: [SPARK-18016][SQL][CATALYST] Code Generation:
Constant Pool Limit - Class Splitting
Repository: spark
Updated Branches:
refs/heads/master 205142817 -> b32b2123d
[SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - Class Splitting
## What changes were proposed in this pull request?
This pull-request exclusively includes the class splitting feature described in #16648. When code for a given class would grow beyond 1600k bytes, a private, nested sub-class is generated into which subsequent functions are inlined. Additional sub-classes are generated as the code threshold is met subsequent times. This code includes 3 changes:
1. Includes helper maps, lists, and functions for keeping track of sub-classes during code generation (included in the `CodeGenerator` class). These helper functions allow nested classes and split functions to be initialized/declared/inlined to the appropriate locations in the various projection classes.
2. Changes `addNewFunction` to return a string to support instances where a split function is inlined to a nested class and not the outer class (and so must be invoked using the class-qualified name). Uses of `addNewFunction` throughout the codebase are modified so that the returned name is properly used.
3. Removes instances of the `this` keyword when used on data inside generated classes. All state declared in the outer class is by default global and accessible to the nested classes. However, if a reference to global state in a nested class is prepended with the `this` keyword, it would attempt to reference state belonging to the nested class (which would not exist), rather than the correct variable belonging to the outer class.
## How was this patch tested?
Added a test case to the `GeneratedProjectionSuite` that increases the number of columns tested in various projections to a threshold that would previously have triggered a `JaninoRuntimeException` for the Constant Pool.
Note: This PR does not address the second Constant Pool issue with code generation (also mentioned in #16648): excess global mutable state. A second PR may be opened to resolve that issue.
Author: ALeksander Eskilson <al...@cerner.com>
Closes #18075 from bdrillard/class_splitting_only.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b32b2123
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b32b2123
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b32b2123
Branch: refs/heads/master
Commit: b32b2123ddca66e00acf4c9d956232e07f779f9f
Parents: 2051428
Author: ALeksander Eskilson <al...@cerner.com>
Authored: Thu Jun 15 13:45:08 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jun 15 13:45:08 2017 +0800
----------------------------------------------------------------------
sql/catalyst/pom.xml | 7 +
.../sql/catalyst/expressions/ScalaUDF.scala | 6 +-
.../expressions/codegen/CodeGenerator.scala | 140 ++++++++++++++++---
.../codegen/GenerateMutableProjection.scala | 17 ++-
.../expressions/codegen/GenerateOrdering.scala | 3 +
.../expressions/codegen/GeneratePredicate.scala | 3 +
.../codegen/GenerateSafeProjection.scala | 9 +-
.../codegen/GenerateUnsafeProjection.scala | 9 +-
.../expressions/complexTypeCreator.scala | 6 +-
.../expressions/conditionalExpressions.scala | 4 +-
.../sql/catalyst/expressions/generators.scala | 6 +-
.../catalyst/expressions/objects/objects.scala | 2 +-
.../codegen/GeneratedProjectionSuite.scala | 72 ++++++++--
sql/core/pom.xml | 7 +
.../spark/sql/execution/ColumnarBatchScan.scala | 6 +-
.../apache/spark/sql/execution/SortExec.scala | 4 +-
.../sql/execution/WholeStageCodegenExec.scala | 3 +
.../execution/aggregate/HashAggregateExec.scala | 8 +-
.../sql/execution/basicPhysicalOperators.scala | 11 +-
.../columnar/GenerateColumnAccessor.scala | 13 +-
.../sql/execution/joins/SortMergeJoinExec.scala | 2 +-
.../org/apache/spark/sql/execution/limit.scala | 2 +-
22 files changed, 259 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/pom.xml
----------------------------------------------------------------------
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 8d80f8e..36948ba 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -132,6 +132,13 @@
</executions>
</plugin>
<plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index af1eba2..a54f6d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -988,7 +988,7 @@ case class ScalaUDF(
val converterTerm = ctx.freshName("converter")
val expressionIdx = ctx.references.size - 1
ctx.addMutableState(converterClassName, converterTerm,
- s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
+ s"$converterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
s"references[$expressionIdx]).getChildren().apply($index))).dataType());")
converterTerm
@@ -1005,7 +1005,7 @@ case class ScalaUDF(
// Generate codes used to convert the returned value of user-defined functions to Catalyst type
val catalystConverterTerm = ctx.freshName("catalystConverter")
ctx.addMutableState(converterClassName, catalystConverterTerm,
- s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
+ s"$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToCatalystConverter($scalaUDF.dataType());")
val resultTerm = ctx.freshName("result")
@@ -1019,7 +1019,7 @@ case class ScalaUDF(
val funcTerm = ctx.freshName("udf")
ctx.addMutableState(funcClassName, funcTerm,
- s"this.$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
+ s"$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
// codegen for children expressions
val evals = children.map(_.genCode(ctx))
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index fd97802..5158949 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
import com.google.common.cache.{CacheBuilder, CacheLoader}
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
-import org.apache.commons.lang3.exception.ExceptionUtils
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
import org.codehaus.janino.util.ClassFile
@@ -113,7 +112,7 @@ class CodegenContext {
val idx = references.length
references += obj
val clsName = Option(className).getOrElse(obj.getClass.getName)
- addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
+ addMutableState(clsName, term, s"$term = ($clsName) references[$idx];")
term
}
@@ -203,16 +202,6 @@ class CodegenContext {
}
/**
- * Holding all the functions those will be added into generated class.
- */
- val addedFunctions: mutable.Map[String, String] =
- mutable.Map.empty[String, String]
-
- def addNewFunction(funcName: String, funcCode: String): Unit = {
- addedFunctions += ((funcName, funcCode))
- }
-
- /**
* Holds expressions that are equivalent. Used to perform subexpression elimination
* during codegen.
*
@@ -233,10 +222,118 @@ class CodegenContext {
// The collection of sub-expression result resetting methods that need to be called on each row.
val subexprFunctions = mutable.ArrayBuffer.empty[String]
- def declareAddedFunctions(): String = {
- addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n")
+ val outerClassName = "OuterClass"
+
+ /**
+ * Holds the class and instance names to be generated, where `OuterClass` is a placeholder
+ * standing for whichever class is generated as the outermost class and which will contain any
+ * nested sub-classes. All other classes and instance names in this list will represent private,
+ * nested sub-classes.
+ */
+ private val classes: mutable.ListBuffer[(String, String)] =
+ mutable.ListBuffer[(String, String)](outerClassName -> null)
+
+ // A map holding the current size in bytes of each class to be generated.
+ private val classSize: mutable.Map[String, Int] =
+ mutable.Map[String, Int](outerClassName -> 0)
+
+ // Nested maps holding function names and their code belonging to each class.
+ private val classFunctions: mutable.Map[String, mutable.Map[String, String]] =
+ mutable.Map(outerClassName -> mutable.Map.empty[String, String])
+
+ // Returns the size of the most recently added class.
+ private def currClassSize(): Int = classSize(classes.head._1)
+
+ // Returns the class name and instance name for the most recently added class.
+ private def currClass(): (String, String) = classes.head
+
+ // Adds a new class. Requires the class' name, and its instance name.
+ private def addClass(className: String, classInstance: String): Unit = {
+ classes.prepend(className -> classInstance)
+ classSize += className -> 0
+ classFunctions += className -> mutable.Map.empty[String, String]
+ }
+
+ /**
+ * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the
+ * function will be inlined into a new private, nested class, and a class-qualified name for the
+ * function will be returned. Otherwise, the function will be inined to the `OuterClass` the
+ * simple `funcName` will be returned.
+ *
+ * @param funcName the class-unqualified name of the function
+ * @param funcCode the body of the function
+ * @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This
+ * can be necessary when a function is declared outside of the context
+ * it is eventually referenced and a returned qualified function name
+ * cannot otherwise be accessed.
+ * @return the name of the function, qualified by class if it will be inlined to a private,
+ * nested sub-class
+ */
+ def addNewFunction(
+ funcName: String,
+ funcCode: String,
+ inlineToOuterClass: Boolean = false): String = {
+ // The number of named constants that can exist in the class is limited by the Constant Pool
+ // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
+ // threshold of 1600k bytes to determine when a function should be inlined to a private, nested
+ // sub-class.
+ val (className, classInstance) = if (inlineToOuterClass) {
+ outerClassName -> ""
+ } else if (currClassSize > 1600000) {
+ val className = freshName("NestedClass")
+ val classInstance = freshName("nestedClassInstance")
+
+ addClass(className, classInstance)
+
+ className -> classInstance
+ } else {
+ currClass()
+ }
+
+ classSize(className) += funcCode.length
+ classFunctions(className) += funcName -> funcCode
+
+ if (className == outerClassName) {
+ funcName
+ } else {
+
+ s"$classInstance.$funcName"
+ }
+ }
+
+ /**
+ * Instantiates all nested, private sub-classes as objects to the `OuterClass`
+ */
+ private[sql] def initNestedClasses(): String = {
+ // Nested, private sub-classes have no mutable state (though they do reference the outer class'
+ // mutable state), so we declare and initialize them inline to the OuterClass.
+ classes.filter(_._1 != outerClassName).map {
+ case (className, classInstance) =>
+ s"private $className $classInstance = new $className();"
+ }.mkString("\n")
+ }
+
+ /**
+ * Declares all function code that should be inlined to the `OuterClass`.
+ */
+ private[sql] def declareAddedFunctions(): String = {
+ classFunctions(outerClassName).values.mkString("\n")
}
+ /**
+ * Declares all nested, private sub-classes and the function code that should be inlined to them.
+ */
+ private[sql] def declareNestedClasses(): String = {
+ classFunctions.filterKeys(_ != outerClassName).map {
+ case (className, functions) =>
+ s"""
+ |private class $className {
+ | ${functions.values.mkString("\n")}
+ |}
+ """.stripMargin
+ }
+ }.mkString("\n")
+
final val JAVA_BOOLEAN = "boolean"
final val JAVA_BYTE = "byte"
final val JAVA_SHORT = "short"
@@ -556,8 +653,7 @@ class CodegenContext {
return 0;
}
"""
- addNewFunction(compareFunc, funcCode)
- s"this.$compareFunc($c1, $c2)"
+ s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
case schema: StructType =>
val comparisons = GenerateOrdering.genComparisons(this, schema)
val compareFunc = freshName("compareStruct")
@@ -573,8 +669,7 @@ class CodegenContext {
return 0;
}
"""
- addNewFunction(compareFunc, funcCode)
- s"this.$compareFunc($c1, $c2)"
+ s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
case _ =>
@@ -629,7 +724,9 @@ class CodegenContext {
/**
* Splits the generated code of expressions into multiple functions, because function has
- * 64kb code size limit in JVM
+ * 64kb code size limit in JVM. If the class to which the function would be inlined would grow
+ * beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it
+ * instead, because classes have a constant pool limit of 65,536 named values.
*
* @param row the variable name of row that is used by expressions
* @param expressions the codes to evaluate expressions.
@@ -689,7 +786,6 @@ class CodegenContext {
|}
""".stripMargin
addNewFunction(name, code)
- name
}
foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})"))
@@ -773,8 +869,6 @@ class CodegenContext {
|}
""".stripMargin
- addNewFunction(fnName, fn)
-
// Add a state and a mapping of the common subexpressions that are associate with this
// state. Adding this expression to subExprEliminationExprMap means it will call `fn`
// when it is code generated. This decision should be a cost based one.
@@ -792,7 +886,7 @@ class CodegenContext {
addMutableState(javaType(expr.dataType), value,
s"$value = ${defaultValue(expr.dataType)};")
- subexprFunctions += s"$fnName($INPUT_ROW);"
+ subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
val state = SubExprEliminationState(isNull, value)
e.foreach(subExprEliminationExprs.put(_, state))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 4d73244..6357668 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -63,21 +63,21 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
if (e.nullable) {
val isNull = s"isNull_$i"
val value = s"value_$i"
- ctx.addMutableState("boolean", isNull, s"this.$isNull = true;")
+ ctx.addMutableState("boolean", isNull, s"$isNull = true;")
ctx.addMutableState(ctx.javaType(e.dataType), value,
- s"this.$value = ${ctx.defaultValue(e.dataType)};")
+ s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
- this.$isNull = ${ev.isNull};
- this.$value = ${ev.value};
+ $isNull = ${ev.isNull};
+ $value = ${ev.value};
"""
} else {
val value = s"value_$i"
ctx.addMutableState(ctx.javaType(e.dataType), value,
- s"this.$value = ${ctx.defaultValue(e.dataType)};")
+ s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
- this.$value = ${ev.value};
+ $value = ${ev.value};
"""
}
}
@@ -87,7 +87,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
val updates = validExpr.zip(index).map {
case (e, i) =>
- val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
+ val ev = ExprCode("", s"isNull_$i", s"value_$i")
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
}
@@ -135,6 +135,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
$allUpdates
return mutableRow;
}
+
+ ${ctx.initNestedClasses()}
+ ${ctx.declareNestedClasses()}
}
"""
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index f7fc2d5..a319432 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -179,6 +179,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
$comparisons
return 0;
}
+
+ ${ctx.initNestedClasses()}
+ ${ctx.declareNestedClasses()}
}"""
val code = CodeFormatter.stripOverlappingComments(
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index dcd1ed9..b400783 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -72,6 +72,9 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
${eval.code}
return !${eval.isNull} && ${eval.value};
}
+
+ ${ctx.initNestedClasses()}
+ ${ctx.declareNestedClasses()}
}"""
val code = CodeFormatter.stripOverlappingComments(
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index b1cb6ed..f708aef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -49,7 +49,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
val output = ctx.freshName("safeRow")
val values = ctx.freshName("values")
// These expressions could be split into multiple functions
- ctx.addMutableState("Object[]", values, s"this.$values = null;")
+ ctx.addMutableState("Object[]", values, s"$values = null;")
val rowClass = classOf[GenericInternalRow].getName
@@ -65,10 +65,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
val allFields = ctx.splitExpressions(tmp, fieldWriters)
val code = s"""
final InternalRow $tmp = $input;
- this.$values = new Object[${schema.length}];
+ $values = new Object[${schema.length}];
$allFields
final InternalRow $output = new $rowClass($values);
- this.$values = null;
+ $values = null;
"""
ExprCode(code, "false", output)
@@ -184,6 +184,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
$allExpressions
return mutableRow;
}
+
+ ${ctx.initNestedClasses()}
+ ${ctx.declareNestedClasses()}
}
"""
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index efbbc03..6be69d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -82,7 +82,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val rowWriterClass = classOf[UnsafeRowWriter].getName
val rowWriter = ctx.freshName("rowWriter")
ctx.addMutableState(rowWriterClass, rowWriter,
- s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
+ s"$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
val resetWriter = if (isTopLevel) {
// For top level row writer, it always writes to the beginning of the global buffer holder,
@@ -182,7 +182,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val arrayWriterClass = classOf[UnsafeArrayWriter].getName
val arrayWriter = ctx.freshName("arrayWriter")
ctx.addMutableState(arrayWriterClass, arrayWriter,
- s"this.$arrayWriter = new $arrayWriterClass();")
+ s"$arrayWriter = new $arrayWriterClass();")
val numElements = ctx.freshName("numElements")
val index = ctx.freshName("index")
val element = ctx.freshName("element")
@@ -321,7 +321,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val holder = ctx.freshName("holder")
val holderClass = classOf[BufferHolder].getName
ctx.addMutableState(holderClass, holder,
- s"this.$holder = new $holderClass($result, ${numVarLenFields * 32});")
+ s"$holder = new $holderClass($result, ${numVarLenFields * 32});")
val resetBufferHolder = if (numVarLenFields == 0) {
""
@@ -402,6 +402,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
${eval.code.trim}
return ${eval.value};
}
+
+ ${ctx.initNestedClasses()}
+ ${ctx.declareNestedClasses()}
}
"""
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index b6675a8..98c4cbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -93,7 +93,7 @@ private [sql] object GenArrayData {
if (!ctx.isPrimitiveType(elementType)) {
val genericArrayClass = classOf[GenericArrayData].getName
ctx.addMutableState("Object[]", arrayName,
- s"this.$arrayName = new Object[${numElements}];")
+ s"$arrayName = new Object[${numElements}];")
val assignments = elementsCode.zipWithIndex.map { case (eval, i) =>
val isNullAssignment = if (!isMapKey) {
@@ -340,7 +340,7 @@ case class CreateNamedStruct(children: Seq[Expression]) extends CreateNamedStruc
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val rowClass = classOf[GenericInternalRow].getName
val values = ctx.freshName("values")
- ctx.addMutableState("Object[]", values, s"this.$values = null;")
+ ctx.addMutableState("Object[]", values, s"$values = null;")
ev.copy(code = s"""
$values = new Object[${valExprs.size}];""" +
@@ -357,7 +357,7 @@ case class CreateNamedStruct(children: Seq[Expression]) extends CreateNamedStruc
}) +
s"""
final InternalRow ${ev.value} = new $rowClass($values);
- this.$values = null;
+ $values = null;
""", isNull = "false")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index ee365fe..ae8efb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -131,8 +131,8 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
| $globalValue = ${ev.value};
|}
""".stripMargin
- ctx.addNewFunction(funcName, funcBody)
- (funcName, globalIsNull, globalValue)
+ val fullFuncName = ctx.addNewFunction(funcName, funcBody)
+ (fullFuncName, globalIsNull, globalValue)
}
override def toString: String = s"if ($predicate) $trueValue else $falseValue"
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index e023f05..c217aa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -200,7 +200,7 @@ case class Stack(children: Seq[Expression]) extends Generator {
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
// Rows - we write these into an array.
val rowData = ctx.freshName("rows")
- ctx.addMutableState("InternalRow[]", rowData, s"this.$rowData = new InternalRow[$numRows];")
+ ctx.addMutableState("InternalRow[]", rowData, s"$rowData = new InternalRow[$numRows];")
val values = children.tail
val dataTypes = values.take(numFields).map(_.dataType)
val code = ctx.splitExpressions(ctx.INPUT_ROW, Seq.tabulate(numRows) { row =>
@@ -209,7 +209,7 @@ case class Stack(children: Seq[Expression]) extends Generator {
if (index < values.length) values(index) else Literal(null, dataTypes(col))
}
val eval = CreateStruct(fields).genCode(ctx)
- s"${eval.code}\nthis.$rowData[$row] = ${eval.value};"
+ s"${eval.code}\n$rowData[$row] = ${eval.value};"
})
// Create the collection.
@@ -217,7 +217,7 @@ case class Stack(children: Seq[Expression]) extends Generator {
ctx.addMutableState(
s"$wrapperClass<InternalRow>",
ev.value,
- s"this.${ev.value} = $wrapperClass$$.MODULE$$.make(this.$rowData);")
+ s"${ev.value} = $wrapperClass$$.MODULE$$.make($rowData);")
ev.copy(code = code, isNull = "false")
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 5bb0feb..073993c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1163,7 +1163,7 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
val code = s"""
${instanceGen.code}
- this.${javaBeanInstance} = ${instanceGen.value};
+ ${javaBeanInstance} = ${instanceGen.value};
if (!${instanceGen.isNull}) {
$initializeCode
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
index b69b74b..58ea5b9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
@@ -33,10 +33,10 @@ class GeneratedProjectionSuite extends SparkFunSuite {
test("generated projections on wider table") {
val N = 1000
- val wideRow1 = new GenericInternalRow((1 to N).toArray[Any])
+ val wideRow1 = new GenericInternalRow((0 until N).toArray[Any])
val schema1 = StructType((1 to N).map(i => StructField("", IntegerType)))
val wideRow2 = new GenericInternalRow(
- (1 to N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
+ (0 until N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
val schema2 = StructType((1 to N).map(i => StructField("", StringType)))
val joined = new JoinedRow(wideRow1, wideRow2)
val joinedSchema = StructType(schema1 ++ schema2)
@@ -48,12 +48,12 @@ class GeneratedProjectionSuite extends SparkFunSuite {
val unsafeProj = UnsafeProjection.create(nestedSchema)
val unsafe: UnsafeRow = unsafeProj(nested)
(0 until N).foreach { i =>
- val s = UTF8String.fromString((i + 1).toString)
- assert(i + 1 === unsafe.getInt(i + 2))
+ val s = UTF8String.fromString(i.toString)
+ assert(i === unsafe.getInt(i + 2))
assert(s === unsafe.getUTF8String(i + 2 + N))
- assert(i + 1 === unsafe.getStruct(0, N * 2).getInt(i))
+ assert(i === unsafe.getStruct(0, N * 2).getInt(i))
assert(s === unsafe.getStruct(0, N * 2).getUTF8String(i + N))
- assert(i + 1 === unsafe.getStruct(1, N * 2).getInt(i))
+ assert(i === unsafe.getStruct(1, N * 2).getInt(i))
assert(s === unsafe.getStruct(1, N * 2).getUTF8String(i + N))
}
@@ -62,13 +62,63 @@ class GeneratedProjectionSuite extends SparkFunSuite {
val result = safeProj(unsafe)
// Can't compare GenericInternalRow with JoinedRow directly
(0 until N).foreach { i =>
- val r = i + 1
- val s = UTF8String.fromString((i + 1).toString)
- assert(r === result.getInt(i + 2))
+ val s = UTF8String.fromString(i.toString)
+ assert(i === result.getInt(i + 2))
assert(s === result.getUTF8String(i + 2 + N))
- assert(r === result.getStruct(0, N * 2).getInt(i))
+ assert(i === result.getStruct(0, N * 2).getInt(i))
assert(s === result.getStruct(0, N * 2).getUTF8String(i + N))
- assert(r === result.getStruct(1, N * 2).getInt(i))
+ assert(i === result.getStruct(1, N * 2).getInt(i))
+ assert(s === result.getStruct(1, N * 2).getUTF8String(i + N))
+ }
+
+ // test generated MutableProjection
+ val exprs = nestedSchema.fields.zipWithIndex.map { case (f, i) =>
+ BoundReference(i, f.dataType, true)
+ }
+ val mutableProj = GenerateMutableProjection.generate(exprs)
+ val row1 = mutableProj(result)
+ assert(result === row1)
+ val row2 = mutableProj(result)
+ assert(result === row2)
+ }
+
+ test("SPARK-18016: generated projections on wider table requiring class-splitting") {
+ val N = 4000
+ val wideRow1 = new GenericInternalRow((0 until N).toArray[Any])
+ val schema1 = StructType((1 to N).map(i => StructField("", IntegerType)))
+ val wideRow2 = new GenericInternalRow(
+ (0 until N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
+ val schema2 = StructType((1 to N).map(i => StructField("", StringType)))
+ val joined = new JoinedRow(wideRow1, wideRow2)
+ val joinedSchema = StructType(schema1 ++ schema2)
+ val nested = new JoinedRow(InternalRow(joined, joined), joined)
+ val nestedSchema = StructType(
+ Seq(StructField("", joinedSchema), StructField("", joinedSchema)) ++ joinedSchema)
+
+ // test generated UnsafeProjection
+ val unsafeProj = UnsafeProjection.create(nestedSchema)
+ val unsafe: UnsafeRow = unsafeProj(nested)
+ (0 until N).foreach { i =>
+ val s = UTF8String.fromString(i.toString)
+ assert(i === unsafe.getInt(i + 2))
+ assert(s === unsafe.getUTF8String(i + 2 + N))
+ assert(i === unsafe.getStruct(0, N * 2).getInt(i))
+ assert(s === unsafe.getStruct(0, N * 2).getUTF8String(i + N))
+ assert(i === unsafe.getStruct(1, N * 2).getInt(i))
+ assert(s === unsafe.getStruct(1, N * 2).getUTF8String(i + N))
+ }
+
+ // test generated SafeProjection
+ val safeProj = FromUnsafeProjection(nestedSchema)
+ val result = safeProj(unsafe)
+ // Can't compare GenericInternalRow with JoinedRow directly
+ (0 until N).foreach { i =>
+ val s = UTF8String.fromString(i.toString)
+ assert(i === result.getInt(i + 2))
+ assert(s === result.getUTF8String(i + 2 + N))
+ assert(i === result.getStruct(0, N * 2).getInt(i))
+ assert(s === result.getStruct(0, N * 2).getUTF8String(i + N))
+ assert(i === result.getStruct(1, N * 2).getInt(i))
assert(s === result.getStruct(1, N * 2).getUTF8String(i + N))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index fe4be96..7327c9b 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -184,6 +184,13 @@
</executions>
</plugin>
<plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index e861166..74a47da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -93,7 +93,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
}
val nextBatch = ctx.freshName("nextBatch")
- ctx.addNewFunction(nextBatch,
+ val nextBatchFuncName = ctx.addNewFunction(nextBatch,
s"""
|private void $nextBatch() throws java.io.IOException {
| long getBatchStart = System.nanoTime();
@@ -121,7 +121,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
}
s"""
|if ($batch == null) {
- | $nextBatch();
+ | $nextBatchFuncName();
|}
|while ($batch != null) {
| int $numRows = $batch.numRows();
@@ -133,7 +133,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
| }
| $idx = $numRows;
| $batch = null;
- | $nextBatch();
+ | $nextBatchFuncName();
|}
|$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
|$scanTimeTotalNs = 0;
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index f98ae82..ff71fd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -141,7 +141,7 @@ case class SortExec(
ctx.addMutableState("scala.collection.Iterator<UnsafeRow>", sortedIterator, "")
val addToSorter = ctx.freshName("addToSorter")
- ctx.addNewFunction(addToSorter,
+ val addToSorterFuncName = ctx.addNewFunction(addToSorter,
s"""
| private void $addToSorter() throws java.io.IOException {
| ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
@@ -160,7 +160,7 @@ case class SortExec(
s"""
| if ($needToSort) {
| long $spillSizeBefore = $metrics.memoryBytesSpilled();
- | $addToSorter();
+ | $addToSorterFuncName();
| $sortedIterator = $sorterVariable.sort();
| $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000);
| $peakMemory.add($sorterVariable.getPeakMemoryUsage());
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ac30b11..0bd28e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -357,6 +357,9 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
protected void processNext() throws java.io.IOException {
${code.trim}
}
+
+ ${ctx.initNestedClasses()}
+ ${ctx.declareNestedClasses()}
}
""".trim
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 9df5e58..5027a61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -212,7 +212,7 @@ case class HashAggregateExec(
}
val doAgg = ctx.freshName("doAggregateWithoutKey")
- ctx.addNewFunction(doAgg,
+ val doAggFuncName = ctx.addNewFunction(doAgg,
s"""
| private void $doAgg() throws java.io.IOException {
| // initialize aggregation buffer
@@ -229,7 +229,7 @@ case class HashAggregateExec(
| while (!$initAgg) {
| $initAgg = true;
| long $beforeAgg = System.nanoTime();
- | $doAgg();
+ | $doAggFuncName();
| $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
|
| // output the result
@@ -600,7 +600,7 @@ case class HashAggregateExec(
} else ""
}
- ctx.addNewFunction(doAgg,
+ val doAggFuncName = ctx.addNewFunction(doAgg,
s"""
${generateGenerateCode}
private void $doAgg() throws java.io.IOException {
@@ -681,7 +681,7 @@ case class HashAggregateExec(
if (!$initAgg) {
$initAgg = true;
long $beforeAgg = System.nanoTime();
- $doAgg();
+ $doAggFuncName();
$aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index bd7a5c5..f3ca839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -281,10 +281,8 @@ case class SampleExec(
val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName
val initSampler = ctx.freshName("initSampler")
ctx.copyResult = true
- ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
- s"$initSampler();")
- ctx.addNewFunction(initSampler,
+ val initSamplerFuncName = ctx.addNewFunction(initSampler,
s"""
| private void $initSampler() {
| $sampler = new $samplerClass<UnsafeRow>($upperBound - $lowerBound, false);
@@ -299,6 +297,9 @@ case class SampleExec(
| }
""".stripMargin.trim)
+ ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
+ s"$initSamplerFuncName();")
+
val samplingCount = ctx.freshName("samplingCount")
s"""
| int $samplingCount = $sampler.sample();
@@ -394,7 +395,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
// The default size of a batch, which must be positive integer
val batchSize = 1000
- ctx.addNewFunction("initRange",
+ val initRangeFuncName = ctx.addNewFunction("initRange",
s"""
| private void initRange(int idx) {
| $BigInt index = $BigInt.valueOf(idx);
@@ -451,7 +452,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
| // initialize Range
| if (!$initTerm) {
| $initTerm = true;
- | initRange(partitionIndex);
+ | $initRangeFuncName(partitionIndex);
| }
|
| while (true) {
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 14024d6..d3fa0dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -128,9 +128,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
} else {
val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
- var groupedAccessorsLength = 0
- groupedAccessorsItr.zipWithIndex.foreach { case (body, i) =>
- groupedAccessorsLength += 1
+ val accessorNames = groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
val funcName = s"accessors$i"
val funcCode = s"""
|private void $funcName() {
@@ -139,7 +137,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
""".stripMargin
ctx.addNewFunction(funcName, funcCode)
}
- groupedExtractorsItr.zipWithIndex.foreach { case (body, i) =>
+ val extractorNames = groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
val funcName = s"extractors$i"
val funcCode = s"""
|private void $funcName() {
@@ -148,8 +146,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
""".stripMargin
ctx.addNewFunction(funcName, funcCode)
}
- ((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
- (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n"))
+ (accessorNames.map { accessorName => s"$accessorName();" }.mkString("\n"),
+ extractorNames.map { extractorName => s"$extractorName();"}.mkString("\n"))
}
val codeBody = s"""
@@ -224,6 +222,9 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
unsafeRow.setTotalSize(bufferHolder.totalSize());
return unsafeRow;
}
+
+ ${ctx.initNestedClasses()}
+ ${ctx.declareNestedClasses()}
}"""
val code = CodeFormatter.stripOverlappingComments(
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 26fb610..8445c26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -478,7 +478,7 @@ case class SortMergeJoinExec(
| }
| return false; // unreachable
|}
- """.stripMargin)
+ """.stripMargin, inlineToOuterClass = true)
(leftRow, matches)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 757fe21..73a0f87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -75,7 +75,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
protected boolean stopEarly() {
return $stopEarly;
}
- """)
+ """, inlineToOuterClass = true)
val countTerm = ctx.freshName("count")
ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
s"""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org