Posted to user@spark.apache.org by Anand Nalya <an...@gmail.com> on 2015/07/09 11:48:09 UTC

Breaking lineage and reducing stages in Spark Streaming

Hi,

I have an application in which an RDD is being updated with tuples coming
from RDDs in a DStream, with the following pattern:

dstream.foreachRDD(rdd => {
  myRDD = myRDD.union(rdd.filter(myfilter)).reduceByKey(_+_)
})

I'm using cache() and checkpointing to cache results. Over time, the lineage of
myRDD keeps growing and the number of stages in each batch of the dstream keeps
increasing, even though all the earlier stages are skipped. When the number of
stages grows large enough, the overall delay starts increasing due to scheduling
delay. The processing time for each batch stays constant.
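
The growth is easy to reproduce outside of streaming. Here is a toy sketch for
spark-shell (where sc already exists), not the application code:

var acc = sc.parallelize(Seq(("a", 1L)))          // stands in for myRDD
for (batch <- 1 to 5) {
  // stands in for one batch's filtered RDD; same keys keep arriving
  val incoming = sc.parallelize(Seq(("a", 1L), ("b", batch.toLong)))
  acc = acc.union(incoming).reduceByKey(_ + _)
}
// the plan gains a union and a shuffle per iteration, even though the data stays tiny
println(acc.toDebugString)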

The following figures illustrate the problem:

Job execution: https://i.imgur.com/GVHeXH3.png?1


Delays: https://i.imgur.com/1DZHydw.png?1


Is there some pattern that I can use to avoid this?

Regards,
Anand

Re: Breaking lineage and reducing stages in Spark Streaming

Posted by Anand Nalya <an...@gmail.com>.
Thanks for the help Dean/TD,

I was able to cut the lineage with checkpointing, using the following code:

dstream.countByValue().foreachRDD((rdd, time) => {
    // base and current are RDD[(String, Long)] vars defined outside the DStream;
    // duration is the checkpoint interval (e.g. Seconds(120))
    val joined = rdd.union(current).reduceByKey(_+_, 2).leftOuterJoin(base)
    val toUpdate = joined.filter(myfilter).map(mymap)
    val toNotUpdate = joined.filter(mynotfilter).map(mymap)

    base = base.union(toUpdate).reduceByKey(_+_, 2)
    current = toNotUpdate

    // checkpoint periodically to truncate the lineage
    if(time.isMultipleOf(duration)){
      base.checkpoint()
      current.checkpoint()
    }
    println(toUpdate.count()) // action; the real code persists the results here
  })
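
For anyone following along, the loop above can be wired into a complete program
roughly as follows. This is only a sketch: myfilter/mymap/mynotfilter, the paths,
the batch interval and the 120-second checkpoint duration are placeholder
assumptions, not the exact application code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object LineageTruncationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lineage-sketch"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")      // required before calling RDD.checkpoint()
    val ssc = new StreamingContext(sc, Seconds(10))

    // State carried across batches (placeholders for the real base/current RDDs)
    var base: RDD[(String, Long)] = sc.emptyRDD[(String, Long)].cache()
    var current: RDD[(String, Long)] = sc.emptyRDD[(String, Long)].cache()

    val duration: Duration = Seconds(120)              // checkpoint interval (assumption)
    val myfilter: ((String, (Long, Option[Long]))) => Boolean = t => t._2._1 > 10L        // placeholder
    val mynotfilter: ((String, (Long, Option[Long]))) => Boolean = t => !myfilter(t)      // placeholder
    val mymap: ((String, (Long, Option[Long]))) => (String, Long) = t => (t._1, t._2._1)  // placeholder

    val dstream = ssc.textFileStream("/tmp/stream-input")

    dstream.countByValue().foreachRDD((rdd, time) => {
      val joined = rdd.union(current).reduceByKey(_ + _, 2).leftOuterJoin(base)
      val toUpdate = joined.filter(myfilter).map(mymap)
      val toNotUpdate = joined.filter(mynotfilter).map(mymap)

      base = base.union(toUpdate).reduceByKey(_ + _, 2)
      current = toNotUpdate

      // Periodically checkpoint both state RDDs to truncate the lineage;
      // the checkpoint files are written when the next job computes these RDDs.
      if (time.isMultipleOf(duration)) {
        base.checkpoint()
        current.checkpoint()
      }
      println(toUpdate.count())   // action: forces evaluation of this batch's work
    })

    ssc.start()
    ssc.awaitTermination()
  }
}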

Thanks,
Anand

Re: Breaking lineage and reducing stages in Spark Streaming

Posted by Tathagata Das <td...@databricks.com>.
Summarizing the main problems discussed by Dean:

1. If you have an infinitely growing lineage, bad things will eventually
happen. You HAVE TO checkpoint the information periodically (say, every
10th batch).

2. Unpersist the previous `current` RDD ONLY AFTER running an action on
`newCurrent`. Otherwise you are throwing current out of the cache before
newCurrent has been computed. Modifying Dean's example:

val newCurrent = rdd.union(current).reduceByKey(_+_)
...
// join with newCurrent
// collect or count or any action that uses newCurrent
//
// Now you can unpersist, because newCurrent has been persisted and won't
// require falling back to this cached current RDD.
current.unpersist()
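
Put together, the two points might look roughly like this (a sketch only, not
code from the thread; it assumes an existing SparkContext sc with a checkpoint
directory set and a DStream[String] dstream, and the every-10th-batch rule is
just an example):

import org.apache.spark.rdd.RDD

var current: RDD[(String, Long)] = sc.emptyRDD[(String, Long)].cache()
var batchCount = 0L

dstream.countByValue().foreachRDD { rdd =>
  batchCount += 1                           // foreachRDD runs on the driver, so this is safe
  val newCurrent = rdd.union(current).reduceByKey(_ + _).cache()

  if (batchCount % 10 == 0) {
    newCurrent.checkpoint()                 // point 1: periodically truncate the lineage
  }
  newCurrent.count()                        // an action: materializes (and checkpoints) newCurrent

  current.unpersist()                       // point 2: safe only after the action above
  current = newCurrent
}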


Re: Breaking lineage and reducing stages in Spark Streaming

Posted by Dean Wampler <de...@gmail.com>.
I think you're complicating the cache behavior by aggressively re-using
vars when temporary vals would be more straightforward. For example,
newBase = newBase.unpersist()... effectively means that newBase's data is
not actually cached when the subsequent .union(...) is performed, so it
probably goes back to the lineage... Same with the current.unpersist logic
before it.

Names are cheap, so just use local vals:

val newCurrent = rdd.union(current).reduceByKey(_+_)
current.unpersist()

Also, what happens if you omit the "2" argument for the number of
partitions in reduceByKey?

Other minor points:

I would change the joined, toUpdate, toNotUpdate logic to this:

val joined = current.leftOuterJoin(newBase).map(mymap).cache()

val toUpdate = joined.filter(myfilter).cache()
val toNotUpdate = joined.filter(mynotfilter).cache()


Maybe it's just for this email example, but you don't need to call collect
on toUpdate before using foreach(println). If the RDD is huge, you
definitely don't want to do that.
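
Taken together, the suggestions might reshape the loop roughly like this. It is a
sketch only: the placeholder functions and the extra count() calls are
illustrative, the explicit partition count is dropped as an experiment, and the
unpersist calls are deferred until after actions, as discussed elsewhere in the
thread.

import org.apache.spark.rdd.RDD

// assumes sc, a DStream[String] `dstream`, and placeholder functions of these shapes:
val mymap: ((String, (Long, Option[Long]))) => (String, Long) = t => (t._1, t._2._1 + t._2._2.getOrElse(0L))
val myfilter: ((String, Long)) => Boolean = _._2 > 10L
val mynotfilter: ((String, Long)) => Boolean = t => !myfilter(t)

var newBase: RDD[(String, Long)] = sc.emptyRDD[(String, Long)].cache()
var current: RDD[(String, Long)] = sc.emptyRDD[(String, Long)].cache()

dstream.countByValue().foreachRDD { rdd =>
  // temporary vals instead of reassigning the vars mid-computation
  val newCurrent = rdd.union(current).reduceByKey(_ + _).cache()

  val joined = newCurrent.leftOuterJoin(newBase).map(mymap).cache()
  val toUpdate = joined.filter(myfilter).cache()
  val toNotUpdate = joined.filter(mynotfilter).cache()

  toUpdate.foreach(println)              // runs on the executors; the real code writes to a store instead

  val updatedBase = newBase.union(toUpdate).reduceByKey(_ + _).cache()
  updatedBase.count()                    // materialize the new state...
  toNotUpdate.count()

  current.unpersist()                    // ...before dropping the old cached copies
  newBase.unpersist()
  joined.unpersist()

  current = toNotUpdate
  newBase = updatedBase
}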

Hope this helps.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

Re: Breaking lineage and reducing stages in Spark Streaming

Posted by Anand Nalya <an...@gmail.com>.
Yes, myRDD is outside of the DStream. Following is the actual code, where newBase
and current are the RDDs being updated with each batch:

  val base = sc.textFile...                 // initial base data
  var newBase = base.cache()

  val dstream: DStream[String] = ssc.textFileStream...
  var current: RDD[(String, Long)] = sc.emptyRDD.cache()

  dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {

    // merge this batch's counts into the running `current` counts
    current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

    val joined = current.leftOuterJoin(newBase).cache()
    val toUpdate = joined.filter(myfilter).map(mymap).cache()
    val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

    toUpdate.collect().foreach(println) // this goes to some store

    // fold the updated keys back into the base
    newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_, 2).cache()

    current = toNotUpdate.cache()

    toUpdate.unpersist()
    joined.unpersist()
    rdd.unpersist()
  })


Regards,

Anand


Re: Breaking lineage and reducing stages in Spark Streaming

Posted by Dean Wampler <de...@gmail.com>.
Is myRDD outside a DStream? If so, are you persisting it on each batch
iteration? It should be checkpointed frequently, too.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

Re: Breaking lineage and reducing stages in Spark Streaming

Posted by Anand Nalya <an...@gmail.com>.
The data coming from the dstream has the same keys as those already in myRDD,
so the reduceByKey after the union keeps the overall tuple count in myRDD
fixed. Even with a fixed tuple count, will it keep consuming more resources?

Re: Breaking lineage and reducing stages in Spark Streaming

Posted by Tathagata Das <td...@databricks.com>.
If you are continuously unioning RDDs, then you are accumulating ever-increasing
data, and you are processing an ever-increasing amount of data in every batch.
Obviously, this is not going to last for very long. You fundamentally cannot keep
processing an ever-increasing amount of data with finite resources, can you?

Re: Breaking lineage and reducing stages in Spark Streaming

Posted by Anand Nalya <an...@gmail.com>.
That's from the Streaming tab of the Spark 1.4 Web UI.

RE: Breaking lineage and reducing stages in Spark Streaming

Posted by Michel Hubert <mi...@vsnsystemen.nl>.
Hi,

I was just wondering how you generated the second image with the charts.
What product?

From: Anand Nalya [mailto:anand.nalya@gmail.com]
Sent: Thursday, 9 July 2015 11:48
To: spark users
Subject: Breaking lineage and reducing stages in Spark Streaming
