You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/19 13:35:42 UTC

spark git commit: [SPARK-21414] Refine SlidingWindowFunctionFrame to avoid OOM.

Repository: spark
Updated Branches:
  refs/heads/master 46307b2cd -> 4eb081cc8


[SPARK-21414] Refine SlidingWindowFunctionFrame to avoid OOM.

## What changes were proposed in this pull request?

`SlidingWindowFunctionFrame` currently adds to the buffer all rows for which the input row value is less than or equal to the output row's upper bound, and then drops from the buffer all rows for which the input row value is smaller than the output row's lower bound.
This can make the buffer very large even though the window itself is small.
For example:
```
select a, b, sum(a)
over (partition by b order by a range between 1000000 following and 1000001 following)
from table
```
We can refine the logic so that only the qualifying rows are added to the buffer.

## How was this patch tested?
Manual test:
Run sql
`select shop, shopInfo, district, sum(revenue) over(partition by district order by revenue range between 100 following and 200 following) from revenueList limit 10`
against a table with 4 columns (shop: String, shopInfo: String, district: String, revenue: Int). The biggest partition is around 2G bytes, containing 200k lines.
Configure the executor with 2G bytes memory.
With the change in this PR, it works fine. Without this change, the exception below is thrown.
```
MemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:504)
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:62)
	at org.apache.spark.sql.execution.window.SlidingWindowFunctionFrame.write(WindowFunctionFrame.scala:201)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:365)
	at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:289)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
```

Author: jinxing <ji...@126.com>

Closes #18634 from jinxing64/SPARK-21414.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4eb081cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4eb081cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4eb081cc

Branch: refs/heads/master
Commit: 4eb081cc870a9d1c42aae90418535f7d782553e9
Parents: 46307b2
Author: jinxing <ji...@126.com>
Authored: Wed Jul 19 21:35:26 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jul 19 21:35:26 2017 +0800

----------------------------------------------------------------------
 .../execution/window/WindowFunctionFrame.scala  | 22 ++++++-----
 .../sql/execution/SQLWindowFunctionSuite.scala  | 40 ++++++++++++++++++++
 2 files changed, 53 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4eb081cc/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
index af2b4fb..156002e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala
@@ -195,15 +195,6 @@ private[window] final class SlidingWindowFunctionFrame(
   override def write(index: Int, current: InternalRow): Unit = {
     var bufferUpdated = index == 0
 
-    // Add all rows to the buffer for which the input row value is equal to or less than
-    // the output row upper bound.
-    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
-      buffer.add(nextRow.copy())
-      nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
-      inputHighIndex += 1
-      bufferUpdated = true
-    }
-
     // Drop all rows from the buffer for which the input row value is smaller than
     // the output row lower bound.
     while (!buffer.isEmpty && lbound.compare(buffer.peek(), inputLowIndex, current, index) < 0) {
@@ -212,6 +203,19 @@ private[window] final class SlidingWindowFunctionFrame(
       bufferUpdated = true
     }
 
+    // Add all rows to the buffer for which the input row value is equal to or less than
+    // the output row upper bound.
+    while (nextRow != null && ubound.compare(nextRow, inputHighIndex, current, index) <= 0) {
+      if (lbound.compare(nextRow, inputLowIndex, current, index) < 0) {
+        inputLowIndex += 1
+      } else {
+        buffer.add(nextRow.copy())
+        bufferUpdated = true
+      }
+      nextRow = WindowFunctionFrame.getNextOrNull(inputIterator)
+      inputHighIndex += 1
+    }
+
     // Only recalculate and update when the buffer changes.
     if (bufferUpdated) {
       processor.initialize(input.length)

http://git-wip-us.apache.org/repos/asf/spark/blob/4eb081cc/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
index 52e4f04..a9f3fb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
@@ -356,6 +356,46 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSQLContext {
     spark.catalog.dropTempView("nums")
   }
 
+  test("window function: mutiple window expressions specified by range in a single expression") {
+    val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
+    nums.createOrReplaceTempView("nums")
+    withTempView("nums") {
+      val expected =
+        Row(1, 1, 1, 4, null, 8, 25) ::
+          Row(1, 3, 4, 9, 1, 12, 24) ::
+          Row(1, 5, 9, 15, 4, 16, 21) ::
+          Row(1, 7, 16, 21, 8, 9, 16) ::
+          Row(1, 9, 25, 16, 12, null, 9) ::
+          Row(0, 2, 2, 6, null, 10, 30) ::
+          Row(0, 4, 6, 12, 2, 14, 28) ::
+          Row(0, 6, 12, 18, 6, 18, 24) ::
+          Row(0, 8, 20, 24, 10, 10, 18) ::
+          Row(0, 10, 30, 18, 14, null, 10) ::
+          Nil
+
+      val actual = sql(
+        """
+          |SELECT
+          |  y,
+          |  x,
+          |  sum(x) over w1 as history_sum,
+          |  sum(x) over w2 as period_sum1,
+          |  sum(x) over w3 as period_sum2,
+          |  sum(x) over w4 as period_sum3,
+          |  sum(x) over w5 as future_sum
+          |FROM nums
+          |WINDOW
+          |  w1 AS (PARTITION BY y ORDER BY x RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),
+          |  w2 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING),
+          |  w3 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 4 PRECEDING AND 2 PRECEDING ),
+          |  w4 AS (PARTITION BY y ORDER BY x RANGE BETWEEN 2 FOLLOWING AND 4 FOLLOWING),
+          |  w5 AS (PARTITION BY y ORDER BY x RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
+        """.stripMargin
+      )
+      checkAnswer(actual, expected)
+    }
+  }
+
   test("SPARK-7595: Window will cause resolve failed with self join") {
     checkAnswer(sql(
       """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org