You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Jeremy Davis <je...@speakeasy.net> on 2016/09/26 16:03:53 UTC

Sliding Window Memory use

Hi, I posted this to users, but didn’t get any responses.
I just wanted to highlight what seems like excessive memory use when using sliding windows.
I have attached a test case in which, starting with well under 1 MB of input data, I can run a 10 GB heap out of memory (OOM).

Regards,
-JD



--------------

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.junit.Test
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import scala.collection.mutable.ArrayBuffer


/**
 * A small unit test that demonstrates excessive memory use (OOM) in Spark
 * window functions.
 *
 * The input is tiny: 4 keys x 6500 rows, one row per minute per key. For
 * each of 13 trailing row-window sizes (5, 10, ..., 65) four aggregates
 * (avg, stddev, min, max) are appended, giving 52 extra columns in total.
 */
class SparkTest {

  /**
   * Builds the in-memory data set, appends the windowed aggregate columns one
   * `withColumn` call at a time, and materialises the result with `show()`.
   *
   * The columns are intentionally added through chained `withColumn` calls
   * rather than a single `select`: that chain is the query shape this
   * reproduction is meant to exercise, so it must not be "optimised" away.
   */
  @Test
  def testWindows(): Unit = {
    val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
    try {
      import sparkSession.implicits._

      println("Init Dataset")

      val partitions = 0 until 4
      val entries = 0 until 6500

      //val windows = 5 to 15 by 5 //Works
      val windows = 5 to 65 by 5   //OOM 10G

      // One row per (key, minute). The 60000L literal forces the epoch-millis
      // product to be computed as a Long, so raising `entries` cannot silently
      // overflow Int arithmetic.
      val testData = for {
        p <- partitions
        e <- entries
      } yield (s"Key$p", new Timestamp(60000L * e), e * 2.0)

      val ds = testData.toDF("key", "datetime", "value")
      ds.show()
      ds.schema.fields.foreach(println)

      val baseWin = Window.partitionBy("key").orderBy("datetime")

      // Thread the frame through every window size, adding four columns per
      // size. Column names keep the original "<agg>-<size>" form, e.g. "avg-5".
      val resultFrame = windows.foldLeft(ds) { (frame, win) =>
        // Trailing frame: the `win` preceding rows plus the current row.
        val trailing = baseWin.rowsBetween(-win, 0)
        frame
          .withColumn(s"avg${-win}", avg("value").over(trailing))
          .withColumn(s"stddev${-win}", stddev("value").over(trailing))
          .withColumn(s"min${-win}", min("value").over(trailing))
          .withColumn(s"max${-win}", max("value").over(trailing))
      }
      resultFrame.show()
    } finally {
      // Release the local Spark context even when the run fails, so a failed
      // reproduction does not leak executors/threads into later tests.
      sparkSession.stop()
    }
  }

}



Re: Sliding Window Memory use

Posted by Jeremy Davis <je...@speakeasy.net>.
Thanks Reynold, I appreciate it, that was my sense as well.
I will take a look at the community edition.

Btw, this is a profiler view a few moments before it went OOM…



-JD





> On Sep 26, 2016, at 9:49 PM, Reynold Xin <rx...@databricks.com> wrote:
> 
> I ran it on Databricks community edition which was a local[8] cluster with 6GB of RAM. It ran fine.
> 
> That said, looking at the plan, we can definitely simplify this quite a bit. We had a new Window physical execution node for each window expression, when we could have collapsed all of them into a single one.
> 
> 
> On Mon, Sep 26, 2016 at 9:03 AM, Jeremy Davis <jerdavis@speakeasy.net <ma...@speakeasy.net>> wrote:
> 
> Hi, I posted this to users, but didn’t get any responses.
> I just wanted to highlight what seems like excessive memory use when using sliding windows.
> I have attached a test case where starting with certainly less than 1MB of data I can OOM a 10G heap.
> 
> Regards,
> -JD
> 
> 
> 
> --------------
> 
> import java.sql.Timestamp
> 
> import org.apache.spark.sql.SparkSession
> import org.junit.Test
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
> 
> import scala.collection.mutable.ArrayBuffer
> 
> 
> /**
>  * A Small Unit Test to demonstrate Spark Window Functions OOM
>  */
> class SparkTest {
> 
> 
>   @Test
>   def testWindows() {
>     val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
>     import sparkSession.implicits._
> 
>     println("Init Dataset")
> 
>     val partitions = (0 until 4)
>     val entries = (0 until 6500)
> 
>     //val windows = (5 to 15 by 5) //Works
>     val windows = (5 to 65 by 5)   //OOM 10G
> 
>     val testData = new ArrayBuffer[(String,Timestamp,Double)]
> 
> 
>     for( p <- partitions) {
>       for( e <- entries ) yield {
>         testData += (("Key"+p,new Timestamp(60000*e),e*2.0))
>       }
>     }
> 
>     val ds = testData.toDF("key","datetime","value")
>     ds.show()
> 
> 
>     var resultFrame = ds
>     resultFrame.schema.fields.foreach(println)
> 
> 
>     val baseWin = Window.partitionBy("key").orderBy("datetime")
>     for( win <- windows ) {
>       resultFrame = resultFrame.withColumn("avg"+(-win),avg("value").over(baseWin.rowsBetween(-win,0)))
>             .withColumn("stddev"+(-win),stddev("value").over(baseWin.rowsBetween(-win,0)))
>             .withColumn("min"+(-win),min("value").over(baseWin.rowsBetween(-win,0)))
>             .withColumn("max"+(-win),max("value").over(baseWin.rowsBetween(-win,0)))
>     }
>     resultFrame.show()
> 
>   }
> 
> }
> 
> 
> 


Re: Sliding Window Memory use

Posted by Reynold Xin <rx...@databricks.com>.
I ran it on Databricks community edition which was a local[8] cluster with
6GB of RAM. It ran fine.

That said, looking at the plan, we can definitely simplify this quite a
bit. We had a new Window physical execution node for each window
expression, when we could have collapsed all of them into a single one.


On Mon, Sep 26, 2016 at 9:03 AM, Jeremy Davis <je...@speakeasy.net>
wrote:

>
> Hi, I posted this to users, but didn’t get any responses.
> I just wanted to highlight what seems like excessive memory use when using
> sliding windows.
> I have attached a test case where starting with certainly less than 1MB of
> data I can OOM a 10G heap.
>
> Regards,
> -JD
>
>
>
> --------------
>
>
> import java.sql.Timestamp
>
> import org.apache.spark.sql.SparkSession
> import org.junit.Test
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
>
> import scala.collection.mutable.ArrayBuffer
>
>
> /**
>  * A Small Unit Test to demonstrate Spark Window Functions OOM
>  */
> class SparkTest {
>
>
>   @Test
>   def testWindows() {
>     val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
>     import sparkSession.implicits._
>
>     println("Init Dataset")
>
>     val partitions = (0 until 4)
>     val entries = (0 until 6500)
>
>     //val windows = (5 to 15 by 5) //Works
>     val windows = (5 to 65 by 5)   //OOM 10G
>
>     val testData = new ArrayBuffer[(String,Timestamp,Double)]
>
>
>     for( p <- partitions) {
>       for( e <- entries ) yield {
>         testData += (("Key"+p,new Timestamp(60000*e),e*2.0))
>       }
>     }
>
>     val ds = testData.toDF("key","datetime","value")
>     ds.show()
>
>
>     var resultFrame = ds
>     resultFrame.schema.fields.foreach(println)
>
>
>     val baseWin = Window.partitionBy("key").orderBy("datetime")
>     for( win <- windows ) {
>       resultFrame = resultFrame.withColumn("avg"+(-win),avg("value").over(baseWin.rowsBetween(-win,0)))
>             .withColumn("stddev"+(-win),stddev("value").over(baseWin.rowsBetween(-win,0)))
>             .withColumn("min"+(-win),min("value").over(baseWin.rowsBetween(-win,0)))
>             .withColumn("max"+(-win),max("value").over(baseWin.rowsBetween(-win,0)))
>     }
>     resultFrame.show()
>
>   }
>
> }
>
>
>
>