Posted to dev@spark.apache.org by Jeremy Davis <je...@speakeasy.net> on 2016/09/26 16:03:53 UTC
Sliding Window Memory use
Hi, I posted this to users, but didn’t get any responses.
I just wanted to highlight what seems like excessive memory use with sliding windows.
I have attached a test case where, starting with well under 1MB of data, I can OOM a 10G heap.
Regards,
-JD
--------------
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.junit.Test

import scala.collection.mutable.ArrayBuffer

/**
 * A small unit test to demonstrate a Spark window-function OOM.
 */
class SparkTest {

  @Test
  def testWindows() {
    val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
    import sparkSession.implicits._

    println("Init Dataset")

    val partitions = 0 until 4
    val entries = 0 until 6500

    //val windows = 5 to 15 by 5 // Works
    val windows = 5 to 65 by 5 // OOMs a 10G heap

    // Tiny input: 4 keys x 6500 rows (one per minute), well under 1MB.
    val testData = new ArrayBuffer[(String, Timestamp, Double)]
    for (p <- partitions; e <- entries) {
      testData += (("Key" + p, new Timestamp(60000 * e), e * 2.0))
    }

    val ds = testData.toDF("key", "datetime", "value")
    ds.show()

    var resultFrame = ds
    resultFrame.schema.fields.foreach(println)

    // For each window size, add four aggregates over the trailing `win`
    // rows per key; with 13 sizes this yields 52 window columns.
    val baseWin = Window.partitionBy("key").orderBy("datetime")
    for (win <- windows) {
      resultFrame = resultFrame
        .withColumn("avg" + (-win), avg("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("stddev" + (-win), stddev("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("min" + (-win), min("value").over(baseWin.rowsBetween(-win, 0)))
        .withColumn("max" + (-win), max("value").over(baseWin.rowsBetween(-win, 0)))
    }
    resultFrame.show()
  }
}
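A quick way to see what the planner builds for this, assuming Spark 2.x's Dataset API, is to print the plan before the final show(); with windows = 5 to 65 by 5, the loop above adds 13 × 4 = 52 window columns:

// Debugging sketch: print the physical plan (and, with `true`, the parsed,
// analyzed, and optimized plans as well) before materializing anything.
resultFrame.explain(true)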
Re: Sliding Window Memory use
Posted by Jeremy Davis <je...@speakeasy.net>.
Thanks Reynold, I appreciate it; that was my sense as well.
I will take a look at the community edition.
Btw, attached is a profiler view from a few moments before it went OOM…
-JD
> On Sep 26, 2016, at 9:49 PM, Reynold Xin <rx...@databricks.com> wrote:
>
> I ran it on Databricks community edition, which was a local[8] cluster with 6GB of RAM. It ran fine.
>
> That said, looking at the plan, we can definitely simplify this quite a bit. We add a new Window physical execution node for each window expression, when we could collapse all of them into a single one.
>
>
> On Mon, Sep 26, 2016 at 9:03 AM, Jeremy Davis <je...@speakeasy.net> wrote:
>
> [quoted original post and test case snipped; see the top of the thread]
Re: Sliding Window Memory use
Posted by Reynold Xin <rx...@databricks.com>.
I ran it on Databricks community edition, which was a local[8] cluster with 6GB of RAM. It ran fine.

That said, looking at the plan, we can definitely simplify this quite a bit. We add a new Window physical execution node for each window expression, when we could collapse all of them into a single one.
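For reference, here is a sketch of what that could look like from the user side, assuming Spark 2.x (the single-select structure is illustrative, not a confirmed fix): handing select all the window expressions at once, rather than chaining one withColumn at a time, gives the planner the chance to group expressions that share the same partitioning and ordering.

// Sketch: build every window column up front and apply them in one select,
// so window expressions over the same partitioning/ordering can be planned
// together instead of one at a time.
val baseWin = Window.partitionBy("key").orderBy("datetime")
val windowCols = windows.flatMap { win =>
  val frame = baseWin.rowsBetween(-win, 0)
  Seq(
    avg("value").over(frame).as("avg" + (-win)),
    stddev("value").over(frame).as("stddev" + (-win)),
    min("value").over(frame).as("min" + (-win)),
    max("value").over(frame).as("max" + (-win)))
}
val collapsed = ds.select((col("*") +: windowCols): _*)
collapsed.explain() // compare the number of Window nodes with the chained version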
On Mon, Sep 26, 2016 at 9:03 AM, Jeremy Davis <je...@speakeasy.net> wrote:
>
> [quoted original post and test case snipped; see the top of the thread]