Posted to user@spark.apache.org by Jeremy Davis <je...@speakeasy.net> on 2016/09/21 05:26:53 UTC
OutOfMemory while calculating window functions
Hello all,
I ran into a weird OOM issue when using sliding windows (Spark 2.0, Scala 2.11.7, Java 1.8.0_11, OS X 10.10.5).
I’m using the DataFrame API and building various window specs such as:
Window.partitionBy(...).orderBy(...).rowsBetween(…)
with just 4 types of aggregation (avg, min, max, stddev), each applied as avg("value").over(window) etc.
All of these are parameterized over several window sizes (-3, -4, -5, -8, -9, -13, -18, -20, -21, -34, -55), where -3 means rowsBetween(-3, 0).
I’m using a Timestamp as the order column, and aggregating over a Double.
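To pin down what each column should contain: with rows ordered by timestamp, rowsBetween(-n, 0) covers the current row plus up to n preceding rows in the same partition (fewer at the start of the partition). Below is a minimal plain-Scala sketch of that frame, handy as a reference for checking Spark's output on a small input. It assumes the values of one partition are already sorted by timestamp. The names (RollingReference, Stats, rollingFrames, rollingStats) are hypothetical and not part of Spark. It also assumes stddev means the sample standard deviation; the single-row case is returned as None here, and Spark's own result for that case may differ by version.

```scala
// Hypothetical reference for ROWS BETWEEN -n AND 0 (current row plus up to
// n preceding rows), for one partition already sorted by timestamp.
// Not Spark code; only meant for checking results on small inputs.
object RollingReference {

  // Summary of one frame. stddev is assumed to be the sample standard
  // deviation and is None when the frame holds a single row.
  final case class Stats(avg: Double, min: Double, max: Double, stddev: Option[Double])

  // For row i the frame is the slice [max(0, i - n), i], inclusive.
  def rollingFrames(values: IndexedSeq[Double], n: Int): IndexedSeq[IndexedSeq[Double]] =
    values.indices.map(i => values.slice(math.max(0, i - n), i + 1))

  // avg/min/max/sample stddev of a single non-empty frame.
  def stats(frame: IndexedSeq[Double]): Stats = {
    val count = frame.size
    val mean = frame.sum / count
    val sd =
      if (count < 2) None
      else Some(math.sqrt(frame.map(v => (v - mean) * (v - mean)).sum / (count - 1)))
    Stats(mean, frame.min, frame.max, sd)
  }

  // One Stats per row, mirroring one output row per input row in Spark.
  def rollingStats(values: IndexedSeq[Double], n: Int): IndexedSeq[Stats] =
    rollingFrames(values, n).map(stats)
}
```

For example, RollingReference.rollingStats(Vector(0.0, 2.0, 4.0, 6.0, 8.0, 10.0), 3) gives a row-3 frame of (0, 2, 4, 6) with avg 3.0, min 0.0 and max 6.0. The naive per-row slice is O(n) per row, which is fine for a reference but not meant as a replacement for the window functions.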
Each partition holds only around 6500 samples to aggregate, yet memory use seems to grow non-linearly somewhere around 5000 samples per partition (perhaps an array being doubled somewhere?).
I shrank the dataset down to just 4 partitions, but it still OOMs a 10G heap in local mode. That seems odd when the input for this test case is only a few MB.
Is this expected behavior? My use case seems reasonable.
Also, it OOMs when I run multithreaded with 2 or more threads, but seems to work single-threaded (“local[1]”).
I will try to put together a simple repro case tomorrow.
Attached is a YourKit screenshot (windowfuncs.jpg). I suspect the long[] arrays double once more before the OOM.
Re: OutOfMemory while calculating window functions
Posted by Jeremy Davis <je...@speakeasy.net>.
Here is a unit test that will OOM a 10G heap
--------------
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.junit.Test

import scala.collection.mutable.ArrayBuffer

/**
 * A small unit test to demonstrate Spark window functions running out of memory.
 */
class SparkTest {
  @Test
  def testWindows(): Unit = {
    val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
    import sparkSession.implicits._

    println("Init Dataset")
    val partitions = 0 until 4   // 4 keys, i.e. 4 window partitions
    val entries = 0 until 6500   // 6500 rows per key, one per minute
    //val windows = (5 to 15 by 5) //Works
    val windows = (5 to 65 by 5) //OOM 10G

    // Build (key, timestamp, value) rows for every key; a plain loop, no yield needed
    val testData = new ArrayBuffer[(String, Timestamp, Double)]
    for (p <- partitions; e <- entries) {
      testData += (("Key" + p, new Timestamp(60000L * e), e * 2.0))
    }
    val ds = testData.toDF("key", "datetime", "value")
    ds.show()

    var resultFrame = ds
    resultFrame.schema.fields.foreach(println)

    // Rows ordered by time within each key; rowsBetween(-win, 0) is the current row plus win preceding rows
    val baseWin = Window.partitionBy("key").orderBy("datetime")
    for (win <- windows) {
      val frame = baseWin.rowsBetween(-win, 0)
      resultFrame = resultFrame
        .withColumn("avg" + (-win), avg("value").over(frame))
        .withColumn("stddev" + (-win), stddev("value").over(frame))
        .withColumn("min" + (-win), min("value").over(frame))
        .withColumn("max" + (-win), max("value").over(frame))
    }
    resultFrame.show()
    sparkSession.stop()
  }
}
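For scale, it may help to count how many window expressions the test attaches in each case: every window size adds four columns (avg, stddev, min, max), each with its own rowsBetween(-win, 0) frame. A tiny plain-Scala count follows; WindowColumnCount and windowColumns are hypothetical helpers that only reproduce the column-naming loop above, not Spark calls.

```scala
// Hypothetical helper: reproduces the column names the test generates,
// without Spark, to count window expressions per configuration.
object WindowColumnCount {
  // Each window size in the test adds one column per aggregate, in this order.
  val aggregates: Seq[String] = Seq("avg", "stddev", "min", "max")

  // Same naming as the test: aggregate name followed by -win, e.g. "avg-5".
  def windowColumns(windows: Range): Seq[String] =
    for {
      win <- windows
      agg <- aggregates
    } yield agg + (-win)
}
```

With (5 to 15 by 5) this gives 12 window columns; with (5 to 65 by 5) it gives 52, all computed over the same 6500-row partitions.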