You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by da...@apache.org on 2016/04/02 07:38:14 UTC

spark git commit: [SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames

Repository: spark
Updated Branches:
  refs/heads/master 27e71a2cd -> 877dc712e


[SPARK-14138] [SQL] [MASTER] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames

## What changes were proposed in this pull request?

This PR reduces the Java byte code size of the methods in ```SpecificColumnarIterator``` by grouping large numbers (more than 200) of ```ColumnAccessor``` instantiations or method calls into separate methods.

## How was this patch tested?

Added a new unit test, which generates a large number of instantiations and method calls, to ```InMemoryColumnarQuerySuite```

Author: Kazuaki Ishizaki <is...@jp.ibm.com>

Closes #12108 from kiszk/SPARK-14138-master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/877dc712
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/877dc712
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/877dc712

Branch: refs/heads/master
Commit: 877dc712e66db69cb320e10ba5edebca401591e3
Parents: 27e71a2
Author: Kazuaki Ishizaki <is...@jp.ibm.com>
Authored: Fri Apr 1 22:38:07 2016 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Fri Apr 1 22:38:07 2016 -0700

----------------------------------------------------------------------
 .../columnar/GenerateColumnAccessor.scala       | 46 +++++++++++++++++---
 .../columnar/InMemoryColumnarQuerySuite.scala   | 10 +++++
 2 files changed, 51 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/877dc712/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index d4e5db4..e2e33e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -88,7 +88,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
         case array: ArrayType => classOf[ArrayColumnAccessor].getName
         case t: MapType => classOf[MapColumnAccessor].getName
       }
-      ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
+      ctx.addMutableState(accessorCls, accessorName, "")
 
       val createCode = dt match {
         case t if ctx.isPrimitiveType(dt) =>
@@ -114,6 +114,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       (createCode, extract + patch)
     }.unzip
 
+    /*
+     * Each helper method holds at most 200 statements: 200 = 6000 bytes / 30 (up to 30 bytes
+     * per call), as the maximum byte code size HotSpot compiles is 8000 bytes. The grouped
+     * iterators are lazy, so they are drained with foreach to register every helper method.
+     */
+    val numberOfStatementsThreshold = 200
+    val (initializerAccessorCalls, extractorCalls) =
+      if (initializeAccessors.length <= numberOfStatementsThreshold) {
+        (initializeAccessors.mkString("\n"), extractors.mkString("\n"))
+      } else {
+        val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
+        val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
+        var groupedAccessorsLength = 0
+        groupedAccessorsItr.zipWithIndex.foreach { case (body, i) =>
+          groupedAccessorsLength += 1
+          val funcName = s"accessors$i"
+          val funcCode = s"""
+             |private void $funcName() {
+             |  ${body.mkString("\n")}
+             |}
+           """.stripMargin
+          ctx.addNewFunction(funcName, funcCode)
+        }
+        groupedExtractorsItr.zipWithIndex.foreach { case (body, i) =>
+          val funcName = s"extractors$i"
+          val funcCode = s"""
+             |private void $funcName() {
+             |  ${body.mkString("\n")}
+             |}
+           """.stripMargin
+          ctx.addNewFunction(funcName, funcCode)
+        }
+        ((0 until groupedAccessorsLength).map { i => s"accessors$i();" }.mkString("\n"),
+         (0 until groupedAccessorsLength).map { i => s"extractors$i();" }.mkString("\n"))
+      }
+
     val code = s"""
       import java.nio.ByteBuffer;
       import java.nio.ByteOrder;
@@ -149,8 +185,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           this.nativeOrder = ByteOrder.nativeOrder();
           this.buffers = new byte[${columnTypes.length}][];
           this.mutableRow = new MutableUnsafeRow(rowWriter);
-
-          ${ctx.initMutableStates()}
         }
 
         public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
@@ -159,6 +193,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           this.columnIndexes = columnIndexes;
         }
 
+        ${ctx.declareAddedFunctions()}
+
         public boolean hasNext() {
           if (currentRow < numRowsInBatch) {
             return true;
@@ -173,7 +209,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           for (int i = 0; i < columnIndexes.length; i ++) {
             buffers[i] = batch.buffers()[columnIndexes[i]];
           }
-          ${initializeAccessors.mkString("\n")}
+          ${initializerAccessorCalls}
 
           return hasNext();
         }
@@ -182,7 +218,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           currentRow += 1;
           bufferHolder.reset();
           rowWriter.zeroOutNullBytes();
-          ${extractors.mkString("\n")}
+          ${extractorCalls}
           unsafeRow.setTotalSize(bufferHolder.totalSize());
           return unsafeRow;
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/877dc712/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 9e04caf..50c8745 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -220,4 +220,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(data.count() === 10)
     assert(data.filter($"s" === "3").count() === 1)
   }
+
+  test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
+    val length1 = 3999
+    val columnTypes1 = List.fill(length1)(IntegerType)
+    val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)
+
+    val length2 = 10000
+    val columnTypes2 = List.fill(length2)(IntegerType)
+    val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org