Posted to commits@spark.apache.org by hv...@apache.org on 2017/08/11 20:01:05 UTC

spark git commit: [SPARK-21595] Separate thresholds for buffering and spilling in ExternalAppendOnlyUnsafeRowArray

Repository: spark
Updated Branches:
  refs/heads/master 0377338bf -> 94439997d


[SPARK-21595] Separate thresholds for buffering and spilling in ExternalAppendOnlyUnsafeRowArray

## What changes were proposed in this pull request?

[SPARK-21595](https://issues.apache.org/jira/browse/SPARK-21595) reported excessive spilling to disk because the default spill threshold for `ExternalAppendOnlyUnsafeRowArray` is quite small for the WINDOW operator. The old behaviour of the WINDOW operator (pre https://github.com/apache/spark/pull/16909) was to hold data in an array for the first 4096 records, after which it would switch to `UnsafeExternalSorter` and start spilling to disk upon reaching `spark.shuffle.spill.numElementsForceSpillThreshold` (or earlier if memory was scarce due to too many consumers).

Currently, both transitions for `ExternalAppendOnlyUnsafeRowArray` (the switch from the in-memory array to `UnsafeExternalSorter`, and `UnsafeExternalSorter` spilling to disk) are controlled by a single threshold. This PR separates the two thresholds to allow more granular control.
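
As a rough sketch of how the two resulting knobs can be tuned independently (the config keys come from the diff below; the session setup, data, and values here are illustrative only, not a recommendation):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("thresholds").getOrCreate()

// Hold the first 1000 rows of a window partition in a plain in-memory array;
// past that, hand rows to UnsafeExternalSorter, which itself spills to disk
// after 100000 rows (or earlier under memory pressure).
spark.conf.set("spark.sql.windowExec.buffer.in.memory.threshold", "1000")
spark.conf.set("spark.sql.windowExec.buffer.spill.threshold", "100000")

val df = spark.range(0, 1000000).withColumn("g", col("id") % 10)
df.withColumn("rn", row_number().over(Window.partitionBy("g").orderBy("id"))).count()
```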

## How was this patch tested?

Added unit tests

Author: Tejas Patil <te...@fb.com>

Closes #18843 from tejasapatil/SPARK-21595.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94439997
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94439997
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94439997

Branch: refs/heads/master
Commit: 94439997d57875838a8283c543f9b44705d3a503
Parents: 0377338
Author: Tejas Patil <te...@fb.com>
Authored: Fri Aug 11 22:01:00 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Aug 11 22:01:00 2017 +0200

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala |  41 +++++++-
 .../ExternalAppendOnlyUnsafeRowArray.scala      |  28 ++---
 .../execution/joins/CartesianProductExec.scala  |  12 ++-
 .../sql/execution/joins/SortMergeJoinExec.scala |  24 ++++-
 .../spark/sql/execution/window/WindowExec.scala |   4 +-
 .../scala/org/apache/spark/sql/JoinSuite.scala  |   3 +-
 ...ernalAppendOnlyUnsafeRowArrayBenchmark.scala |   7 +-
 .../ExternalAppendOnlyUnsafeRowArraySuite.scala | 103 ++++++++++++-------
 .../sql/execution/SQLWindowFunctionSuite.scala  |   3 +-
 9 files changed, 155 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ecb941c..733d80e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -844,24 +844,47 @@ object SQLConf {
       .stringConf
       .createWithDefaultFunction(() => TimeZone.getDefault.getID)
 
+  val WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
+      .intConf
+      .createWithDefault(4096)
+
   val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.windowExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in window operator")
+      .doc("Threshold for number of rows to be spilled by window operator")
       .intConf
-      .createWithDefault(4096)
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+
+  val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
+        "join operator")
+      .intConf
+      .createWithDefault(Int.MaxValue)
 
   val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in sort merge join operator")
+      .doc("Threshold for number of rows to be spilled by sort merge join operator")
       .intConf
-      .createWithDefault(Int.MaxValue)
+      .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+
+  val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+    buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
+      .internal()
+      .doc("Threshold for number of rows guaranteed to be held in memory by the cartesian " +
+        "product operator")
+      .intConf
+      .createWithDefault(4096)
 
   val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
     buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
       .internal()
-      .doc("Threshold for number of rows buffered in cartesian product operator")
+      .doc("Threshold for number of rows to be spilled by cartesian product operator")
       .intConf
       .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
 
@@ -1137,11 +1160,19 @@ class SQLConf extends Serializable with Logging {
 
   def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
 
+  def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
 
+  def sortMergeJoinExecBufferInMemoryThreshold: Int =
+    getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def sortMergeJoinExecBufferSpillThreshold: Int =
     getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)
 
+  def cartesianProductExecBufferInMemoryThreshold: Int =
+    getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
   def cartesianProductExecBufferSpillThreshold: Int =
     getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index c4d3834..ac282ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -31,16 +31,16 @@ import org.apache.spark.storage.BlockManager
 import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
 
 /**
- * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined
- * threshold of rows is reached.
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached, after which it switches to a mode that
+ * flushes to disk once [[numRowsSpillThreshold]] is met (or earlier if there is
+ * excessive memory consumption). Setting these thresholds involves the following trade-offs:
  *
- * Setting spill threshold faces following trade-off:
- *
- * - If the spill threshold is too high, the in-memory array may occupy more memory than is
- *   available, resulting in OOM.
- * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
- *   This may lead to a performance regression compared to the normal case of using an
- *   [[ArrayBuffer]] or [[Array]].
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
+ *   than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression compared to the normal case
+ *   of using an [[ArrayBuffer]] or [[Array]].
  */
 private[sql] class ExternalAppendOnlyUnsafeRowArray(
     taskMemoryManager: TaskMemoryManager,
@@ -49,9 +49,10 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
     taskContext: TaskContext,
     initialSize: Int,
     pageSizeBytes: Long,
+    numRowsInMemoryBufferThreshold: Int,
     numRowsSpillThreshold: Int) extends Logging {
 
-  def this(numRowsSpillThreshold: Int) {
+  def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) {
     this(
       TaskContext.get().taskMemoryManager(),
       SparkEnv.get.blockManager,
@@ -59,11 +60,12 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
       TaskContext.get(),
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      numRowsInMemoryBufferThreshold,
       numRowsSpillThreshold)
   }
 
   private val initialSizeOfInMemoryBuffer =
-    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
+    Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)
 
   private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
     new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
@@ -102,11 +104,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
   }
 
   def add(unsafeRow: UnsafeRow): Unit = {
-    if (numRows < numRowsSpillThreshold) {
+    if (numRows < numRowsInMemoryBufferThreshold) {
       inMemoryBuffer += unsafeRow.copy()
     } else {
       if (spillableArray == null) {
-        logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
+        logInfo(s"Reached spill threshold of $numRowsInMemoryBufferThreshold rows, switching to " +
           s"${classOf[UnsafeExternalSorter].getName}")
 
         // We will not sort the rows, so prefixComparator and recordComparator are null
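
To make the switch-over above concrete, here is a simplified, self-contained reduction of the two-phase policy that `add` now implements. It is a sketch only: the real class delegates phase two to `UnsafeExternalSorter` under Spark's task memory manager, and the `SimpleTwoPhaseBuffer` name and its in-memory stand-in for the sorter are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer

class SimpleTwoPhaseBuffer[T](
    numRowsInMemoryBufferThreshold: Int,
    numRowsSpillThreshold: Int) {

  private val inMemoryBuffer = new ArrayBuffer[T]()
  // Hypothetical stand-in for the UnsafeExternalSorter-backed store; the real
  // sorter spills to disk once numRowsSpillThreshold rows accumulate (or
  // earlier under memory pressure).
  private var spillableArray: ArrayBuffer[T] = null
  private var numRows = 0

  def add(row: T): Unit = {
    if (numRows < numRowsInMemoryBufferThreshold) {
      inMemoryBuffer += row            // phase 1: rows guaranteed in memory
    } else {
      if (spillableArray == null) {
        // Phase 2: switch to the spillable store and migrate buffered rows,
        // mirroring what the real class does when the threshold is crossed.
        spillableArray = new ArrayBuffer[T]()
        spillableArray ++= inMemoryBuffer
        inMemoryBuffer.clear()
      }
      spillableArray += row
    }
    numRows += 1
  }

  def length: Int = numRows
}
```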

http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index f380986..4d261dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -35,11 +35,12 @@ class UnsafeCartesianRDD(
     left : RDD[UnsafeRow],
     right : RDD[UnsafeRow],
     numFieldsOfRight: Int,
+    inMemoryBufferThreshold: Int,
     spillThreshold: Int)
   extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
 
   override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
-    val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+    val rowArray = new ExternalAppendOnlyUnsafeRowArray(inMemoryBufferThreshold, spillThreshold)
 
     val partition = split.asInstanceOf[CartesianPartition]
     rdd2.iterator(partition.s2, context).foreach(rowArray.add)
@@ -71,9 +72,12 @@ case class CartesianProductExec(
     val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
     val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]
 
-    val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold
-
-    val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold)
+    val pair = new UnsafeCartesianRDD(
+      leftResults,
+      rightResults,
+      right.output.size,
+      sqlContext.conf.cartesianProductExecBufferInMemoryThreshold,
+      sqlContext.conf.cartesianProductExecBufferSpillThreshold)
     pair.mapPartitionsWithIndexInternal { (index, iter) =>
       val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
       val filtered = if (condition.isDefined) {

http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index f41fa14..91d214e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -130,9 +130,14 @@ case class SortMergeJoinExec(
     sqlContext.conf.sortMergeJoinExecBufferSpillThreshold
   }
 
+  private def getInMemoryThreshold: Int = {
+    sqlContext.conf.sortMergeJoinExecBufferInMemoryThreshold
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val spillThreshold = getSpillThreshold
+    val inMemoryThreshold = getInMemoryThreshold
     left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
       val boundCondition: (InternalRow) => Boolean = {
         condition.map { cond =>
@@ -158,6 +163,7 @@ case class SortMergeJoinExec(
               keyOrdering,
               RowIterator.fromScala(leftIter),
               RowIterator.fromScala(rightIter),
+              inMemoryThreshold,
               spillThreshold
             )
             private[this] val joinRow = new JoinedRow
@@ -201,6 +207,7 @@ case class SortMergeJoinExec(
             keyOrdering,
             streamedIter = RowIterator.fromScala(leftIter),
             bufferedIter = RowIterator.fromScala(rightIter),
+            inMemoryThreshold,
             spillThreshold
           )
           val rightNullRow = new GenericInternalRow(right.output.length)
@@ -214,6 +221,7 @@ case class SortMergeJoinExec(
             keyOrdering,
             streamedIter = RowIterator.fromScala(rightIter),
             bufferedIter = RowIterator.fromScala(leftIter),
+            inMemoryThreshold,
             spillThreshold
           )
           val leftNullRow = new GenericInternalRow(left.output.length)
@@ -247,6 +255,7 @@ case class SortMergeJoinExec(
               keyOrdering,
               RowIterator.fromScala(leftIter),
               RowIterator.fromScala(rightIter),
+              inMemoryThreshold,
               spillThreshold
             )
             private[this] val joinRow = new JoinedRow
@@ -281,6 +290,7 @@ case class SortMergeJoinExec(
               keyOrdering,
               RowIterator.fromScala(leftIter),
               RowIterator.fromScala(rightIter),
+              inMemoryThreshold,
               spillThreshold
             )
             private[this] val joinRow = new JoinedRow
@@ -322,6 +332,7 @@ case class SortMergeJoinExec(
               keyOrdering,
               RowIterator.fromScala(leftIter),
               RowIterator.fromScala(rightIter),
+              inMemoryThreshold,
               spillThreshold
             )
             private[this] val joinRow = new JoinedRow
@@ -420,8 +431,10 @@ case class SortMergeJoinExec(
     val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
 
     val spillThreshold = getSpillThreshold
+    val inMemoryThreshold = getInMemoryThreshold
 
-    ctx.addMutableState(clsName, matches, s"$matches = new $clsName($spillThreshold);")
+    ctx.addMutableState(clsName, matches,
+      s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")
     // Copy the left keys as class members so they could be used in next function call.
     val matchedKeyVars = copyKeys(ctx, leftKeyVars)
 
@@ -626,6 +639,9 @@ case class SortMergeJoinExec(
  * @param streamedIter an input whose rows will be streamed.
  * @param bufferedIter an input whose rows will be buffered to construct sequences of rows that
  *                     have the same join key.
+ * @param inMemoryThreshold Threshold for number of rows guaranteed to be held in memory by
+ *                          internal buffer
+ * @param spillThreshold Threshold for number of rows to be spilled by internal buffer
  */
 private[joins] class SortMergeJoinScanner(
     streamedKeyGenerator: Projection,
@@ -633,7 +649,8 @@ private[joins] class SortMergeJoinScanner(
     keyOrdering: Ordering[InternalRow],
     streamedIter: RowIterator,
     bufferedIter: RowIterator,
-    bufferThreshold: Int) {
+    inMemoryThreshold: Int,
+    spillThreshold: Int) {
   private[this] var streamedRow: InternalRow = _
   private[this] var streamedRowKey: InternalRow = _
   private[this] var bufferedRow: InternalRow = _
@@ -644,7 +661,8 @@ private[joins] class SortMergeJoinScanner(
    */
   private[this] var matchJoinKey: InternalRow = _
   /** Buffered rows from the buffered side of the join. This is empty if there are no matches. */
-  private[this] val bufferedMatches = new ExternalAppendOnlyUnsafeRowArray(bufferThreshold)
+  private[this] val bufferedMatches =
+    new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
 
   // Initialization (note: do _not_ want to advance streamed here).
   advancedBufferedToRowWithNullFreeJoinKey()

http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index f8bb667..800a2ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -292,6 +292,7 @@ case class WindowExec(
     // Unwrap the expressions and factories from the map.
     val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
     val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
+    val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
     val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
 
     // Start processing.
@@ -322,7 +323,8 @@ case class WindowExec(
         val inputFields = child.output.length
 
         val buffer: ExternalAppendOnlyUnsafeRowArray =
-          new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+          new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+
         var bufferIterator: Iterator[UnsafeRow] = _
 
         val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))

http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 895ca19..0008d50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -665,7 +665,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 
   test("test SortMergeJoin (with spill)") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
-      "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "0") {
+      "spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
+      "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {
 
       assertSpilled(sparkContext, "inner join") {
         checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 031ac38..efe28af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -67,7 +67,10 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(
+          ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
+          numSpillThreshold)
+
         rows.foreach(x => array.add(x))
 
         val iterator = array.generateIterator()
@@ -143,7 +146,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
     benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
       var sum = 0L
       for (_ <- 0L until iterations) {
-        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+        val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
         rows.foreach(x => array.add(x))
 
         val iterator = array.generateIterator()

http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index 53c4163..ecc7264 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -31,7 +31,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
 
   override def afterAll(): Unit = TaskContext.unset()
 
-  private def withExternalArray(spillThreshold: Int)
+  private def withExternalArray(inMemoryThreshold: Int, spillThreshold: Int)
                                (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = {
     sc = new SparkContext("local", "test", new SparkConf(false))
 
@@ -45,6 +45,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
       taskContext,
       1024,
       SparkEnv.get.memoryManager.pageSizeBytes,
+      inMemoryThreshold,
       spillThreshold)
     try f(array) finally {
       array.clear()
@@ -109,9 +110,9 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
     assert(getNumBytesSpilled > 0)
   }
 
-  test("insert rows less than the spillThreshold") {
-    val spillThreshold = 100
-    withExternalArray(spillThreshold) { array =>
+  test("insert rows less than the inMemoryThreshold") {
+    val (inMemoryThreshold, spillThreshold) = (100, 50)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       assert(array.isEmpty)
 
       val expectedValues = populateRows(array, 1)
@@ -122,8 +123,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
 
       // Add more rows (but not too many to trigger switch to [[UnsafeExternalSorter]])
       // Verify that NO spill has happened
-      populateRows(array, spillThreshold - 1, expectedValues)
-      assert(array.length == spillThreshold)
+      populateRows(array, inMemoryThreshold - 1, expectedValues)
+      assert(array.length == inMemoryThreshold)
       assertNoSpill()
 
       val iterator2 = validateData(array, expectedValues)
@@ -133,20 +134,42 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
     }
   }
 
-  test("insert rows more than the spillThreshold to force spill") {
-    val spillThreshold = 100
-    withExternalArray(spillThreshold) { array =>
-      val numValuesInserted = 20 * spillThreshold
-
+  test("insert rows more than the inMemoryThreshold but less than spillThreshold") {
+    val (inMemoryThreshold, spillThreshold) = (10, 50)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       assert(array.isEmpty)
-      val expectedValues = populateRows(array, 1)
-      assert(array.length == 1)
+      val expectedValues = populateRows(array, inMemoryThreshold - 1)
+      assert(array.length == (inMemoryThreshold - 1))
+      val iterator1 = validateData(array, expectedValues)
+      assertNoSpill()
+
+      // Add more rows to trigger switch to [[UnsafeExternalSorter]] but not too many to cause a
+      // spill to happen. Verify that NO spill has happened
+      populateRows(array, spillThreshold - expectedValues.length - 1, expectedValues)
+      assert(array.length == spillThreshold - 1)
+      assertNoSpill()
+
+      val iterator2 = validateData(array, expectedValues)
+      assert(!iterator2.hasNext)
 
+      assert(!iterator1.hasNext)
+      intercept[ConcurrentModificationException](iterator1.next())
+    }
+  }
+
+  test("insert rows enough to force spill") {
+    val (inMemoryThreshold, spillThreshold) = (20, 10)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
+      assert(array.isEmpty)
+      val expectedValues = populateRows(array, inMemoryThreshold - 1)
+      assert(array.length == (inMemoryThreshold - 1))
       val iterator1 = validateData(array, expectedValues)
+      assertNoSpill()
 
-      // Populate more rows to trigger spill. Verify that spill has happened
-      populateRows(array, numValuesInserted - 1, expectedValues)
-      assert(array.length == numValuesInserted)
+      // Add more rows to trigger switch to [[UnsafeExternalSorter]] and cause a spill to happen.
+      // Verify that spill has happened
+      populateRows(array, 2, expectedValues)
+      assert(array.length == inMemoryThreshold + 1)
       assertSpill()
 
       val iterator2 = validateData(array, expectedValues)
@@ -158,7 +181,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("iterator on an empty array should be empty") {
-    withExternalArray(spillThreshold = 10) { array =>
+    withExternalArray(inMemoryThreshold = 4, spillThreshold = 10) { array =>
       val iterator = array.generateIterator()
       assert(array.isEmpty)
       assert(array.length == 0)
@@ -167,7 +190,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with negative start index") {
-    withExternalArray(spillThreshold = 2) { array =>
+    withExternalArray(inMemoryThreshold = 100, spillThreshold = 56) { array =>
       val exception =
         intercept[ArrayIndexOutOfBoundsException](array.generateIterator(startIndex = -10))
 
@@ -178,8 +201,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with start index exceeding array's size (without spill)") {
-    val spillThreshold = 2
-    withExternalArray(spillThreshold) { array =>
+    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       populateRows(array, spillThreshold / 2)
 
       val exception =
@@ -191,8 +214,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with start index exceeding array's size (with spill)") {
-    val spillThreshold = 2
-    withExternalArray(spillThreshold) { array =>
+    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       populateRows(array, spillThreshold * 2)
 
       val exception =
@@ -205,10 +228,10 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with custom start index (without spill)") {
-    val spillThreshold = 10
-    withExternalArray(spillThreshold) { array =>
-      val expectedValues = populateRows(array, spillThreshold)
-      val startIndex = spillThreshold / 2
+    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
+      val expectedValues = populateRows(array, inMemoryThreshold)
+      val startIndex = inMemoryThreshold / 2
       val iterator = array.generateIterator(startIndex = startIndex)
       for (i <- startIndex until expectedValues.length) {
         checkIfValueExists(iterator, expectedValues(i))
@@ -217,8 +240,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("generate iterator with custom start index (with spill)") {
-    val spillThreshold = 10
-    withExternalArray(spillThreshold) { array =>
+    val (inMemoryThreshold, spillThreshold) = (20, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       val expectedValues = populateRows(array, spillThreshold * 10)
       val startIndex = spillThreshold * 2
       val iterator = array.generateIterator(startIndex = startIndex)
@@ -229,7 +252,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("test iterator invalidation (without spill)") {
-    withExternalArray(spillThreshold = 10) { array =>
+    withExternalArray(inMemoryThreshold = 10, spillThreshold = 100) { array =>
       // insert 2 rows, iterate until the first row
       populateRows(array, 2)
 
@@ -254,9 +277,9 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("test iterator invalidation (with spill)") {
-    val spillThreshold = 10
-    withExternalArray(spillThreshold) { array =>
-      // Populate enough rows so that spill has happens
+    val (inMemoryThreshold, spillThreshold) = (2, 10)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
+      // Populate enough rows so that spill happens
       populateRows(array, spillThreshold * 2)
       assertSpill()
 
@@ -281,7 +304,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("clear on an empty the array") {
-    withExternalArray(spillThreshold = 2) { array =>
+    withExternalArray(inMemoryThreshold = 2, spillThreshold = 3) { array =>
       val iterator = array.generateIterator()
       assert(!iterator.hasNext)
 
@@ -299,10 +322,10 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
   }
 
   test("clear array (without spill)") {
-    val spillThreshold = 10
-    withExternalArray(spillThreshold) { array =>
+    val (inMemoryThreshold, spillThreshold) = (10, 100)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       // Populate rows ... but not enough to trigger spill
-      populateRows(array, spillThreshold / 2)
+      populateRows(array, inMemoryThreshold / 2)
       assertNoSpill()
 
       // Clear the array
@@ -311,21 +334,21 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
 
       // Re-populate few rows so that there is no spill
       // Verify the data. Verify that there was no spill
-      val expectedValues = populateRows(array, spillThreshold / 3)
+      val expectedValues = populateRows(array, inMemoryThreshold / 2)
       validateData(array, expectedValues)
       assertNoSpill()
 
       // Populate more rows .. enough to not trigger a spill.
       // Verify the data. Verify that there was no spill
-      populateRows(array, spillThreshold / 3, expectedValues)
+      populateRows(array, inMemoryThreshold / 2, expectedValues)
       validateData(array, expectedValues)
       assertNoSpill()
     }
   }
 
   test("clear array (with spill)") {
-    val spillThreshold = 10
-    withExternalArray(spillThreshold) { array =>
+    val (inMemoryThreshold, spillThreshold) = (10, 20)
+    withExternalArray(inMemoryThreshold, spillThreshold) { array =>
       // Populate enough rows to trigger spill
       populateRows(array, spillThreshold * 2)
       val bytesSpilled = getNumBytesSpilled

http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index a9f3fb3..a57514c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -477,7 +477,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
         |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDiNG AND CURRENT RoW)
       """.stripMargin)
 
-    withSQLConf("spark.sql.windowExec.buffer.spill.threshold" -> "1") {
+    withSQLConf("spark.sql.windowExec.buffer.in.memory.threshold" -> "1",
+      "spark.sql.windowExec.buffer.spill.threshold" -> "2") {
       assertSpilled(sparkContext, "test with low buffer spill threshold") {
         checkAnswer(actual, expected)
       }

