Posted to commits@spark.apache.org by we...@apache.org on 2017/06/15 05:45:16 UTC

spark git commit: [SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - Class Splitting

Repository: spark
Updated Branches:
  refs/heads/master 205142817 -> b32b2123d


[SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - Class Splitting

## What changes were proposed in this pull request?

This pull request includes only the class splitting feature described in #16648. When the code for a given class would grow beyond 1600k bytes, a private, nested sub-class is generated, and subsequent functions are inlined into it. Additional sub-classes are generated each time the threshold is reached again. The PR makes three changes (a sketch of the resulting class shape follows the list):

1. Includes helper maps, lists, and functions for keeping track of sub-classes during code generation (added to `CodegenContext` in `CodeGenerator.scala`). These helpers allow nested classes and split functions to be initialized, declared, and inlined at the appropriate locations in the various projection classes.
2. Changes `addNewFunction` to return a string, so that callers can handle the case where a split function is inlined into a nested class rather than the outer class (and so must be invoked by its class-qualified name). Uses of `addNewFunction` throughout the codebase are updated to use the returned name.
3. Removes instances of the `this` keyword used on data inside generated classes. All state declared in the outer class is global by default and accessible to the nested classes. However, if a reference to that global state inside a nested class is prefixed with `this`, it resolves against the nested class, where no such field exists, rather than against the outer class that actually owns the variable.
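
As a rough illustration, the generated Java ends up shaped like the snippet below, written in the codebase's own Java-in-a-Scala-string style (a hand-written sketch, not actual generator output; all names are invented):

```scala
// Sketch only: approximates the class layout that class splitting produces.
val generatedShape: String =
  """
    |class SpecificProjection extends Projection {  // the "OuterClass"
    |  private boolean isNull_0;  // mutable state lives only in the outer class
    |
    |  private class NestedClass_1 {  // created once the class passes 1600k bytes
    |    private void apply_5(InternalRow i) {
    |      isNull_0 = i.isNullAt(5);  // resolves to the outer class' field;
    |                                 // this.isNull_0 would wrongly look inside NestedClass_1
    |    }
    |  }
    |
    |  private NestedClass_1 nestedClassInstance_1 = new NestedClass_1();
    |
    |  public InternalRow apply(InternalRow i) {
    |    nestedClassInstance_1.apply_5(i);  // split function invoked by qualified name
    |    return null;  // remainder elided
    |  }
    |}
  """.stripMargin
```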

## How was this patch tested?

Added a test case to `GeneratedProjectionSuite` that increases the number of columns tested in various projections past the threshold that would previously have triggered a `JaninoRuntimeException` due to the constant pool limit.
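
In condensed form, the new test exercises a pattern like the following (a sketch distilled from the suite changes in the diff below; only the wide-schema construction is shown):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

// 4000 int columns plus 4000 string columns, mirroring the added test's schema.
val N = 4000
val wideSchema = StructType(
  (1 to N).map(_ => StructField("", IntegerType)) ++
  (1 to N).map(_ => StructField("", StringType)))

// Before this patch, codegen for a projection this wide could fail with a
// JaninoRuntimeException once the class' constant pool exceeded 65,536 entries.
val proj = UnsafeProjection.create(wideSchema)
```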

Note: This PR does not address the second Constant Pool issue with code generation (also mentioned in #16648): excess global mutable state. A second PR may be opened to resolve that issue.
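
For context, that second issue stems from `addMutableState`: every call adds another field (plus constant pool entries) to the single outer class, since the nested sub-classes introduced here deliberately hold no state of their own. A sketch, given a `ctx: CodegenContext` (variable names invented):

```scala
// Each call grows the outer class by one field and one line of init code;
// with thousands of expressions, these fields alone can exhaust the pool.
ctx.addMutableState("boolean", "isNull_0", "isNull_0 = true;")
ctx.addMutableState("Object[]", "values_0", "values_0 = null;")
```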

Author: ALeksander Eskilson <al...@cerner.com>

Closes #18075 from bdrillard/class_splitting_only.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b32b2123
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b32b2123
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b32b2123

Branch: refs/heads/master
Commit: b32b2123ddca66e00acf4c9d956232e07f779f9f
Parents: 2051428
Author: ALeksander Eskilson <al...@cerner.com>
Authored: Thu Jun 15 13:45:08 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jun 15 13:45:08 2017 +0800

----------------------------------------------------------------------
 sql/catalyst/pom.xml                            |   7 +
 .../sql/catalyst/expressions/ScalaUDF.scala     |   6 +-
 .../expressions/codegen/CodeGenerator.scala     | 140 ++++++++++++++++---
 .../codegen/GenerateMutableProjection.scala     |  17 ++-
 .../expressions/codegen/GenerateOrdering.scala  |   3 +
 .../expressions/codegen/GeneratePredicate.scala |   3 +
 .../codegen/GenerateSafeProjection.scala        |   9 +-
 .../codegen/GenerateUnsafeProjection.scala      |   9 +-
 .../expressions/complexTypeCreator.scala        |   6 +-
 .../expressions/conditionalExpressions.scala    |   4 +-
 .../sql/catalyst/expressions/generators.scala   |   6 +-
 .../catalyst/expressions/objects/objects.scala  |   2 +-
 .../codegen/GeneratedProjectionSuite.scala      |  72 ++++++++--
 sql/core/pom.xml                                |   7 +
 .../spark/sql/execution/ColumnarBatchScan.scala |   6 +-
 .../apache/spark/sql/execution/SortExec.scala   |   4 +-
 .../sql/execution/WholeStageCodegenExec.scala   |   3 +
 .../execution/aggregate/HashAggregateExec.scala |   8 +-
 .../sql/execution/basicPhysicalOperators.scala  |  11 +-
 .../columnar/GenerateColumnAccessor.scala       |  13 +-
 .../sql/execution/joins/SortMergeJoinExec.scala |   2 +-
 .../org/apache/spark/sql/execution/limit.scala  |   2 +-
 22 files changed, 259 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/pom.xml
----------------------------------------------------------------------
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 8d80f8e..36948ba 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -132,6 +132,13 @@
         </executions>
       </plugin>
       <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
         <groupId>org.antlr</groupId>
         <artifactId>antlr4-maven-plugin</artifactId>
         <executions>

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
index af1eba2..a54f6d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala
@@ -988,7 +988,7 @@ case class ScalaUDF(
     val converterTerm = ctx.freshName("converter")
     val expressionIdx = ctx.references.size - 1
     ctx.addMutableState(converterClassName, converterTerm,
-      s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
+      s"$converterTerm = ($converterClassName)$typeConvertersClassName" +
         s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
           s"references[$expressionIdx]).getChildren().apply($index))).dataType());")
     converterTerm
@@ -1005,7 +1005,7 @@ case class ScalaUDF(
     // Generate codes used to convert the returned value of user-defined functions to Catalyst type
     val catalystConverterTerm = ctx.freshName("catalystConverter")
     ctx.addMutableState(converterClassName, catalystConverterTerm,
-      s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
+      s"$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
         s".createToCatalystConverter($scalaUDF.dataType());")
 
     val resultTerm = ctx.freshName("result")
@@ -1019,7 +1019,7 @@ case class ScalaUDF(
 
     val funcTerm = ctx.freshName("udf")
     ctx.addMutableState(funcClassName, funcTerm,
-      s"this.$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
+      s"$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
 
     // codegen for children expressions
     val evals = children.map(_.genCode(ctx))

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index fd97802..5158949 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
 
 import com.google.common.cache.{CacheBuilder, CacheLoader}
 import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
-import org.apache.commons.lang3.exception.ExceptionUtils
 import org.codehaus.commons.compiler.CompileException
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
 import org.codehaus.janino.util.ClassFile
@@ -113,7 +112,7 @@ class CodegenContext {
     val idx = references.length
     references += obj
     val clsName = Option(className).getOrElse(obj.getClass.getName)
-    addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
+    addMutableState(clsName, term, s"$term = ($clsName) references[$idx];")
     term
   }
 
@@ -203,16 +202,6 @@ class CodegenContext {
   }
 
   /**
-   * Holding all the functions those will be added into generated class.
-   */
-  val addedFunctions: mutable.Map[String, String] =
-    mutable.Map.empty[String, String]
-
-  def addNewFunction(funcName: String, funcCode: String): Unit = {
-    addedFunctions += ((funcName, funcCode))
-  }
-
-  /**
    * Holds expressions that are equivalent. Used to perform subexpression elimination
    * during codegen.
    *
@@ -233,10 +222,118 @@ class CodegenContext {
   // The collection of sub-expression result resetting methods that need to be called on each row.
   val subexprFunctions = mutable.ArrayBuffer.empty[String]
 
-  def declareAddedFunctions(): String = {
-    addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n")
+  val outerClassName = "OuterClass"
+
+  /**
+   * Holds the class and instance names to be generated, where `OuterClass` is a placeholder
+   * standing for whichever class is generated as the outermost class and which will contain any
+   * nested sub-classes. All other classes and instance names in this list will represent private,
+   * nested sub-classes.
+   */
+  private val classes: mutable.ListBuffer[(String, String)] =
+    mutable.ListBuffer[(String, String)](outerClassName -> null)
+
+  // A map holding the current size in bytes of each class to be generated.
+  private val classSize: mutable.Map[String, Int] =
+    mutable.Map[String, Int](outerClassName -> 0)
+
+  // Nested maps holding function names and their code belonging to each class.
+  private val classFunctions: mutable.Map[String, mutable.Map[String, String]] =
+    mutable.Map(outerClassName -> mutable.Map.empty[String, String])
+
+  // Returns the size of the most recently added class.
+  private def currClassSize(): Int = classSize(classes.head._1)
+
+  // Returns the class name and instance name for the most recently added class.
+  private def currClass(): (String, String) = classes.head
+
+  // Adds a new class. Requires the class' name, and its instance name.
+  private def addClass(className: String, classInstance: String): Unit = {
+    classes.prepend(className -> classInstance)
+    classSize += className -> 0
+    classFunctions += className -> mutable.Map.empty[String, String]
+  }
+
+  /**
+   * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the
+   * function will be inlined into a new private, nested class, and a class-qualified name for the
+   * function will be returned. Otherwise, the function will be inlined into the `OuterClass` and
+   * the simple `funcName` will be returned.
+   *
+   * @param funcName the class-unqualified name of the function
+   * @param funcCode the body of the function
+   * @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This
+   *                           can be necessary when a function is declared outside of the context
+   *                           in which it is eventually referenced, and a returned class-qualified
+   *                           function name could not otherwise be accessed.
+   * @return the name of the function, qualified by class if it will be inlined to a private,
+   *         nested sub-class
+   */
+  def addNewFunction(
+      funcName: String,
+      funcCode: String,
+      inlineToOuterClass: Boolean = false): String = {
+    // The number of named constants that can exist in the class is limited by the Constant Pool
+    // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
+    // threshold of 1600k bytes to determine when a function should be inlined to a private, nested
+    // sub-class.
+    val (className, classInstance) = if (inlineToOuterClass) {
+      outerClassName -> ""
+    } else if (currClassSize > 1600000) {
+      val className = freshName("NestedClass")
+      val classInstance = freshName("nestedClassInstance")
+
+      addClass(className, classInstance)
+
+      className -> classInstance
+    } else {
+      currClass()
+    }
+
+    classSize(className) += funcCode.length
+    classFunctions(className) += funcName -> funcCode
+
+    if (className == outerClassName) {
+      funcName
+    } else {
+      s"$classInstance.$funcName"
+    }
+  }
+
+  /**
+   * Instantiates all nested, private sub-classes as objects in the `OuterClass`.
+   */
+  private[sql] def initNestedClasses(): String = {
+    // Nested, private sub-classes have no mutable state (though they do reference the outer class'
+    // mutable state), so we declare and initialize them inline to the OuterClass.
+    classes.filter(_._1 != outerClassName).map {
+      case (className, classInstance) =>
+        s"private $className $classInstance = new $className();"
+    }.mkString("\n")
+  }
+
+  /**
+   * Declares all function code that should be inlined to the `OuterClass`.
+   */
+  private[sql] def declareAddedFunctions(): String = {
+    classFunctions(outerClassName).values.mkString("\n")
   }
 
+  /**
+   * Declares all nested, private sub-classes and the function code that should be inlined to them.
+   */
+  private[sql] def declareNestedClasses(): String = {
+    classFunctions.filterKeys(_ != outerClassName).map {
+      case (className, functions) =>
+        s"""
+           |private class $className {
+           |  ${functions.values.mkString("\n")}
+           |}
+           """.stripMargin
+    }.mkString("\n")
+  }
+
   final val JAVA_BOOLEAN = "boolean"
   final val JAVA_BYTE = "byte"
   final val JAVA_SHORT = "short"
@@ -556,8 +653,7 @@ class CodegenContext {
             return 0;
           }
         """
-      addNewFunction(compareFunc, funcCode)
-      s"this.$compareFunc($c1, $c2)"
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case schema: StructType =>
       val comparisons = GenerateOrdering.genComparisons(this, schema)
       val compareFunc = freshName("compareStruct")
@@ -573,8 +669,7 @@ class CodegenContext {
             return 0;
           }
         """
-      addNewFunction(compareFunc, funcCode)
-      s"this.$compareFunc($c1, $c2)"
+      s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
     case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
     case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
     case _ =>
@@ -629,7 +724,9 @@ class CodegenContext {
 
   /**
    * Splits the generated code of expressions into multiple functions, because function has
-   * 64kb code size limit in JVM
+   * 64kb code size limit in JVM. If the class to which the function would be inlined would grow
+   * beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it
+   * instead, because classes have a constant pool limit of 65,536 named values.
    *
    * @param row the variable name of row that is used by expressions
    * @param expressions the codes to evaluate expressions.
@@ -689,7 +786,6 @@ class CodegenContext {
            |}
          """.stripMargin
         addNewFunction(name, code)
-        name
       }
 
       foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})"))
@@ -773,8 +869,6 @@ class CodegenContext {
            |}
            """.stripMargin
 
-      addNewFunction(fnName, fn)
-
       // Add a state and a mapping of the common subexpressions that are associate with this
       // state. Adding this expression to subExprEliminationExprMap means it will call `fn`
       // when it is code generated. This decision should be a cost based one.
@@ -792,7 +886,7 @@ class CodegenContext {
       addMutableState(javaType(expr.dataType), value,
         s"$value = ${defaultValue(expr.dataType)};")
 
-      subexprFunctions += s"$fnName($INPUT_ROW);"
+      subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
       val state = SubExprEliminationState(isNull, value)
       e.foreach(subExprEliminationExprs.put(_, state))
     }
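
The new `addNewFunction` contract in miniature (a sketch against the class as modified above; the function name and body are invented):

    import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

    val ctx = new CodegenContext
    // The returned name, not the bare funcName, must be used at the call site.
    // It is simply "myFunc" while the outer class is small, but becomes a
    // class-qualified name such as "nestedClassInstance.myFunc" once generated
    // code passes the 1600k-byte threshold and the function lands in a nested
    // sub-class.
    val qualified = ctx.addNewFunction("myFunc",
      """
        |private void myFunc(InternalRow i) {
        |  // body elided
        |}
      """.stripMargin)
    val callSite = s"$qualified(i);"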

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index 4d73244..6357668 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -63,21 +63,21 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
         if (e.nullable) {
           val isNull = s"isNull_$i"
           val value = s"value_$i"
-          ctx.addMutableState("boolean", isNull, s"this.$isNull = true;")
+          ctx.addMutableState("boolean", isNull, s"$isNull = true;")
           ctx.addMutableState(ctx.javaType(e.dataType), value,
-            s"this.$value = ${ctx.defaultValue(e.dataType)};")
+            s"$value = ${ctx.defaultValue(e.dataType)};")
           s"""
             ${ev.code}
-            this.$isNull = ${ev.isNull};
-            this.$value = ${ev.value};
+            $isNull = ${ev.isNull};
+            $value = ${ev.value};
            """
         } else {
           val value = s"value_$i"
           ctx.addMutableState(ctx.javaType(e.dataType), value,
-            s"this.$value = ${ctx.defaultValue(e.dataType)};")
+            s"$value = ${ctx.defaultValue(e.dataType)};")
           s"""
             ${ev.code}
-            this.$value = ${ev.value};
+            $value = ${ev.value};
            """
         }
     }
@@ -87,7 +87,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
 
     val updates = validExpr.zip(index).map {
       case (e, i) =>
-        val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
+        val ev = ExprCode("", s"isNull_$i", s"value_$i")
         ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
     }
 
@@ -135,6 +135,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
           $allUpdates
           return mutableRow;
         }
+
+        ${ctx.initNestedClasses()}
+        ${ctx.declareNestedClasses()}
       }
     """
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index f7fc2d5..a319432 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -179,6 +179,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
           $comparisons
           return 0;
         }
+
+        ${ctx.initNestedClasses()}
+        ${ctx.declareNestedClasses()}
       }"""
 
     val code = CodeFormatter.stripOverlappingComments(

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
index dcd1ed9..b400783 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratePredicate.scala
@@ -72,6 +72,9 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
           ${eval.code}
           return !${eval.isNull} && ${eval.value};
         }
+
+        ${ctx.initNestedClasses()}
+        ${ctx.declareNestedClasses()}
       }"""
 
     val code = CodeFormatter.stripOverlappingComments(

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
index b1cb6ed..f708aef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala
@@ -49,7 +49,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
     val output = ctx.freshName("safeRow")
     val values = ctx.freshName("values")
     // These expressions could be split into multiple functions
-    ctx.addMutableState("Object[]", values, s"this.$values = null;")
+    ctx.addMutableState("Object[]", values, s"$values = null;")
 
     val rowClass = classOf[GenericInternalRow].getName
 
@@ -65,10 +65,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
     val allFields = ctx.splitExpressions(tmp, fieldWriters)
     val code = s"""
       final InternalRow $tmp = $input;
-      this.$values = new Object[${schema.length}];
+      $values = new Object[${schema.length}];
       $allFields
       final InternalRow $output = new $rowClass($values);
-      this.$values = null;
+      $values = null;
     """
 
     ExprCode(code, "false", output)
@@ -184,6 +184,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
           $allExpressions
           return mutableRow;
         }
+
+        ${ctx.initNestedClasses()}
+        ${ctx.declareNestedClasses()}
       }
     """
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
index efbbc03..6be69d1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala
@@ -82,7 +82,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     val rowWriterClass = classOf[UnsafeRowWriter].getName
     val rowWriter = ctx.freshName("rowWriter")
     ctx.addMutableState(rowWriterClass, rowWriter,
-      s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
+      s"$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
 
     val resetWriter = if (isTopLevel) {
       // For top level row writer, it always writes to the beginning of the global buffer holder,
@@ -182,7 +182,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     val arrayWriterClass = classOf[UnsafeArrayWriter].getName
     val arrayWriter = ctx.freshName("arrayWriter")
     ctx.addMutableState(arrayWriterClass, arrayWriter,
-      s"this.$arrayWriter = new $arrayWriterClass();")
+      s"$arrayWriter = new $arrayWriterClass();")
     val numElements = ctx.freshName("numElements")
     val index = ctx.freshName("index")
     val element = ctx.freshName("element")
@@ -321,7 +321,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
     val holder = ctx.freshName("holder")
     val holderClass = classOf[BufferHolder].getName
     ctx.addMutableState(holderClass, holder,
-      s"this.$holder = new $holderClass($result, ${numVarLenFields * 32});")
+      s"$holder = new $holderClass($result, ${numVarLenFields * 32});")
 
     val resetBufferHolder = if (numVarLenFields == 0) {
       ""
@@ -402,6 +402,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
           ${eval.code.trim}
           return ${eval.value};
         }
+
+        ${ctx.initNestedClasses()}
+        ${ctx.declareNestedClasses()}
       }
       """
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
index b6675a8..98c4cbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
@@ -93,7 +93,7 @@ private [sql] object GenArrayData {
     if (!ctx.isPrimitiveType(elementType)) {
       val genericArrayClass = classOf[GenericArrayData].getName
       ctx.addMutableState("Object[]", arrayName,
-        s"this.$arrayName = new Object[${numElements}];")
+        s"$arrayName = new Object[${numElements}];")
 
       val assignments = elementsCode.zipWithIndex.map { case (eval, i) =>
         val isNullAssignment = if (!isMapKey) {
@@ -340,7 +340,7 @@ case class CreateNamedStruct(children: Seq[Expression]) extends CreateNamedStruc
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     val rowClass = classOf[GenericInternalRow].getName
     val values = ctx.freshName("values")
-    ctx.addMutableState("Object[]", values, s"this.$values = null;")
+    ctx.addMutableState("Object[]", values, s"$values = null;")
 
     ev.copy(code = s"""
       $values = new Object[${valExprs.size}];""" +
@@ -357,7 +357,7 @@ case class CreateNamedStruct(children: Seq[Expression]) extends CreateNamedStruc
         }) +
       s"""
         final InternalRow ${ev.value} = new $rowClass($values);
-        this.$values = null;
+        $values = null;
       """, isNull = "false")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
index ee365fe..ae8efb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
@@ -131,8 +131,8 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
          |  $globalValue = ${ev.value};
          |}
          """.stripMargin
-    ctx.addNewFunction(funcName, funcBody)
-    (funcName, globalIsNull, globalValue)
+    val fullFuncName = ctx.addNewFunction(funcName, funcBody)
+    (fullFuncName, globalIsNull, globalValue)
   }
 
   override def toString: String = s"if ($predicate) $trueValue else $falseValue"

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index e023f05..c217aa8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -200,7 +200,7 @@ case class Stack(children: Seq[Expression]) extends Generator {
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
     // Rows - we write these into an array.
     val rowData = ctx.freshName("rows")
-    ctx.addMutableState("InternalRow[]", rowData, s"this.$rowData = new InternalRow[$numRows];")
+    ctx.addMutableState("InternalRow[]", rowData, s"$rowData = new InternalRow[$numRows];")
     val values = children.tail
     val dataTypes = values.take(numFields).map(_.dataType)
     val code = ctx.splitExpressions(ctx.INPUT_ROW, Seq.tabulate(numRows) { row =>
@@ -209,7 +209,7 @@ case class Stack(children: Seq[Expression]) extends Generator {
         if (index < values.length) values(index) else Literal(null, dataTypes(col))
       }
       val eval = CreateStruct(fields).genCode(ctx)
-      s"${eval.code}\nthis.$rowData[$row] = ${eval.value};"
+      s"${eval.code}\n$rowData[$row] = ${eval.value};"
     })
 
     // Create the collection.
@@ -217,7 +217,7 @@ case class Stack(children: Seq[Expression]) extends Generator {
     ctx.addMutableState(
       s"$wrapperClass<InternalRow>",
       ev.value,
-      s"this.${ev.value} = $wrapperClass$$.MODULE$$.make(this.$rowData);")
+      s"${ev.value} = $wrapperClass$$.MODULE$$.make($rowData);")
     ev.copy(code = code, isNull = "false")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 5bb0feb..073993c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -1163,7 +1163,7 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
 
     val code = s"""
       ${instanceGen.code}
-      this.${javaBeanInstance} = ${instanceGen.value};
+      ${javaBeanInstance} = ${instanceGen.value};
       if (!${instanceGen.isNull}) {
         $initializeCode
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
index b69b74b..58ea5b9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala
@@ -33,10 +33,10 @@ class GeneratedProjectionSuite extends SparkFunSuite {
 
   test("generated projections on wider table") {
     val N = 1000
-    val wideRow1 = new GenericInternalRow((1 to N).toArray[Any])
+    val wideRow1 = new GenericInternalRow((0 until N).toArray[Any])
     val schema1 = StructType((1 to N).map(i => StructField("", IntegerType)))
     val wideRow2 = new GenericInternalRow(
-      (1 to N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
+      (0 until N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
     val schema2 = StructType((1 to N).map(i => StructField("", StringType)))
     val joined = new JoinedRow(wideRow1, wideRow2)
     val joinedSchema = StructType(schema1 ++ schema2)
@@ -48,12 +48,12 @@ class GeneratedProjectionSuite extends SparkFunSuite {
     val unsafeProj = UnsafeProjection.create(nestedSchema)
     val unsafe: UnsafeRow = unsafeProj(nested)
     (0 until N).foreach { i =>
-      val s = UTF8String.fromString((i + 1).toString)
-      assert(i + 1 === unsafe.getInt(i + 2))
+      val s = UTF8String.fromString(i.toString)
+      assert(i === unsafe.getInt(i + 2))
       assert(s === unsafe.getUTF8String(i + 2 + N))
-      assert(i + 1 === unsafe.getStruct(0, N * 2).getInt(i))
+      assert(i === unsafe.getStruct(0, N * 2).getInt(i))
       assert(s === unsafe.getStruct(0, N * 2).getUTF8String(i + N))
-      assert(i + 1 === unsafe.getStruct(1, N * 2).getInt(i))
+      assert(i === unsafe.getStruct(1, N * 2).getInt(i))
       assert(s === unsafe.getStruct(1, N * 2).getUTF8String(i + N))
     }
 
@@ -62,13 +62,63 @@ class GeneratedProjectionSuite extends SparkFunSuite {
     val result = safeProj(unsafe)
     // Can't compare GenericInternalRow with JoinedRow directly
     (0 until N).foreach { i =>
-      val r = i + 1
-      val s = UTF8String.fromString((i + 1).toString)
-      assert(r === result.getInt(i + 2))
+      val s = UTF8String.fromString(i.toString)
+      assert(i === result.getInt(i + 2))
       assert(s === result.getUTF8String(i + 2 + N))
-      assert(r === result.getStruct(0, N * 2).getInt(i))
+      assert(i === result.getStruct(0, N * 2).getInt(i))
       assert(s === result.getStruct(0, N * 2).getUTF8String(i + N))
-      assert(r === result.getStruct(1, N * 2).getInt(i))
+      assert(i === result.getStruct(1, N * 2).getInt(i))
+      assert(s === result.getStruct(1, N * 2).getUTF8String(i + N))
+    }
+
+    // test generated MutableProjection
+    val exprs = nestedSchema.fields.zipWithIndex.map { case (f, i) =>
+      BoundReference(i, f.dataType, true)
+    }
+    val mutableProj = GenerateMutableProjection.generate(exprs)
+    val row1 = mutableProj(result)
+    assert(result === row1)
+    val row2 = mutableProj(result)
+    assert(result === row2)
+  }
+
+  test("SPARK-18016: generated projections on wider table requiring class-splitting") {
+    val N = 4000
+    val wideRow1 = new GenericInternalRow((0 until N).toArray[Any])
+    val schema1 = StructType((1 to N).map(i => StructField("", IntegerType)))
+    val wideRow2 = new GenericInternalRow(
+      (0 until N).map(i => UTF8String.fromString(i.toString)).toArray[Any])
+    val schema2 = StructType((1 to N).map(i => StructField("", StringType)))
+    val joined = new JoinedRow(wideRow1, wideRow2)
+    val joinedSchema = StructType(schema1 ++ schema2)
+    val nested = new JoinedRow(InternalRow(joined, joined), joined)
+    val nestedSchema = StructType(
+      Seq(StructField("", joinedSchema), StructField("", joinedSchema)) ++ joinedSchema)
+
+    // test generated UnsafeProjection
+    val unsafeProj = UnsafeProjection.create(nestedSchema)
+    val unsafe: UnsafeRow = unsafeProj(nested)
+    (0 until N).foreach { i =>
+      val s = UTF8String.fromString(i.toString)
+      assert(i === unsafe.getInt(i + 2))
+      assert(s === unsafe.getUTF8String(i + 2 + N))
+      assert(i === unsafe.getStruct(0, N * 2).getInt(i))
+      assert(s === unsafe.getStruct(0, N * 2).getUTF8String(i + N))
+      assert(i === unsafe.getStruct(1, N * 2).getInt(i))
+      assert(s === unsafe.getStruct(1, N * 2).getUTF8String(i + N))
+    }
+
+    // test generated SafeProjection
+    val safeProj = FromUnsafeProjection(nestedSchema)
+    val result = safeProj(unsafe)
+    // Can't compare GenericInternalRow with JoinedRow directly
+    (0 until N).foreach { i =>
+      val s = UTF8String.fromString(i.toString)
+      assert(i === result.getInt(i + 2))
+      assert(s === result.getUTF8String(i + 2 + N))
+      assert(i === result.getStruct(0, N * 2).getInt(i))
+      assert(s === result.getStruct(0, N * 2).getUTF8String(i + N))
+      assert(i === result.getStruct(1, N * 2).getInt(i))
       assert(s === result.getStruct(1, N * 2).getUTF8String(i + N))
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/pom.xml
----------------------------------------------------------------------
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index fe4be96..7327c9b 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -184,6 +184,13 @@
         </executions>
       </plugin>
       <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
         <executions>

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
index e861166..74a47da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala
@@ -93,7 +93,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     }
 
     val nextBatch = ctx.freshName("nextBatch")
-    ctx.addNewFunction(nextBatch,
+    val nextBatchFuncName = ctx.addNewFunction(nextBatch,
       s"""
          |private void $nextBatch() throws java.io.IOException {
          |  long getBatchStart = System.nanoTime();
@@ -121,7 +121,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
     }
     s"""
        |if ($batch == null) {
-       |  $nextBatch();
+       |  $nextBatchFuncName();
        |}
        |while ($batch != null) {
        |  int $numRows = $batch.numRows();
@@ -133,7 +133,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
        |  }
        |  $idx = $numRows;
        |  $batch = null;
-       |  $nextBatch();
+       |  $nextBatchFuncName();
        |}
        |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
        |$scanTimeTotalNs = 0;

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index f98ae82..ff71fd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -141,7 +141,7 @@ case class SortExec(
     ctx.addMutableState("scala.collection.Iterator<UnsafeRow>", sortedIterator, "")
 
     val addToSorter = ctx.freshName("addToSorter")
-    ctx.addNewFunction(addToSorter,
+    val addToSorterFuncName = ctx.addNewFunction(addToSorter,
       s"""
         | private void $addToSorter() throws java.io.IOException {
         |   ${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
@@ -160,7 +160,7 @@ case class SortExec(
     s"""
        | if ($needToSort) {
        |   long $spillSizeBefore = $metrics.memoryBytesSpilled();
-       |   $addToSorter();
+       |   $addToSorterFuncName();
        |   $sortedIterator = $sorterVariable.sort();
        |   $sortTime.add($sorterVariable.getSortTimeNanos() / 1000000);
        |   $peakMemory.add($sorterVariable.getPeakMemoryUsage());

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index ac30b11..0bd28e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -357,6 +357,9 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
         protected void processNext() throws java.io.IOException {
           ${code.trim}
         }
+
+        ${ctx.initNestedClasses()}
+        ${ctx.declareNestedClasses()}
       }
       """.trim
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 9df5e58..5027a61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -212,7 +212,7 @@ case class HashAggregateExec(
     }
 
     val doAgg = ctx.freshName("doAggregateWithoutKey")
-    ctx.addNewFunction(doAgg,
+    val doAggFuncName = ctx.addNewFunction(doAgg,
       s"""
          | private void $doAgg() throws java.io.IOException {
          |   // initialize aggregation buffer
@@ -229,7 +229,7 @@ case class HashAggregateExec(
        | while (!$initAgg) {
        |   $initAgg = true;
        |   long $beforeAgg = System.nanoTime();
-       |   $doAgg();
+       |   $doAggFuncName();
        |   $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
        |
        |   // output the result
@@ -600,7 +600,7 @@ case class HashAggregateExec(
       } else ""
     }
 
-    ctx.addNewFunction(doAgg,
+    val doAggFuncName = ctx.addNewFunction(doAgg,
       s"""
         ${generateGenerateCode}
         private void $doAgg() throws java.io.IOException {
@@ -681,7 +681,7 @@ case class HashAggregateExec(
      if (!$initAgg) {
        $initAgg = true;
        long $beforeAgg = System.nanoTime();
-       $doAgg();
+       $doAggFuncName();
        $aggTime.add((System.nanoTime() - $beforeAgg) / 1000000);
      }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index bd7a5c5..f3ca839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -281,10 +281,8 @@ case class SampleExec(
       val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName
       val initSampler = ctx.freshName("initSampler")
       ctx.copyResult = true
-      ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
-        s"$initSampler();")
 
-      ctx.addNewFunction(initSampler,
+      val initSamplerFuncName = ctx.addNewFunction(initSampler,
         s"""
           | private void $initSampler() {
           |   $sampler = new $samplerClass<UnsafeRow>($upperBound - $lowerBound, false);
@@ -299,6 +297,9 @@ case class SampleExec(
           | }
          """.stripMargin.trim)
 
+      ctx.addMutableState(s"$samplerClass<UnsafeRow>", sampler,
+        s"$initSamplerFuncName();")
+
       val samplingCount = ctx.freshName("samplingCount")
       s"""
          | int $samplingCount = $sampler.sample();
@@ -394,7 +395,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     // The default size of a batch, which must be positive integer
     val batchSize = 1000
 
-    ctx.addNewFunction("initRange",
+    val initRangeFuncName = ctx.addNewFunction("initRange",
       s"""
         | private void initRange(int idx) {
         |   $BigInt index = $BigInt.valueOf(idx);
@@ -451,7 +452,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
       | // initialize Range
       | if (!$initTerm) {
       |   $initTerm = true;
-      |   initRange(partitionIndex);
+      |   $initRangeFuncName(partitionIndex);
       | }
       |
       | while (true) {

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 14024d6..d3fa0dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -128,9 +128,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       } else {
         val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
         val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
-        var groupedAccessorsLength = 0
-        groupedAccessorsItr.zipWithIndex.foreach { case (body, i) =>
-          groupedAccessorsLength += 1
+        val accessorNames = groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
           val funcName = s"accessors$i"
           val funcCode = s"""
              |private void $funcName() {
@@ -139,7 +137,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
            """.stripMargin
           ctx.addNewFunction(funcName, funcCode)
         }
-        groupedExtractorsItr.zipWithIndex.foreach { case (body, i) =>
+        val extractorNames = groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
           val funcName = s"extractors$i"
           val funcCode = s"""
              |private void $funcName() {
@@ -148,8 +146,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
            """.stripMargin
           ctx.addNewFunction(funcName, funcCode)
         }
-        ((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
-         (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n"))
+        (accessorNames.map { accessorName => s"$accessorName();" }.mkString("\n"),
+         extractorNames.map { extractorName => s"$extractorName();"}.mkString("\n"))
       }
 
     val codeBody = s"""
@@ -224,6 +222,9 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           unsafeRow.setTotalSize(bufferHolder.totalSize());
           return unsafeRow;
         }
+
+        ${ctx.initNestedClasses()}
+        ${ctx.declareNestedClasses()}
       }"""
 
     val code = CodeFormatter.stripOverlappingComments(

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 26fb610..8445c26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -478,7 +478,7 @@ case class SortMergeJoinExec(
          |  }
          |  return false; // unreachable
          |}
-       """.stripMargin)
+       """.stripMargin, inlineToOuterClass = true)
 
     (leftRow, matches)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 757fe21..73a0f87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -75,7 +75,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       protected boolean stopEarly() {
         return $stopEarly;
       }
-    """)
+    """, inlineToOuterClass = true)
     val countTerm = ctx.freshName("count")
     ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
     s"""

