You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/10/27 20:43:14 UTC

spark git commit: [SPARK-22226][SQL] splitExpression can create too many method calls in the outer class

Repository: spark
Updated Branches:
  refs/heads/master 36b826f5d -> b3d8fc3dc


[SPARK-22226][SQL] splitExpression can create too many method calls in the outer class

## What changes were proposed in this pull request?

SPARK-18016 introduced `NestedClass` to avoid that the many methods generated by `splitExpressions` contribute to the outer class' constant pool, making it growing too much. Unfortunately, despite their definition is stored in the `NestedClass`, they all are invoked in the outer class and for each method invocation, there are two entries added to the constant pool: a `Methodref` and a `Utf8` entry (you can easily check this compiling a simple sample class with `janinoc` and looking at its Constant Pool). This limits the scalability of the solution with very large methods which are split in a lot of small ones. This means that currently we are generating classes like this one:

```
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
...
  public UnsafeRow apply(InternalRow i) {
     rowWriter.zeroOutNullBytes();
     apply_0(i);
     apply_1(i);
...
    nestedClassInstance.apply_862(i);
    nestedClassInstance.apply_863(i);
...
    nestedClassInstance1.apply_1612(i);
    nestedClassInstance1.apply_1613(i);
...
  }
...
  private class NestedClass {
    private void apply_862(InternalRow i) { ... }
    private void apply_863(InternalRow i) { ... }
...
  }
  private class NestedClass1 {
    private void apply_1612(InternalRow i) { ... }
    private void apply_1613(InternalRow i) { ... }
...
  }
}
```

This PR reduce the Constant Pool size of the outer class by adding a new method to each nested class: in this method we invoke all the small methods generated by `splitExpression` in that nested class. In this way, in the outer class there is only one method invocation per nested class, reducing by orders of magnitude the entries in its constant pool because of method invocations. This means that after the patch the generated code becomes:

```
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
...
  public UnsafeRow apply(InternalRow i) {
     rowWriter.zeroOutNullBytes();
     apply_0(i);
     apply_1(i);
     ...
     nestedClassInstance.apply(i);
     nestedClassInstance1.apply(i);
     ...
  }
...
  private class NestedClass {
    private void apply_862(InternalRow i) { ... }
    private void apply_863(InternalRow i) { ... }
...
    private void apply(InternalRow i) {
      apply_862(i);
      apply_863(i);
      ...
    }
  }
  private class NestedClass1 {
    private void apply_1612(InternalRow i) { ... }
    private void apply_1613(InternalRow i) { ... }
...
    private void apply(InternalRow i) {
      apply_1612(i);
      apply_1613(i);
      ...
    }
  }
}
```

## How was this patch tested?

Added UT and existing UTs

Author: Marco Gaido <mg...@hortonworks.com>
Author: Marco Gaido <ma...@gmail.com>

Closes #19480 from mgaido91/SPARK-22226.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3d8fc3d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3d8fc3d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3d8fc3d

Branch: refs/heads/master
Commit: b3d8fc3dc458d42cf11d961762ce99f551f68548
Parents: 36b826f
Author: Marco Gaido <mg...@hortonworks.com>
Authored: Fri Oct 27 13:43:09 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Oct 27 13:43:09 2017 -0700

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala     | 156 ++++++++++++++++---
 .../expressions/CodeGenerationSuite.scala       |  17 ++
 .../org/apache/spark/sql/DataFrameSuite.scala   |  12 ++
 3 files changed, 167 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b3d8fc3d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 2cb6659..58738b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -78,6 +78,22 @@ case class SubExprEliminationState(isNull: String, value: String)
 case class SubExprCodes(codes: Seq[String], states: Map[Expression, SubExprEliminationState])
 
 /**
+ * The main information about a new added function.
+ *
+ * @param functionName String representing the name of the function
+ * @param innerClassName Optional value which is empty if the function is added to
+ *                       the outer class, otherwise it contains the name of the
+ *                       inner class in which the function has been added.
+ * @param innerClassInstance Optional value which is empty if the function is added to
+ *                           the outer class, otherwise it contains the name of the
+ *                           instance of the inner class in the outer class.
+ */
+private[codegen] case class NewFunctionSpec(
+    functionName: String,
+    innerClassName: Option[String],
+    innerClassInstance: Option[String])
+
+/**
  * A context for codegen, tracking a list of objects that could be passed into generated Java
  * function.
  */
@@ -228,8 +244,8 @@ class CodegenContext {
   /**
    * Holds the class and instance names to be generated, where `OuterClass` is a placeholder
    * standing for whichever class is generated as the outermost class and which will contain any
-   * nested sub-classes. All other classes and instance names in this list will represent private,
-   * nested sub-classes.
+   * inner sub-classes. All other classes and instance names in this list will represent private,
+   * inner sub-classes.
    */
   private val classes: mutable.ListBuffer[(String, String)] =
     mutable.ListBuffer[(String, String)](outerClassName -> null)
@@ -260,8 +276,8 @@ class CodegenContext {
 
   /**
    * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the
-   * function will be inlined into a new private, nested class, and a class-qualified name for the
-   * function will be returned. Otherwise, the function will be inined to the `OuterClass` the
+   * function will be inlined into a new private, inner class, and a class-qualified name for the
+   * function will be returned. Otherwise, the function will be inlined to the `OuterClass` the
    * simple `funcName` will be returned.
    *
    * @param funcName the class-unqualified name of the function
@@ -271,19 +287,27 @@ class CodegenContext {
    *                           it is eventually referenced and a returned qualified function name
    *                           cannot otherwise be accessed.
    * @return the name of the function, qualified by class if it will be inlined to a private,
-   *         nested sub-class
+   *         inner class
    */
   def addNewFunction(
       funcName: String,
       funcCode: String,
       inlineToOuterClass: Boolean = false): String = {
-    // The number of named constants that can exist in the class is limited by the Constant Pool
-    // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
-    // threshold of 1600k bytes to determine when a function should be inlined to a private, nested
-    // sub-class.
+    val newFunction = addNewFunctionInternal(funcName, funcCode, inlineToOuterClass)
+    newFunction match {
+      case NewFunctionSpec(functionName, None, None) => functionName
+      case NewFunctionSpec(functionName, Some(_), Some(innerClassInstance)) =>
+        innerClassInstance + "." + functionName
+    }
+  }
+
+  private[this] def addNewFunctionInternal(
+      funcName: String,
+      funcCode: String,
+      inlineToOuterClass: Boolean): NewFunctionSpec = {
     val (className, classInstance) = if (inlineToOuterClass) {
       outerClassName -> ""
-    } else if (currClassSize > 1600000) {
+    } else if (currClassSize > CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD) {
       val className = freshName("NestedClass")
       val classInstance = freshName("nestedClassInstance")
 
@@ -294,17 +318,23 @@ class CodegenContext {
       currClass()
     }
 
-    classSize(className) += funcCode.length
-    classFunctions(className) += funcName -> funcCode
+    addNewFunctionToClass(funcName, funcCode, className)
 
     if (className == outerClassName) {
-      funcName
+      NewFunctionSpec(funcName, None, None)
     } else {
-
-      s"$classInstance.$funcName"
+      NewFunctionSpec(funcName, Some(className), Some(classInstance))
     }
   }
 
+  private[this] def addNewFunctionToClass(
+      funcName: String,
+      funcCode: String,
+      className: String) = {
+    classSize(className) += funcCode.length
+    classFunctions(className) += funcName -> funcCode
+  }
+
   /**
    * Declares all function code. If the added functions are too many, split them into nested
    * sub-classes to avoid hitting Java compiler constant pool limitation.
@@ -738,7 +768,7 @@ class CodegenContext {
   /**
    * Splits the generated code of expressions into multiple functions, because function has
    * 64kb code size limit in JVM. If the class to which the function would be inlined would grow
-   * beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it
+   * beyond 1000kb, we declare a private, inner sub-class, and the function is inlined to it
    * instead, because classes have a constant pool limit of 65,536 named values.
    *
    * @param row the variable name of row that is used by expressions
@@ -801,10 +831,90 @@ class CodegenContext {
            |  ${makeSplitFunction(body)}
            |}
          """.stripMargin
-        addNewFunction(name, code)
+        addNewFunctionInternal(name, code, inlineToOuterClass = false)
       }
 
-      foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})"))
+      val (outerClassFunctions, innerClassFunctions) = functions.partition(_.innerClassName.isEmpty)
+
+      val argsString = arguments.map(_._2).mkString(", ")
+      val outerClassFunctionCalls = outerClassFunctions.map(f => s"${f.functionName}($argsString)")
+
+      val innerClassFunctionCalls = generateInnerClassesFunctionCalls(
+        innerClassFunctions,
+        func,
+        arguments,
+        returnType,
+        makeSplitFunction,
+        foldFunctions)
+
+      foldFunctions(outerClassFunctionCalls ++ innerClassFunctionCalls)
+    }
+  }
+
+  /**
+   * Here we handle all the methods which have been added to the inner classes and
+   * not to the outer class.
+   * Since they can be many, their direct invocation in the outer class adds many entries
+   * to the outer class' constant pool. This can cause the constant pool to past JVM limit.
+   * Moreover, this can cause also the outer class method where all the invocations are
+   * performed to grow beyond the 64k limit.
+   * To avoid these problems, we group them and we call only the grouping methods in the
+   * outer class.
+   *
+   * @param functions a [[Seq]] of [[NewFunctionSpec]] defined in the inner classes
+   * @param funcName the split function name base.
+   * @param arguments the list of (type, name) of the arguments of the split function.
+   * @param returnType the return type of the split function.
+   * @param makeSplitFunction makes split function body, e.g. add preparation or cleanup.
+   * @param foldFunctions folds the split function calls.
+   * @return an [[Iterable]] containing the methods' invocations
+   */
+  private def generateInnerClassesFunctionCalls(
+      functions: Seq[NewFunctionSpec],
+      funcName: String,
+      arguments: Seq[(String, String)],
+      returnType: String,
+      makeSplitFunction: String => String,
+      foldFunctions: Seq[String] => String): Iterable[String] = {
+    val innerClassToFunctions = mutable.LinkedHashMap.empty[(String, String), Seq[String]]
+    functions.foreach(f => {
+      val key = (f.innerClassName.get, f.innerClassInstance.get)
+      val value = f.functionName +: innerClassToFunctions.getOrElse(key, Seq.empty[String])
+      innerClassToFunctions.put(key, value)
+    })
+
+    val argDefinitionString = arguments.map { case (t, name) => s"$t $name" }.mkString(", ")
+    val argInvocationString = arguments.map(_._2).mkString(", ")
+
+    innerClassToFunctions.flatMap {
+      case ((innerClassName, innerClassInstance), innerClassFunctions) =>
+        // for performance reasons, the functions are prepended, instead of appended,
+        // thus here they are in reversed order
+        val orderedFunctions = innerClassFunctions.reverse
+        if (orderedFunctions.size > CodeGenerator.MERGE_SPLIT_METHODS_THRESHOLD) {
+          // Adding a new function to each inner class which contains the invocation of all the
+          // ones which have been added to that inner class. For example,
+          //   private class NestedClass {
+          //     private void apply_862(InternalRow i) { ... }
+          //     private void apply_863(InternalRow i) { ... }
+          //       ...
+          //     private void apply(InternalRow i) {
+          //       apply_862(i);
+          //       apply_863(i);
+          //       ...
+          //     }
+          //   }
+          val body = foldFunctions(orderedFunctions.map(name => s"$name($argInvocationString)"))
+          val code = s"""
+              |private $returnType $funcName($argDefinitionString) {
+              |  ${makeSplitFunction(body)}
+              |}
+            """.stripMargin
+          addNewFunctionToClass(funcName, code, innerClassName)
+          Seq(s"$innerClassInstance.$funcName($argInvocationString)")
+        } else {
+          orderedFunctions.map(f => s"$innerClassInstance.$f($argInvocationString)")
+        }
     }
   }
 
@@ -1013,6 +1123,16 @@ object CodeGenerator extends Logging {
   // This is the value of HugeMethodLimit in the OpenJDK JVM settings
   val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
 
+  // This is the threshold over which the methods in an inner class are grouped in a single
+  // method which is going to be called by the outer class instead of the many small ones
+  val MERGE_SPLIT_METHODS_THRESHOLD = 3
+
+  // The number of named constants that can exist in the class is limited by the Constant Pool
+  // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
+  // threshold of 1000k bytes to determine when a function should be inlined to a private, inner
+  // class.
+  val GENERATED_CLASS_SIZE_THRESHOLD = 1000000
+
   /**
    * Compile the Java source code into a Java class, using Janino.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/b3d8fc3d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index 7ea0bec..1e6f7b6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -201,6 +201,23 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }
 
+  test("SPARK-22226: group splitted expressions into one method per nested class") {
+    val length = 10000
+    val expressions = Seq.fill(length) {
+      ToUTCTimestamp(
+        Literal.create(Timestamp.valueOf("2017-10-10 00:00:00"), TimestampType),
+        Literal.create("PST", StringType))
+    }
+    val plan = GenerateMutableProjection.generate(expressions)
+    val actual = plan(new GenericInternalRow(length)).toSeq(expressions.map(_.dataType))
+    val expected = Seq.fill(length)(
+      DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2017-10-10 07:00:00")))
+
+    if (actual != expected) {
+      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    }
+  }
+
   test("test generated safe and unsafe projection") {
     val schema = new StructType(Array(
       StructField("a", StringType, true),

http://git-wip-us.apache.org/repos/asf/spark/blob/b3d8fc3d/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 473c355..17c88b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2106,6 +2106,18 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Seq(Row(7, 1, 1), Row(7, 1, 2), Row(7, 2, 1), Row(7, 2, 2), Row(7, 3, 1), Row(7, 3, 2)))
   }
 
+  test("SPARK-22226: splitExpressions should not generate codes beyond 64KB") {
+    val colNumber = 10000
+    val input = spark.range(2).rdd.map(_ => Row(1 to colNumber: _*))
+    val df = sqlContext.createDataFrame(input, StructType(
+      (1 to colNumber).map(colIndex => StructField(s"_$colIndex", IntegerType, false))))
+    val newCols = (1 to colNumber).flatMap { colIndex =>
+      Seq(expr(s"if(1000 < _$colIndex, 1000, _$colIndex)"),
+        expr(s"sqrt(_$colIndex)"))
+    }
+    df.select(newCols: _*).collect()
+  }
+
   test("SPARK-22271: mean overflows and returns null for some decimal variables") {
     val d = 0.034567890
     val df = Seq(d, d, d, d, d, d, d, d, d, d).toDF("DecimalCol")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org