You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by julyfire <he...@gmail.com> on 2014/09/09 08:41:44 UTC

Spark streaming: size of DStream

I want to implement the following logic:

val stream = getFlumeStream() // a DStream

if(size_of_stream > 0)  // if the DStream contains some RDD

  stream.someTransfromation

stream.count() can figure out the number of RDD in a DStream, but it return
a DStream[Long] and can't compare with a number.

does anyone know how to get the number of RDD in a DStream?

Thanks!



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark streaming: size of DStream

Posted by Sean Owen <so...@cloudera.com>.
How about calling foreachRDD, and processing whatever data is in each
RDD normally, and also keeping track within the foreachRDD function of
whether any RDD had a count() > 0? if not, then you can execute at the
end your alternate logic in the case of no data. I don't think you
want to operate at the DStream level.

On Tue, Sep 9, 2014 at 8:41 AM, julyfire <he...@gmail.com> wrote:
> Hi Jerry,
>
> Thanks for your reply.
> I use spark streaming to receive the flume stream, then I need to do a
> judgement, in each batchDuration, if the received stream has data, then I
> should do something, if no data, do the other thing. Then I thought the
> count() can give me the measure, but it returns a DStream, not a number.
> so is there a way to achieve this case?
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark streaming: size of DStream

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
If you take into account what streaming means in spark, your goal doesn't
really make sense; you have to assume that your streams are infinite and
you will have to process them till the end of the days. Operations on a
DStream define what you want to do with each element of each RDD, but spark
streaming is smart enough to not apply the transformations if RDD are empty.

The only time where you probably want to know the size of the RDD is when
you are going to perform a side-effect like storing something in a
database, using foreachRDD, i.e:

val flumeStream = ...

val transformedStream = flumeStream.map(... some transformation
...).flatMap(... some other transformation).distinct().....

transformedStream.foreachRDD { rdd =>
  if (rdd.count() != 0) {
    // perform some side effect that shouldn't be done if a transformed
batch is empty
  }
}

2014-09-09 9:20 GMT+01:00 julyfire <he...@gmail.com>:

> i'm sorry I have some error in my code, update here:
>
> var count = -1L // a global variable in the main object
>
> val currentBatch = some_DStream
> val countDStream = currentBatch.map(o=>{
>       count = 0L  // reset the count variable in each batch
>       o
>     })
> countDStream.foreachRDD(rdd=> count += rdd.count())
>
> if (count > 0) {
>   currentBatch.map(...).someOtherTransformation
> }
>
> two problems:
> 1. the variable count just go on accumulate and no reset in each batch
> 2. if(count > 0) only evaluate in the beginning of running the program, so
> the next statement will never run
>
> Can you all give me some suggestion? thanks
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

RE: Spark streaming: size of DStream

Posted by julyfire <he...@gmail.com>.
yes, I agree to directly transform on DStream even there is no data injected
in this batch duration.
while my situation is :
Spark receive flume stream continurously, and I use updateStateByKey
function to collect data for a key among several batches, then I will handle
the collected data after waiting a specified time (which I use a counter to
measure) since the first time no data updated in the updateStateByKey
operation.  Normally, when the waiting time is up, I should collected all
data for a key. But if the flume data source is broken for a while, and if
this interval is over the waiting time, then I will only get partial data
for a key. So I need a way to determine whether current flume stream batch
contains data, if no, it means the flume data source is broken, then I can
skip the updateStateByKey operation, till the flume data source is
reconnected, then the counter in the updateStateByKey function can count
again. In this way I could get the intack data.

another question, why the count variable in map cannot work but it effects
in the foreachRDD in my previous code?

thanks :P 



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13785.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Spark streaming: size of DStream

Posted by "Shao, Saisai" <sa...@intel.com>.
I think you should clarify some things in Spark Streaming:

1. closure in map is running in the remote side, so modify count var will only take effect in remote side. You will always get -1 in driver side.
2. some codes in closure in foreachRDD is lazily executed in each batch duration, while the if (...) code outside the closure is executed once immediately and will never executed again, so your code logic is wrong as expected.
3. I don't think you need to judge whether there is data feed in to do some transformations, you can directly transform on DStream even there is no data injected in this batch duration, it's only an empty transformation, no more specific overhead.

Thanks
Jerry

-----Original Message-----
From: julyfire [mailto:helloweibo@gmail.com] 
Sent: Tuesday, September 09, 2014 4:20 PM
To: user@spark.incubator.apache.org
Subject: RE: Spark streaming: size of DStream

i'm sorry I have some error in my code, update here:

var count = -1L // a global variable in the main object 

val currentBatch = some_DStream
val countDStream = currentBatch.map(o=>{ 
      count = 0L  // reset the count variable in each batch 
      o 
    })
countDStream.foreachRDD(rdd=> count += rdd.count())

if (count > 0) {
  currentBatch.map(...).someOtherTransformation
}

two problems:
1. the variable count just go on accumulate and no reset in each batch 2. if(count > 0) only evaluate in the beginning of running the program, so the next statement will never run

Can you all give me some suggestion? thanks





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For additional commands, e-mail: user-help@spark.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Spark streaming: size of DStream

Posted by julyfire <he...@gmail.com>.
i'm sorry I have some error in my code, update here:

var count = -1L // a global variable in the main object 

val currentBatch = some_DStream 
val countDStream = currentBatch.map(o=>{ 
      count = 0L  // reset the count variable in each batch 
      o 
    }) 
countDStream.foreachRDD(rdd=> count += rdd.count())

if (count > 0) {
  currentBatch.map(...).someOtherTransformation
}

two problems:
1. the variable count just go on accumulate and no reset in each batch
2. if(count > 0) only evaluate in the beginning of running the program, so
the next statement will never run

Can you all give me some suggestion? thanks





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Spark streaming: size of DStream

Posted by julyfire <he...@gmail.com>.
Thanks all,

yes, i did using foreachRDD, the following is my code:

var count = -1L // a global variable in the main object

val currentBatch = some_DStream
val countDStream = currentBatch.map(o=>{
      *count = 0L  *// reset the count variable in each batch
      o
    })
countDStream.foreachRDD(rdd=>{println(s); s += rdd.count()})

the variable count stores the number of records of each batch, but it can't
be reset to 0.
I mean this statement *count = 0L *does not work. is my code right?



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13778.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Spark streaming: size of DStream

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi,

I think all the received stream will generate a RDD in each batch duration even there is no data feed in (an empty RDD will be generated). So you cannot use number of RDD to judge whether there is any data received.

One way is to do this in DStream/foreachRDD(), like

a.foreachRDD { r =>
if (r.count() == 0) {
   do something
  } else {
   do some other things.
  }
}

You can try it.

Thanks
Jerry


-----Original Message-----
From: julyfire [mailto:helloweibo@gmail.com] 
Sent: Tuesday, September 09, 2014 3:42 PM
To: user@spark.incubator.apache.org
Subject: RE: Spark streaming: size of DStream

Hi Jerry,

Thanks for your reply.
I use spark streaming to receive the flume stream, then I need to do a judgement, in each batchDuration, if the received stream has data, then I should do something, if no data, do the other thing. Then I thought the
count() can give me the measure, but it returns a DStream, not a number.
so is there a way to achieve this case?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For additional commands, e-mail: user-help@spark.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Spark streaming: size of DStream

Posted by julyfire <he...@gmail.com>.
Hi Jerry,

Thanks for your reply.
I use spark streaming to receive the flume stream, then I need to do a
judgement, in each batchDuration, if the received stream has data, then I
should do something, if no data, do the other thing. Then I thought the
count() can give me the measure, but it returns a DStream, not a number.
so is there a way to achieve this case?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


RE: Spark streaming: size of DStream

Posted by "Shao, Saisai" <sa...@intel.com>.
Hi, 

Is there any specific scenario which needs to know the RDD numbers in the DStream? According to my knowledge DStream will generate one RDD in each right batchDuration, some old rdd will be remembered for windowing-like function, and will be removed when useless. The hashmap generatedRDDs in DStream.scala contains the rdd as you wanted, though you cannot call it from app. 

Besides the count() API returns the records number of this DStream's each RDD, not the number of RDD, the number of RDD should always be 1 as I understand.

Thanks
Jerry

-----Original Message-----
From: julyfire [mailto:helloweibo@gmail.com] 
Sent: Tuesday, September 09, 2014 2:42 PM
To: user@spark.incubator.apache.org
Subject: Spark streaming: size of DStream

I want to implement the following logic:

val stream = getFlumeStream() // a DStream

if(size_of_stream > 0)  // if the DStream contains some RDD

  stream.someTransfromation

stream.count() can figure out the number of RDD in a DStream, but it return a DStream[Long] and can't compare with a number.

does anyone know how to get the number of RDD in a DStream?

Thanks!



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org For additional commands, e-mail: user-help@spark.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org