Posted to by Jeremy Davis <> on 2016/09/21 05:26:53 UTC

OutOfMemory while calculating window functions

Hello all,
I ran into a weird OOM issue when using sliding windows (Spark 2.0, Scala 2.11.7, Java 1.8.0_11, OS X 10.10.5).
I’m using the DataFrame API and calling various combinations of:
Window.partitionBy(...).orderBy(...).rowsBetween(…) etc.
with just 4 types of aggregations (avg, min, max, stddev), e.g. avg.over(window),
all parameterized over several window sizes (-3,-4,-5,-8,-9,-13,-18,-20,-21,-34,-55),
with -3 meaning the row frame (-3,0).
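
For readers following along, the setup described above might look roughly like the sketch below. It is not the code from the actual job: the object name, the helper addSlidingAggregates, its parameter names, and the output column naming are all hypothetical. Only the Window.partitionBy/orderBy/rowsBetween calls, the four aggregate functions, and the window sizes come from the message.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col, max, min, stddev}

// Hypothetical helper, for illustration only.
object SlidingAggregates {
  // Window sizes from the message; a size n stands for the row frame (-n, 0).
  val windowSizes: Seq[Int] = Seq(3, 4, 5, 8, 9, 13, 18, 20, 21, 34, 55)

  // The four aggregations, each paired with a label for the output column name.
  val aggregations: Seq[(String, Column => Column)] = Seq(
    ("avg", (c: Column) => avg(c)),
    ("min", (c: Column) => min(c)),
    ("max", (c: Column) => max(c)),
    ("stddev", (c: Column) => stddev(c))
  )

  // Assumed naming scheme, e.g. "avg_3" for avg over rows (-3, 0).
  def columnName(label: String, size: Int): String = s"${label}_$size"

  // Adds one column per (aggregation, window size) pair.
  def addSlidingAggregates(df: DataFrame, keyCol: String, orderCol: String, valueCol: String): DataFrame = {
    val base = Window.partitionBy(keyCol).orderBy(orderCol)
    val combos = for {
      size <- windowSizes
      (label, agg) <- aggregations
    } yield (size, label, agg)
    combos.foldLeft(df) { case (acc, (size, label, agg)) =>
      acc.withColumn(columnName(label, size), agg(col(valueCol)).over(base.rowsBetween(-size, 0)))
    }
  }
}
```

With 11 window sizes and 4 aggregations this adds 44 window expressions to the plan, which gives a sense of how many frames are evaluated per partition.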

I’m using a Timestamp as the order column, and aggregating over a Double.
In a given partition, there are only around 6500 samples of data being aggregated. For some reason I hit some sort of non-linear memory use at around 5000 samples per partition (perhaps an array being doubled somewhere?).
I shrank the dataset down to just 4 partitions, but I still OOM a 10G heap while running in local mode. It all seems odd when my input is on the order of just a few MB for this test case.

I’m wondering if this sounds like expected behavior? Seems like my use case is reasonable.

Also, it will OOM when I run multithreaded with 2 or more threads, but it seems to work single-threaded (“local[1]”).
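
To compare thread counts within one JVM, something like the following could be used. This is only a sketch: LocalMaster, masterUrl, withLocalSession, and the app name are hypothetical names introduced for illustration. The session is stopped after each run because SparkSession.builder().getOrCreate() would otherwise return the session that is already running, along with its original master setting.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper, for illustration only.
object LocalMaster {
  // Builds the local-mode master URL for a given thread count, e.g. "local[1]".
  def masterUrl(threads: Int): String = {
    require(threads >= 1, "local mode needs at least one thread")
    s"local[$threads]"
  }

  // Runs `body` against a local session and always stops the session afterwards,
  // so that the next call can start one with a different thread count.
  def withLocalSession[A](threads: Int)(body: SparkSession => A): A = {
    val spark = SparkSession.builder()
      .master(masterUrl(threads))
      .appName("window-oom-check") // assumed app name
      .getOrCreate()
    try body(spark) finally spark.stop()
  }
}
```

For example, LocalMaster.withLocalSession(1) { spark => ... } followed by LocalMaster.withLocalSession(2) { spark => ... } would run the same job single-threaded and then with two threads.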

I will try to put together a simple repro case tomorrow.

Attached is a YourKit screenshot (windowfuncs.jpg). I suspect the long[] arrays double once more before the OOM.

Re: OutOfMemory while calculating window functions

Posted by Jeremy Davis <>.
Here is a unit test that will OOM a 10G heap

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.junit.Test
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import scala.collection.mutable.ArrayBuffer

/**
 * A small unit test to demonstrate Spark window functions OOM
 */
class SparkTest {

  @Test
  def testWindows(): Unit = {
    val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
    import sparkSession.implicits._

    println("Init Dataset")

    val partitions = (0 until 4)
    val entries = (0 until 6500)

    //val windows = (5 to 15 by 5) //Works
    val windows = (5 to 65 by 5)   //OOM 10G

    // Build 4 keys x 6500 rows, one sample per minute
    val testData = new ArrayBuffer[(String,Timestamp,Double)]

    for( p <- partitions) {
      for( e <- entries ) {
        testData += (("Key"+p,new Timestamp(60000L*e),e*2.0))
      }
    }

    val ds = testData.toDF("key","datetime","value")

    var resultFrame = ds

    // Add one sliding-average column per window size, e.g. "avg-5" for rows (-5,0)
    val baseWin = Window.partitionBy("key").orderBy("datetime")
    for( win <- windows ) {
      resultFrame = resultFrame.withColumn("avg"+(-win),avg("value").over(baseWin.rowsBetween(-win,0)))
    }

    // The archived message is cut off here. An action is needed to trigger
    // execution; show() is an assumption, not necessarily what the original test called.
    resultFrame.show()
  }
}

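As a rough sanity check on the "few MB" point, the raw size of the data in the test above can be estimated with plain arithmetic. The per-row byte counts below are assumptions about the raw payload only (a 4-character key, an 8-byte timestamp, 8-byte doubles) and ignore all JVM and Spark overhead; the object name is hypothetical.

```scala
// Back-of-envelope estimate, no Spark required.
object ReproSizeEstimate {
  val partitions = 4
  val rowsPerPartition = 6500
  val windowCount: Int = (5 to 65 by 5).size // window sizes 5, 10, ..., 65

  // Assumed raw payload per input row: 4-char key + 8-byte timestamp + 8-byte double.
  val inputBytesPerRow: Int = 4 + 8 + 8

  val rows: Int = partitions * rowsPerPartition
  val inputBytes: Long = rows.toLong * inputBytesPerRow

  // Each window adds one 8-byte double column to every row.
  val outputBytes: Long = rows.toLong * (inputBytesPerRow + windowCount * 8)

  def main(args: Array[String]): Unit = {
    println(s"rows=$rows windows=$windowCount")
    println(s"input ~ ${inputBytes / 1024} KiB, output ~ ${outputBytes / 1024} KiB")
  }
}
```

Under these assumptions the input is about half a megabyte and the full result a few megabytes, several orders of magnitude below the 10G heap.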
