You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Anand Nalya <an...@gmail.com> on 2015/07/07 13:34:02 UTC

(Unknown)

Hi,

Suppose I have an RDD that is loaded from some file and then I also have a
DStream that has data coming from some stream. I want to keep union some of
the tuples from the DStream into my RDD. For this I can use something like
this:

  var myRDD: RDD[(String, Long)] = sc.fromText...
  dstream.foreachRDD{ rdd =>
    myRDD = myRDD.union(rdd.filter(myfilter))
  }

My questions is that for how long spark will keep RDDs underlying the
dstream around? Is there some configuratoin knob that can control that?

Regards,
Anand

Re:

Posted by Anand Nalya <an...@gmail.com>.
>
> A formal requirements spec derived from the above - I think the actual
> requirement here is picking up and adding Specific (filtered) Messages from
> EVERY DStream RDD  to the Batch RDD rather than “preserving” (on top of
> that all) messages from  sliding window and adding them to the Batch RDD.


This is what I need to do. The batch RDD represents the long term state of
the system and the dstream makes updates to this state on some criteria.

Following is the code that I was able to run with checkpoints. As suggested
by @Gerard, I'm trying to break the lineage by cache().

  val base = sc.textFile...
  var newBase = base.cache()

  val dstream: DStream[String] = ssc.textFileStream...
  var current: RDD[(String, Long)] = sc.emptyRDD.cache()

  dstream.countByValue().checkpoint(Seconds(120)).foreachRDD(rdd => {

    current = rdd.union(current.unpersist()).reduceByKey(_+_, 2)

    val joined = current.leftOuterJoin(newBase).cache()
    val toUpdate = joined.filter(myfilter).map(mymap).cache()
    val toNotUpdate = joined.filter(mynotfilter).map(mymap).cache()

    toUpdate.collect().foreach(println) // this goes to some persistet store

    newBase = newBase.unpersist().union(toUpdate).reduceByKey(_+_,
2).cache()

    current = toNotUpdate.cache()

    toUpdate.unpersist()
    toNotUpdate.unpersist()
    joined.unpersist()
    rdd.unpersist()
  })

Even though the above works fine for a while, the number of stages for each
job keeps increasing and resulting in larger processing times. Is there
some workaround for this or using something like redis for keeping the
batchRDD data the way to go?





Anand

On 7 July 2015 at 18:38, Evo Eftimov <ev...@isecc.com> wrote:

> Requirements – then see my abstracted interpretation – what else do you
> need in terms of Requirements …:
>
>
>
> “Suppose I have an RDD that is loaded from some file and then I also have
> a DStream that has data coming from some stream. I want to keep union some
> of the tuples from the DStream into my RDD. For this I can use something
> like this:”
>
> A formal requirements spec derived from the above - I think the actual
> requirement here is picking up and adding Specific (filtered) Messages from
> EVERY DStream RDD  to the Batch RDD rather than “preserving” (on top of
> that all) messages from  sliding window and adding them to the Batch RDD.
> Such requiremet should be defined as the Frequency of Updates to the Batch
> RDD and what these updates are e.g. specific filtered messages and then
> using dstream.window() can be made equal to that frequency
>
> Essentialy the update frequency can range from the filtered messages of
> Every Single DStream RDD to the filetered messages of a SLIDING WINDOW
>
>
>
> Secondly what do you call “mutable uniniong”
>
>
>
> That was his initial code
>
>
>
>   var myRDD: RDD[(String, Long)] = sc.fromText...
>
>   dstream.foreachRDD{ rdd =>
>
>     myRDD = myRDD.union(rdd.filter(myfilter))
>
>   }
>
>
>
>
>
> Here is how it looks when Persisting the result from evet union – supposed
> to produce NEW PERSTINET IMMUTABLE Batch RDD – why is that supposed to be
> less “stable/reliable” – what are the exact tectnical reasons for that
>
>   var myRDD: RDD[(String, Long)] = sc.fromText...
>
>   dstream.foreachRDD{ rdd =>
>
>     myRDD = myRDD.union(rdd.filter(myfilter)).cashe()
>
>   }
>
>
>
>
>
>
>
>
>
> *From:* Gerard Maas [mailto:gerard.maas@gmail.com]
> *Sent:* Tuesday, July 7, 2015 1:55 PM
> *To:* Evo Eftimov
> *Cc:* Anand Nalya; spark users
> *Subject:* Re:
>
>
>
> Evo,
>
>
>
> I'd let the OP clarify the question. I'm not in position of clarifying his
> requirements beyond what's written on the question.
>
>
>
> Regarding window vs mutable union: window is a well-supported feature that
> accumulates messages over time. The mutable unioning of RDDs is bound to
> operational trouble as there're no warranties tied to data preservation and
> it's unclear how one can produce 'cuts' of that union ready to be served
> for some process/computation.  Intuitively, it will 'explode' at some point.
>
>
>
> -kr, Gerard.
>
>
>
>
>
>
>
> On Tue, Jul 7, 2015 at 2:06 PM, Evo Eftimov <ev...@isecc.com> wrote:
>
> spark.streaming.unpersist = false // in order for SStreaming to not drop
> the raw RDD data
>
> spark.cleaner.ttl = <some reasonable value in seconds>
>
>
>
> why is the above suggested provided the persist/vache operation on the
> constantly unioniuzed Batch RDD will have to be invoked anyway (after every
> union with DStream RDD), besides it will result in DStraeam RDDs
> accumulating in RAM unncesesarily for the duration of TTL
>
>
>
> re
>
>
>
> “A more reliable way would be to do dstream.window(...) for the length of
> time you want to keep the data and then union that data with your RDD for
> further processing using transform.”
>
>
>
> I think the actual requirement here is picking up and adding Specific
> Messages from EVERY DStream RDD  to the Batch RDD rather than “preserving”
> messages from specific  sliding window and adding them to the Batch RDD
>
>
>
> This should be defined as the Frequency of Updates to the Batch RDD and
> then using dstream.window() equal to that frequency
>
>
>
> Can you also elaborate why you consider the dstream.window  approach more
> “reliable”
>
>
>
> *From:* Gerard Maas [mailto:gerard.maas@gmail.com]
> *Sent:* Tuesday, July 7, 2015 12:56 PM
> *To:* Anand Nalya
> *Cc:* spark users
> *Subject:* Re:
>
>
>
> Anand,
>
>
>
> AFAIK, you will need to change two settings:
>
>
>
> spark.streaming.unpersist = false // in order for SStreaming to not drop
> the raw RDD data
>
> spark.cleaner.ttl = <some reasonable value in seconds>
>
>
>
> Also be aware that the lineage of your union RDD will grow with each batch
> interval. You will need to break lineage often with cache(), and rely on
> the ttl for clean up.
>
> You will probably be in some tricky ground with this approach.
>
>
>
> A more reliable way would be to do dstream.window(...) for the length of
> time you want to keep the data and then union that data with your RDD for
> further processing using transform.
>
> Something like:
>
> dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union
> otherRdd)...
>
>
>
> If you need an unbound amount of dstream batch intervals, considering
> writing the data to secondary storage instead.
>
>
>
> -kr, Gerard.
>
>
>
>
>
>
>
> On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya <an...@gmail.com> wrote:
>
> Hi,
>
>
>
> Suppose I have an RDD that is loaded from some file and then I also have a
> DStream that has data coming from some stream. I want to keep union some of
> the tuples from the DStream into my RDD. For this I can use something like
> this:
>
>
>
>   var myRDD: RDD[(String, Long)] = sc.fromText...
>
>   dstream.foreachRDD{ rdd =>
>
>     myRDD = myRDD.union(rdd.filter(myfilter))
>
>   }
>
>
>
> My questions is that for how long spark will keep RDDs underlying the
> dstream around? Is there some configuratoin knob that can control that?
>
>
>
> Regards,
>
> Anand
>
>
>
>
>

RE:

Posted by Evo Eftimov <ev...@isecc.com>.
Requirements – then see my abstracted interpretation – what else do you need in terms of Requirements …:

 

“Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep union some of the tuples from the DStream into my RDD. For this I can use something like this:”

A formal requirements spec derived from the above - I think the actual requirement here is picking up and adding Specific (filtered) Messages from EVERY DStream RDD  to the Batch RDD rather than “preserving” (on top of that all) messages from  sliding window and adding them to the Batch RDD. Such requiremet should be defined as the Frequency of Updates to the Batch RDD and what these updates are e.g. specific filtered messages and then using dstream.window() can be made equal to that frequency

Essentialy the update frequency can range from the filtered messages of Every Single DStream RDD to the filetered messages of a SLIDING WINDOW  

 

Secondly what do you call “mutable uniniong”

 

That was his initial code

 

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD{ rdd =>

    myRDD = myRDD.union(rdd.filter(myfilter))

  }

 

 

Here is how it looks when Persisting the result from evet union – supposed to produce NEW PERSTINET IMMUTABLE Batch RDD – why is that supposed to be less “stable/reliable” – what are the exact tectnical reasons for that 

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD{ rdd =>

    myRDD = myRDD.union(rdd.filter(myfilter)).cashe()

  }

 

 

 

 

From: Gerard Maas [mailto:gerard.maas@gmail.com] 
Sent: Tuesday, July 7, 2015 1:55 PM
To: Evo Eftimov
Cc: Anand Nalya; spark users
Subject: Re:

 

Evo,

 

I'd let the OP clarify the question. I'm not in position of clarifying his requirements beyond what's written on the question.

 

Regarding window vs mutable union: window is a well-supported feature that accumulates messages over time. The mutable unioning of RDDs is bound to operational trouble as there're no warranties tied to data preservation and it's unclear how one can produce 'cuts' of that union ready to be served for some process/computation.  Intuitively, it will 'explode' at some point.

 

-kr, Gerard.

 

 

 

On Tue, Jul 7, 2015 at 2:06 PM, Evo Eftimov <ev...@isecc.com> wrote:

spark.streaming.unpersist = false // in order for SStreaming to not drop the raw RDD data

spark.cleaner.ttl = <some reasonable value in seconds>

 

why is the above suggested provided the persist/vache operation on the constantly unioniuzed Batch RDD will have to be invoked anyway (after every union with DStream RDD), besides it will result in DStraeam RDDs accumulating in RAM unncesesarily for the duration of TTL  

 

re 

 

“A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform.”

 

I think the actual requirement here is picking up and adding Specific Messages from EVERY DStream RDD  to the Batch RDD rather than “preserving” messages from specific  sliding window and adding them to the Batch RDD

 

This should be defined as the Frequency of Updates to the Batch RDD and then using dstream.window() equal to that frequency 

 

Can you also elaborate why you consider the dstream.window  approach more “reliable”

 

From: Gerard Maas [mailto:gerard.maas@gmail.com] 
Sent: Tuesday, July 7, 2015 12:56 PM
To: Anand Nalya
Cc: spark users
Subject: Re:

 

Anand,

 

AFAIK, you will need to change two settings:

 

spark.streaming.unpersist = false // in order for SStreaming to not drop the raw RDD data

spark.cleaner.ttl = <some reasonable value in seconds>

 

Also be aware that the lineage of your union RDD will grow with each batch interval. You will need to break lineage often with cache(), and rely on the ttl for clean up.

You will probably be in some tricky ground with this approach.

 

A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform.

Something like:

dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union otherRdd)...

 

If you need an unbound amount of dstream batch intervals, considering writing the data to secondary storage instead.

 

-kr, Gerard.

 

 

 

On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya <an...@gmail.com> wrote:

Hi,

 

Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep union some of the tuples from the DStream into my RDD. For this I can use something like this:

 

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD{ rdd =>

    myRDD = myRDD.union(rdd.filter(myfilter))

  }

 

My questions is that for how long spark will keep RDDs underlying the dstream around? Is there some configuratoin knob that can control that?

 

Regards,

Anand

 

 


Re:

Posted by Gerard Maas <ge...@gmail.com>.
Evo,

I'd let the OP clarify the question. I'm not in position of clarifying his
requirements beyond what's written on the question.

Regarding window vs mutable union: window is a well-supported feature that
accumulates messages over time. The mutable unioning of RDDs is bound to
operational trouble as there're no warranties tied to data preservation and
it's unclear how one can produce 'cuts' of that union ready to be served
for some process/computation.  Intuitively, it will 'explode' at some point.

-kr, Gerard.



On Tue, Jul 7, 2015 at 2:06 PM, Evo Eftimov <ev...@isecc.com> wrote:

> spark.streaming.unpersist = false // in order for SStreaming to not drop
> the raw RDD data
>
> spark.cleaner.ttl = <some reasonable value in seconds>
>
>
>
> why is the above suggested provided the persist/vache operation on the
> constantly unioniuzed Batch RDD will have to be invoked anyway (after every
> union with DStream RDD), besides it will result in DStraeam RDDs
> accumulating in RAM unncesesarily for the duration of TTL
>
>
>
> re
>
>
>
> “A more reliable way would be to do dstream.window(...) for the length of
> time you want to keep the data and then union that data with your RDD for
> further processing using transform.”
>
>
>
> I think the actual requirement here is picking up and adding Specific
> Messages from EVERY DStream RDD  to the Batch RDD rather than “preserving”
> messages from specific  sliding window and adding them to the Batch RDD
>
>
>
> This should be defined as the Frequency of Updates to the Batch RDD and
> then using dstream.window() equal to that frequency
>
>
>
> Can you also elaborate why you consider the dstream.window  approach more
> “reliable”
>
>
>
> *From:* Gerard Maas [mailto:gerard.maas@gmail.com]
> *Sent:* Tuesday, July 7, 2015 12:56 PM
> *To:* Anand Nalya
> *Cc:* spark users
> *Subject:* Re:
>
>
>
> Anand,
>
>
>
> AFAIK, you will need to change two settings:
>
>
>
> spark.streaming.unpersist = false // in order for SStreaming to not drop
> the raw RDD data
>
> spark.cleaner.ttl = <some reasonable value in seconds>
>
>
>
> Also be aware that the lineage of your union RDD will grow with each batch
> interval. You will need to break lineage often with cache(), and rely on
> the ttl for clean up.
>
> You will probably be in some tricky ground with this approach.
>
>
>
> A more reliable way would be to do dstream.window(...) for the length of
> time you want to keep the data and then union that data with your RDD for
> further processing using transform.
>
> Something like:
>
> dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union
> otherRdd)...
>
>
>
> If you need an unbound amount of dstream batch intervals, considering
> writing the data to secondary storage instead.
>
>
>
> -kr, Gerard.
>
>
>
>
>
>
>
> On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya <an...@gmail.com> wrote:
>
> Hi,
>
>
>
> Suppose I have an RDD that is loaded from some file and then I also have a
> DStream that has data coming from some stream. I want to keep union some of
> the tuples from the DStream into my RDD. For this I can use something like
> this:
>
>
>
>   var myRDD: RDD[(String, Long)] = sc.fromText...
>
>   dstream.foreachRDD{ rdd =>
>
>     myRDD = myRDD.union(rdd.filter(myfilter))
>
>   }
>
>
>
> My questions is that for how long spark will keep RDDs underlying the
> dstream around? Is there some configuratoin knob that can control that?
>
>
>
> Regards,
>
> Anand
>
>
>

RE:

Posted by Evo Eftimov <ev...@isecc.com>.
spark.streaming.unpersist = false // in order for SStreaming to not drop the raw RDD data

spark.cleaner.ttl = <some reasonable value in seconds>

 

why is the above suggested provided the persist/vache operation on the constantly unioniuzed Batch RDD will have to be invoked anyway (after every union with DStream RDD), besides it will result in DStraeam RDDs accumulating in RAM unncesesarily for the duration of TTL  

 

re 

 

“A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform.”

 

I think the actual requirement here is picking up and adding Specific Messages from EVERY DStream RDD  to the Batch RDD rather than “preserving” messages from specific  sliding window and adding them to the Batch RDD

 

This should be defined as the Frequency of Updates to the Batch RDD and then using dstream.window() equal to that frequency 

 

Can you also elaborate why you consider the dstream.window  approach more “reliable”

 

From: Gerard Maas [mailto:gerard.maas@gmail.com] 
Sent: Tuesday, July 7, 2015 12:56 PM
To: Anand Nalya
Cc: spark users
Subject: Re:

 

Anand,

 

AFAIK, you will need to change two settings:

 

spark.streaming.unpersist = false // in order for SStreaming to not drop the raw RDD data

spark.cleaner.ttl = <some reasonable value in seconds>

 

Also be aware that the lineage of your union RDD will grow with each batch interval. You will need to break lineage often with cache(), and rely on the ttl for clean up.

You will probably be in some tricky ground with this approach.

 

A more reliable way would be to do dstream.window(...) for the length of time you want to keep the data and then union that data with your RDD for further processing using transform.

Something like:

dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union otherRdd)...

 

If you need an unbound amount of dstream batch intervals, considering writing the data to secondary storage instead.

 

-kr, Gerard.

 

 

 

On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya <an...@gmail.com> wrote:

Hi,

 

Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep union some of the tuples from the DStream into my RDD. For this I can use something like this:

 

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD{ rdd =>

    myRDD = myRDD.union(rdd.filter(myfilter))

  }

 

My questions is that for how long spark will keep RDDs underlying the dstream around? Is there some configuratoin knob that can control that?

 

Regards,

Anand

 


Re:

Posted by Gerard Maas <ge...@gmail.com>.
Anand,

AFAIK, you will need to change two settings:

spark.streaming.unpersist = false // in order for SStreaming to not drop
the raw RDD data
spark.cleaner.ttl = <some reasonable value in seconds>

Also be aware that the lineage of your union RDD will grow with each batch
interval. You will need to break lineage often with cache(), and rely on
the ttl for clean up.
You will probably be in some tricky ground with this approach.

A more reliable way would be to do dstream.window(...) for the length of
time you want to keep the data and then union that data with your RDD for
further processing using transform.
Something like:
dstream.window(Seconds(900), Seconds(900)).transform(rdd => rdd union
otherRdd)...

If you need an unbound amount of dstream batch intervals, considering
writing the data to secondary storage instead.

-kr, Gerard.



On Tue, Jul 7, 2015 at 1:34 PM, Anand Nalya <an...@gmail.com> wrote:

> Hi,
>
> Suppose I have an RDD that is loaded from some file and then I also have a
> DStream that has data coming from some stream. I want to keep union some of
> the tuples from the DStream into my RDD. For this I can use something like
> this:
>
>   var myRDD: RDD[(String, Long)] = sc.fromText...
>   dstream.foreachRDD{ rdd =>
>     myRDD = myRDD.union(rdd.filter(myfilter))
>   }
>
> My questions is that for how long spark will keep RDDs underlying the
> dstream around? Is there some configuratoin knob that can control that?
>
> Regards,
> Anand
>

RE:

Posted by Evo Eftimov <ev...@isecc.com>.
The “RDD” aka Batch RDD which you load from file, will be kept for as long as the Spark Framework is instantiated / running – you can also ensure it is flagged explicitly as Persisted e.g. In Memory and/or disk

 

From: Anand Nalya [mailto:anand.nalya@gmail.com] 
Sent: Tuesday, July 7, 2015 12:34 PM
To: user@spark.apache.org
Subject: 

 

Hi,

 

Suppose I have an RDD that is loaded from some file and then I also have a DStream that has data coming from some stream. I want to keep union some of the tuples from the DStream into my RDD. For this I can use something like this:

 

  var myRDD: RDD[(String, Long)] = sc.fromText...

  dstream.foreachRDD{ rdd =>

    myRDD = myRDD.union(rdd.filter(myfilter))

  }

 

My questions is that for how long spark will keep RDDs underlying the dstream around? Is there some configuratoin knob that can control that?

 

Regards,

Anand