You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2017/08/11 20:01:05 UTC
spark git commit: [SPARK-21595] Separate thresholds for buffering and
spilling in ExternalAppendOnlyUnsafeRowArray
Repository: spark
Updated Branches:
refs/heads/master 0377338bf -> 94439997d
[SPARK-21595] Separate thresholds for buffering and spilling in ExternalAppendOnlyUnsafeRowArray
## What changes were proposed in this pull request?
[SPARK-21595](https://issues.apache.org/jira/browse/SPARK-21595) reported excessive spilling to disk because the default spill threshold for `ExternalAppendOnlyUnsafeRowArray` is quite small for the WINDOW operator. The old behaviour of the WINDOW operator (pre https://github.com/apache/spark/pull/16909) was to hold data in an array for the first 4096 records, after which it would switch to `UnsafeExternalSorter` and start spilling to disk after reaching `spark.shuffle.spill.numElementsForceSpillThreshold` (or earlier if memory ran short due to excessive consumers).
Currently, both the switch from the in-memory array to `UnsafeExternalSorter` and the spilling of `UnsafeExternalSorter` to disk in `ExternalAppendOnlyUnsafeRowArray` are controlled by a single threshold. This PR separates the two thresholds to allow more granular control.
## How was this patch tested?
Added unit tests
Author: Tejas Patil <te...@fb.com>
Closes #18843 from tejasapatil/SPARK-21595.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/94439997
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/94439997
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/94439997
Branch: refs/heads/master
Commit: 94439997d57875838a8283c543f9b44705d3a503
Parents: 0377338
Author: Tejas Patil <te...@fb.com>
Authored: Fri Aug 11 22:01:00 2017 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Aug 11 22:01:00 2017 +0200
----------------------------------------------------------------------
.../org/apache/spark/sql/internal/SQLConf.scala | 41 +++++++-
.../ExternalAppendOnlyUnsafeRowArray.scala | 28 ++---
.../execution/joins/CartesianProductExec.scala | 12 ++-
.../sql/execution/joins/SortMergeJoinExec.scala | 24 ++++-
.../spark/sql/execution/window/WindowExec.scala | 4 +-
.../scala/org/apache/spark/sql/JoinSuite.scala | 3 +-
...ernalAppendOnlyUnsafeRowArrayBenchmark.scala | 7 +-
.../ExternalAppendOnlyUnsafeRowArraySuite.scala | 103 ++++++++++++-------
.../sql/execution/SQLWindowFunctionSuite.scala | 3 +-
9 files changed, 155 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ecb941c..733d80e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -844,24 +844,47 @@ object SQLConf {
.stringConf
.createWithDefaultFunction(() => TimeZone.getDefault.getID)
+ val WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+ buildConf("spark.sql.windowExec.buffer.in.memory.threshold")
+ .internal()
+ .doc("Threshold for number of rows guaranteed to be held in memory by the window operator")
+ .intConf
+ .createWithDefault(4096)
+
val WINDOW_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.windowExec.buffer.spill.threshold")
.internal()
- .doc("Threshold for number of rows buffered in window operator")
+ .doc("Threshold for number of rows to be spilled by window operator")
.intConf
- .createWithDefault(4096)
+ .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+
+ val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+ buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
+ .internal()
+ .doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
+ "join operator")
+ .intConf
+ .createWithDefault(Int.MaxValue)
val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
.internal()
- .doc("Threshold for number of rows buffered in sort merge join operator")
+ .doc("Threshold for number of rows to be spilled by sort merge join operator")
.intConf
- .createWithDefault(Int.MaxValue)
+ .createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
+
+ val CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
+ buildConf("spark.sql.cartesianProductExec.buffer.in.memory.threshold")
+ .internal()
+ .doc("Threshold for number of rows guaranteed to be held in memory by the cartesian " +
+ "product operator")
+ .intConf
+ .createWithDefault(4096)
val CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.cartesianProductExec.buffer.spill.threshold")
.internal()
- .doc("Threshold for number of rows buffered in cartesian product operator")
+ .doc("Threshold for number of rows to be spilled by cartesian product operator")
.intConf
.createWithDefault(UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD.toInt)
@@ -1137,11 +1160,19 @@ class SQLConf extends Serializable with Logging {
def joinReorderDPStarFilter: Boolean = getConf(SQLConf.JOIN_REORDER_DP_STAR_FILTER)
+ def windowExecBufferInMemoryThreshold: Int = getConf(WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
def windowExecBufferSpillThreshold: Int = getConf(WINDOW_EXEC_BUFFER_SPILL_THRESHOLD)
+ def sortMergeJoinExecBufferInMemoryThreshold: Int =
+ getConf(SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
def sortMergeJoinExecBufferSpillThreshold: Int =
getConf(SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD)
+ def cartesianProductExecBufferInMemoryThreshold: Int =
+ getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_IN_MEMORY_THRESHOLD)
+
def cartesianProductExecBufferSpillThreshold: Int =
getConf(CARTESIAN_PRODUCT_EXEC_BUFFER_SPILL_THRESHOLD)
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
index c4d3834..ac282ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArray.scala
@@ -31,16 +31,16 @@ import org.apache.spark.storage.BlockManager
import org.apache.spark.util.collection.unsafe.sort.{UnsafeExternalSorter, UnsafeSorterIterator}
/**
- * An append-only array for [[UnsafeRow]]s that spills content to disk when there a predefined
- * threshold of rows is reached.
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which
+ * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is
+ * excessive memory consumption). Setting these threshold involves following trade-offs:
*
- * Setting spill threshold faces following trade-off:
- *
- * - If the spill threshold is too high, the in-memory array may occupy more memory than is
- * available, resulting in OOM.
- * - If the spill threshold is too low, we spill frequently and incur unnecessary disk writes.
- * This may lead to a performance regression compared to the normal case of using an
- * [[ArrayBuffer]] or [[Array]].
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory
+ * than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to
+ * excessive disk writes. This may lead to a performance regression compared to the normal case
+ * of using an [[ArrayBuffer]] or [[Array]].
*/
private[sql] class ExternalAppendOnlyUnsafeRowArray(
taskMemoryManager: TaskMemoryManager,
@@ -49,9 +49,10 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
taskContext: TaskContext,
initialSize: Int,
pageSizeBytes: Long,
+ numRowsInMemoryBufferThreshold: Int,
numRowsSpillThreshold: Int) extends Logging {
- def this(numRowsSpillThreshold: Int) {
+ def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) {
this(
TaskContext.get().taskMemoryManager(),
SparkEnv.get.blockManager,
@@ -59,11 +60,12 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
TaskContext.get(),
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
+ numRowsInMemoryBufferThreshold,
numRowsSpillThreshold)
}
private val initialSizeOfInMemoryBuffer =
- Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsSpillThreshold)
+ Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold)
private val inMemoryBuffer = if (initialSizeOfInMemoryBuffer > 0) {
new ArrayBuffer[UnsafeRow](initialSizeOfInMemoryBuffer)
@@ -102,11 +104,11 @@ private[sql] class ExternalAppendOnlyUnsafeRowArray(
}
def add(unsafeRow: UnsafeRow): Unit = {
- if (numRows < numRowsSpillThreshold) {
+ if (numRows < numRowsInMemoryBufferThreshold) {
inMemoryBuffer += unsafeRow.copy()
} else {
if (spillableArray == null) {
- logInfo(s"Reached spill threshold of $numRowsSpillThreshold rows, switching to " +
+ logInfo(s"Reached spill threshold of $numRowsInMemoryBufferThreshold rows, switching to " +
s"${classOf[UnsafeExternalSorter].getName}")
// We will not sort the rows, so prefixComparator and recordComparator are null
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
index f380986..4d261dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala
@@ -35,11 +35,12 @@ class UnsafeCartesianRDD(
left : RDD[UnsafeRow],
right : RDD[UnsafeRow],
numFieldsOfRight: Int,
+ inMemoryBufferThreshold: Int,
spillThreshold: Int)
extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
override def compute(split: Partition, context: TaskContext): Iterator[(UnsafeRow, UnsafeRow)] = {
- val rowArray = new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+ val rowArray = new ExternalAppendOnlyUnsafeRowArray(inMemoryBufferThreshold, spillThreshold)
val partition = split.asInstanceOf[CartesianPartition]
rdd2.iterator(partition.s2, context).foreach(rowArray.add)
@@ -71,9 +72,12 @@ case class CartesianProductExec(
val leftResults = left.execute().asInstanceOf[RDD[UnsafeRow]]
val rightResults = right.execute().asInstanceOf[RDD[UnsafeRow]]
- val spillThreshold = sqlContext.conf.cartesianProductExecBufferSpillThreshold
-
- val pair = new UnsafeCartesianRDD(leftResults, rightResults, right.output.size, spillThreshold)
+ val pair = new UnsafeCartesianRDD(
+ leftResults,
+ rightResults,
+ right.output.size,
+ sqlContext.conf.cartesianProductExecBufferInMemoryThreshold,
+ sqlContext.conf.cartesianProductExecBufferSpillThreshold)
pair.mapPartitionsWithIndexInternal { (index, iter) =>
val joiner = GenerateUnsafeRowJoiner.create(left.schema, right.schema)
val filtered = if (condition.isDefined) {
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index f41fa14..91d214e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -130,9 +130,14 @@ case class SortMergeJoinExec(
sqlContext.conf.sortMergeJoinExecBufferSpillThreshold
}
+ private def getInMemoryThreshold: Int = {
+ sqlContext.conf.sortMergeJoinExecBufferInMemoryThreshold
+ }
+
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val spillThreshold = getSpillThreshold
+ val inMemoryThreshold = getInMemoryThreshold
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
val boundCondition: (InternalRow) => Boolean = {
condition.map { cond =>
@@ -158,6 +163,7 @@ case class SortMergeJoinExec(
keyOrdering,
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
+ inMemoryThreshold,
spillThreshold
)
private[this] val joinRow = new JoinedRow
@@ -201,6 +207,7 @@ case class SortMergeJoinExec(
keyOrdering,
streamedIter = RowIterator.fromScala(leftIter),
bufferedIter = RowIterator.fromScala(rightIter),
+ inMemoryThreshold,
spillThreshold
)
val rightNullRow = new GenericInternalRow(right.output.length)
@@ -214,6 +221,7 @@ case class SortMergeJoinExec(
keyOrdering,
streamedIter = RowIterator.fromScala(rightIter),
bufferedIter = RowIterator.fromScala(leftIter),
+ inMemoryThreshold,
spillThreshold
)
val leftNullRow = new GenericInternalRow(left.output.length)
@@ -247,6 +255,7 @@ case class SortMergeJoinExec(
keyOrdering,
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
+ inMemoryThreshold,
spillThreshold
)
private[this] val joinRow = new JoinedRow
@@ -281,6 +290,7 @@ case class SortMergeJoinExec(
keyOrdering,
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
+ inMemoryThreshold,
spillThreshold
)
private[this] val joinRow = new JoinedRow
@@ -322,6 +332,7 @@ case class SortMergeJoinExec(
keyOrdering,
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
+ inMemoryThreshold,
spillThreshold
)
private[this] val joinRow = new JoinedRow
@@ -420,8 +431,10 @@ case class SortMergeJoinExec(
val clsName = classOf[ExternalAppendOnlyUnsafeRowArray].getName
val spillThreshold = getSpillThreshold
+ val inMemoryThreshold = getInMemoryThreshold
- ctx.addMutableState(clsName, matches, s"$matches = new $clsName($spillThreshold);")
+ ctx.addMutableState(clsName, matches,
+ s"$matches = new $clsName($inMemoryThreshold, $spillThreshold);")
// Copy the left keys as class members so they could be used in next function call.
val matchedKeyVars = copyKeys(ctx, leftKeyVars)
@@ -626,6 +639,9 @@ case class SortMergeJoinExec(
* @param streamedIter an input whose rows will be streamed.
* @param bufferedIter an input whose rows will be buffered to construct sequences of rows that
* have the same join key.
+ * @param inMemoryThreshold Threshold for number of rows guaranteed to be held in memory by
+ * internal buffer
+ * @param spillThreshold Threshold for number of rows to be spilled by internal buffer
*/
private[joins] class SortMergeJoinScanner(
streamedKeyGenerator: Projection,
@@ -633,7 +649,8 @@ private[joins] class SortMergeJoinScanner(
keyOrdering: Ordering[InternalRow],
streamedIter: RowIterator,
bufferedIter: RowIterator,
- bufferThreshold: Int) {
+ inMemoryThreshold: Int,
+ spillThreshold: Int) {
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
@@ -644,7 +661,8 @@ private[joins] class SortMergeJoinScanner(
*/
private[this] var matchJoinKey: InternalRow = _
/** Buffered rows from the buffered side of the join. This is empty if there are no matches. */
- private[this] val bufferedMatches = new ExternalAppendOnlyUnsafeRowArray(bufferThreshold)
+ private[this] val bufferedMatches =
+ new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
// Initialization (note: do _not_ want to advance streamed here).
advancedBufferedToRowWithNullFreeJoinKey()
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index f8bb667..800a2ea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -292,6 +292,7 @@ case class WindowExec(
// Unwrap the expressions and factories from the map.
val expressions = windowFrameExpressionFactoryPairs.flatMap(_._1)
val factories = windowFrameExpressionFactoryPairs.map(_._2).toArray
+ val inMemoryThreshold = sqlContext.conf.windowExecBufferInMemoryThreshold
val spillThreshold = sqlContext.conf.windowExecBufferSpillThreshold
// Start processing.
@@ -322,7 +323,8 @@ case class WindowExec(
val inputFields = child.output.length
val buffer: ExternalAppendOnlyUnsafeRowArray =
- new ExternalAppendOnlyUnsafeRowArray(spillThreshold)
+ new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold, spillThreshold)
+
var bufferIterator: Iterator[UnsafeRow] = _
val windowFunctionResult = new SpecificInternalRow(expressions.map(_.dataType))
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 895ca19..0008d50 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -665,7 +665,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
test("test SortMergeJoin (with spill)") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1",
- "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "0") {
+ "spark.sql.sortMergeJoinExec.buffer.in.memory.threshold" -> "0",
+ "spark.sql.sortMergeJoinExec.buffer.spill.threshold" -> "1") {
assertSpilled(sparkContext, "inner join") {
checkAnswer(
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 031ac38..efe28af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -67,7 +67,10 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
var sum = 0L
for (_ <- 0L until iterations) {
- val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+ val array = new ExternalAppendOnlyUnsafeRowArray(
+ ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer,
+ numSpillThreshold)
+
rows.foreach(x => array.add(x))
val iterator = array.generateIterator()
@@ -143,7 +146,7 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
benchmark.addCase("ExternalAppendOnlyUnsafeRowArray") { _: Int =>
var sum = 0L
for (_ <- 0L until iterations) {
- val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold)
+ val array = new ExternalAppendOnlyUnsafeRowArray(numSpillThreshold, numSpillThreshold)
rows.foreach(x => array.add(x))
val iterator = array.generateIterator()
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index 53c4163..ecc7264 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -31,7 +31,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
override def afterAll(): Unit = TaskContext.unset()
- private def withExternalArray(spillThreshold: Int)
+ private def withExternalArray(inMemoryThreshold: Int, spillThreshold: Int)
(f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = {
sc = new SparkContext("local", "test", new SparkConf(false))
@@ -45,6 +45,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
taskContext,
1024,
SparkEnv.get.memoryManager.pageSizeBytes,
+ inMemoryThreshold,
spillThreshold)
try f(array) finally {
array.clear()
@@ -109,9 +110,9 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
assert(getNumBytesSpilled > 0)
}
- test("insert rows less than the spillThreshold") {
- val spillThreshold = 100
- withExternalArray(spillThreshold) { array =>
+ test("insert rows less than the inMemoryThreshold") {
+ val (inMemoryThreshold, spillThreshold) = (100, 50)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
assert(array.isEmpty)
val expectedValues = populateRows(array, 1)
@@ -122,8 +123,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
// Add more rows (but not too many to trigger switch to [[UnsafeExternalSorter]])
// Verify that NO spill has happened
- populateRows(array, spillThreshold - 1, expectedValues)
- assert(array.length == spillThreshold)
+ populateRows(array, inMemoryThreshold - 1, expectedValues)
+ assert(array.length == inMemoryThreshold)
assertNoSpill()
val iterator2 = validateData(array, expectedValues)
@@ -133,20 +134,42 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
}
- test("insert rows more than the spillThreshold to force spill") {
- val spillThreshold = 100
- withExternalArray(spillThreshold) { array =>
- val numValuesInserted = 20 * spillThreshold
-
+ test("insert rows more than the inMemoryThreshold but less than spillThreshold") {
+ val (inMemoryThreshold, spillThreshold) = (10, 50)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
assert(array.isEmpty)
- val expectedValues = populateRows(array, 1)
- assert(array.length == 1)
+ val expectedValues = populateRows(array, inMemoryThreshold - 1)
+ assert(array.length == (inMemoryThreshold - 1))
+ val iterator1 = validateData(array, expectedValues)
+ assertNoSpill()
+
+ // Add more rows to trigger switch to [[UnsafeExternalSorter]] but not too many to cause a
+ // spill to happen. Verify that NO spill has happened
+ populateRows(array, spillThreshold - expectedValues.length - 1, expectedValues)
+ assert(array.length == spillThreshold - 1)
+ assertNoSpill()
+
+ val iterator2 = validateData(array, expectedValues)
+ assert(!iterator2.hasNext)
+ assert(!iterator1.hasNext)
+ intercept[ConcurrentModificationException](iterator1.next())
+ }
+ }
+
+ test("insert rows enough to force spill") {
+ val (inMemoryThreshold, spillThreshold) = (20, 10)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
+ assert(array.isEmpty)
+ val expectedValues = populateRows(array, inMemoryThreshold - 1)
+ assert(array.length == (inMemoryThreshold - 1))
val iterator1 = validateData(array, expectedValues)
+ assertNoSpill()
- // Populate more rows to trigger spill. Verify that spill has happened
- populateRows(array, numValuesInserted - 1, expectedValues)
- assert(array.length == numValuesInserted)
+ // Add more rows to trigger switch to [[UnsafeExternalSorter]] and cause a spill to happen.
+ // Verify that spill has happened
+ populateRows(array, 2, expectedValues)
+ assert(array.length == inMemoryThreshold + 1)
assertSpill()
val iterator2 = validateData(array, expectedValues)
@@ -158,7 +181,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("iterator on an empty array should be empty") {
- withExternalArray(spillThreshold = 10) { array =>
+ withExternalArray(inMemoryThreshold = 4, spillThreshold = 10) { array =>
val iterator = array.generateIterator()
assert(array.isEmpty)
assert(array.length == 0)
@@ -167,7 +190,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("generate iterator with negative start index") {
- withExternalArray(spillThreshold = 2) { array =>
+ withExternalArray(inMemoryThreshold = 100, spillThreshold = 56) { array =>
val exception =
intercept[ArrayIndexOutOfBoundsException](array.generateIterator(startIndex = -10))
@@ -178,8 +201,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("generate iterator with start index exceeding array's size (without spill)") {
- val spillThreshold = 2
- withExternalArray(spillThreshold) { array =>
+ val (inMemoryThreshold, spillThreshold) = (20, 100)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
populateRows(array, spillThreshold / 2)
val exception =
@@ -191,8 +214,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("generate iterator with start index exceeding array's size (with spill)") {
- val spillThreshold = 2
- withExternalArray(spillThreshold) { array =>
+ val (inMemoryThreshold, spillThreshold) = (20, 100)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
populateRows(array, spillThreshold * 2)
val exception =
@@ -205,10 +228,10 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("generate iterator with custom start index (without spill)") {
- val spillThreshold = 10
- withExternalArray(spillThreshold) { array =>
- val expectedValues = populateRows(array, spillThreshold)
- val startIndex = spillThreshold / 2
+ val (inMemoryThreshold, spillThreshold) = (20, 100)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
+ val expectedValues = populateRows(array, inMemoryThreshold)
+ val startIndex = inMemoryThreshold / 2
val iterator = array.generateIterator(startIndex = startIndex)
for (i <- startIndex until expectedValues.length) {
checkIfValueExists(iterator, expectedValues(i))
@@ -217,8 +240,8 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("generate iterator with custom start index (with spill)") {
- val spillThreshold = 10
- withExternalArray(spillThreshold) { array =>
+ val (inMemoryThreshold, spillThreshold) = (20, 100)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
val expectedValues = populateRows(array, spillThreshold * 10)
val startIndex = spillThreshold * 2
val iterator = array.generateIterator(startIndex = startIndex)
@@ -229,7 +252,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("test iterator invalidation (without spill)") {
- withExternalArray(spillThreshold = 10) { array =>
+ withExternalArray(inMemoryThreshold = 10, spillThreshold = 100) { array =>
// insert 2 rows, iterate until the first row
populateRows(array, 2)
@@ -254,9 +277,9 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("test iterator invalidation (with spill)") {
- val spillThreshold = 10
- withExternalArray(spillThreshold) { array =>
- // Populate enough rows so that spill has happens
+ val (inMemoryThreshold, spillThreshold) = (2, 10)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
+ // Populate enough rows so that spill happens
populateRows(array, spillThreshold * 2)
assertSpill()
@@ -281,7 +304,7 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("clear on an empty the array") {
- withExternalArray(spillThreshold = 2) { array =>
+ withExternalArray(inMemoryThreshold = 2, spillThreshold = 3) { array =>
val iterator = array.generateIterator()
assert(!iterator.hasNext)
@@ -299,10 +322,10 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
}
test("clear array (without spill)") {
- val spillThreshold = 10
- withExternalArray(spillThreshold) { array =>
+ val (inMemoryThreshold, spillThreshold) = (10, 100)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
// Populate rows ... but not enough to trigger spill
- populateRows(array, spillThreshold / 2)
+ populateRows(array, inMemoryThreshold / 2)
assertNoSpill()
// Clear the array
@@ -311,21 +334,21 @@ class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSpar
// Re-populate few rows so that there is no spill
// Verify the data. Verify that there was no spill
- val expectedValues = populateRows(array, spillThreshold / 3)
+ val expectedValues = populateRows(array, inMemoryThreshold / 2)
validateData(array, expectedValues)
assertNoSpill()
// Populate more rows .. enough to not trigger a spill.
// Verify the data. Verify that there was no spill
- populateRows(array, spillThreshold / 3, expectedValues)
+ populateRows(array, inMemoryThreshold / 2, expectedValues)
validateData(array, expectedValues)
assertNoSpill()
}
}
test("clear array (with spill)") {
- val spillThreshold = 10
- withExternalArray(spillThreshold) { array =>
+ val (inMemoryThreshold, spillThreshold) = (10, 20)
+ withExternalArray(inMemoryThreshold, spillThreshold) { array =>
// Populate enough rows to trigger spill
populateRows(array, spillThreshold * 2)
val bytesSpilled = getNumBytesSpilled
http://git-wip-us.apache.org/repos/asf/spark/blob/94439997/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index a9f3fb3..a57514c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -477,7 +477,8 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
|WINDOW w1 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDiNG AND CURRENT RoW)
""".stripMargin)
- withSQLConf("spark.sql.windowExec.buffer.spill.threshold" -> "1") {
+ withSQLConf("spark.sql.windowExec.buffer.in.memory.threshold" -> "1",
+ "spark.sql.windowExec.buffer.spill.threshold" -> "2") {
assertSpilled(sparkContext, "test with low buffer spill threshold") {
checkAnswer(actual, expected)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org