Posted to user@spark.apache.org by Justin Grimes <jg...@adzerk.com> on 2015/08/20 18:58:29 UTC

Windowed stream operations -- These are too lazy for some use cases

We are aggregating real time logs of events, and want to do windows of 30
minutes. However, since the computation doesn't start until 30 minutes have
passed, there is a ton of data built up that processing could've already
started on. When it comes time to actually process the data, there is too
much for our cluster to handle at once.

The basic idea is this:

 val mergedMain = mergedStream
      .flatMap(r => ....) // denormalize data for this particular output stream
      .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
      .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 1800000, 1800000) // sum over the windows
      .map(rec => ...) // convert data to other format
      .foreachRDD{ (rdd, time) =>
        rdd.saveAsTextFile(...) // save to text files
      }

I would want the batches to be reduced as soon as they arrive (the first
reduceByKey), since there isn't any reason to wait. Instead all of the
unprocessed data has to be processed at the same time (this data is being
heavily denormalized in some cases, and so generates a bunch of additional
data).

Thanks for any help.
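
To make the request above concrete, here is a small plain-Scala sketch (no Spark involved) of why reducing each batch on arrival loses nothing: merging the small per-batch results gives the same window total as reducing all raw records at once. The body of sumAggregates is not shown in the post, so the element-wise sum below is only an assumption, and reduceBatch, mergeWindow and the sample data are hypothetical names made up for this illustration.

```scala
object PreAggregationSketch {
  // Assumed stand-in for sumAggregates (the post does not show its body):
  // element-wise sum of two equally sized counter arrays.
  def sumAggregates(x: Array[Int], y: Array[Int]): Array[Int] =
    x.zip(y).map { case (a, b) => a + b }

  // Hypothetical helper: reduce one batch of (key, counters) records
  // down to a single record per key.
  def reduceBatch(batch: Seq[(String, Array[Int])]): Map[String, Array[Int]] =
    batch.groupBy(_._1).map { case (key, recs) =>
      key -> recs.map(_._2).reduce((x, y) => sumAggregates(x, y))
    }

  // Hypothetical helper: merge already-reduced batches into a window total.
  def mergeWindow(partials: Seq[Map[String, Array[Int]]]): Map[String, Array[Int]] =
    reduceBatch(partials.flatMap(_.toSeq))

  def main(args: Array[String]): Unit = {
    val batch1 = Seq("a" -> Array(1, 0), "b" -> Array(0, 2), "a" -> Array(3, 1))
    val batch2 = Seq("a" -> Array(1, 1), "c" -> Array(5, 5))

    // Eager: each batch is reduced when it arrives, then the small results are merged.
    val eager = mergeWindow(Seq(reduceBatch(batch1), reduceBatch(batch2)))
    // Lazy: everything is reduced in one go when the window closes.
    val allAtOnce = reduceBatch(batch1 ++ batch2)

    val same = eager.keySet == allAtOnce.keySet &&
      eager.forall { case (k, v) => v.sameElements(allAtOnce(k)) }
    println(s"eager and all-at-once totals agree: $same")
  }
}
```

This only holds because the assumed reduce function is associative and commutative; the eager version just spreads the same work across the batch intervals instead of leaving it all for the end of the window.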

Re: Windowed stream operations -- These are too lazy for some use cases

Posted by Justin Grimes <jg...@adzerk.com>.
I tried something like that. When I tried just doing count() on the
DStream, it didn't seem to actually force the computation.

What (sort of) worked was doing a foreachRDD(rdd => rdd.count()), or
doing a print() on the DStream. The only problem was that this seemed to
add a lot of processing overhead -- I couldn't figure out exactly why, but
it seemed to have something to do with foreachRDD only being executed on
the driver.
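
For reference, a minimal sketch of the variant described above that (sort of) worked. In the Scala API, count() on a DStream is a transformation that returns another DStream, so it registers no output operation on its own, while foreachRDD and print() do. The function passed to foreachRDD runs on the driver, but an action such as rdd.count() inside it still launches a job on the cluster. Everything else here is an assumption for illustration: the input type, the element-wise sumAggregates, the object and method names, and the output path. Minutes(30) stands in for the 1800000 literals because, as far as I know, the Scala reduceByKeyAndWindow expects Duration values.

```scala
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.DStream

object EagerWindowSketch {
  // Assumed stand-in for sumAggregates (not shown in the thread): element-wise sum.
  def sumAggregates(x: Array[Int], y: Array[Int]): Array[Int] =
    x.zip(y).map { case (a, b) => a + b }

  // `events` is a hypothetical, already denormalized stream of (key, counters).
  def wire(events: DStream[(String, Array[Int])]): Unit = {
    // Per-batch reduction, persisted so the windowed step can reuse it.
    val perBatch = events
      .reduceByKey((x: Array[Int], y: Array[Int]) => sumAggregates(x, y))
      .persist()

    // Output operation: schedules a job every batch interval, which
    // materializes the persisted per-batch RDD instead of waiting for the window.
    perBatch.foreachRDD { rdd =>
      rdd.count()
      ()
    }

    // Sum the per-batch results over 30-minute windows and save each window.
    perBatch
      .reduceByKeyAndWindow(
        (x: Array[Int], y: Array[Int]) => sumAggregates(x, y),
        Minutes(30), Minutes(30))
      .foreachRDD { (rdd, time) =>
        // Hypothetical output location; the thread elides the real one.
        rdd.saveAsTextFile(s"/tmp/windowed-output-${time.milliseconds}")
      }
  }
}
```

Whether the extra per-batch job costs more than it saves likely depends on the batch interval and on how large the persisted RDDs are, which the thread does not say.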

On Thu, Aug 20, 2015 at 1:39 PM, Iulian Dragoș <iu...@typesafe.com>
wrote:

> On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes <jg...@adzerk.com> wrote:
>
>> We are aggregating real time logs of events, and want to do windows of 30
>> minutes. However, since the computation doesn't start until 30 minutes have
>> passed, there is a ton of data built up that processing could've already
>> started on. When it comes time to actually process the data, there is too
>> much for our cluster to handle at once.
>>
>> The basic idea is this:
>>
>>  val mergedMain = mergedStream
>>       .flatMap(r => ....) // denormalize data for this particular output stream
>>       .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
>>
> Could you add a dummy action at this point?
>
> val firstStep = mergedStream
>       .flatMap(r => ....) // denormalize data for this particular output stream
>       .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
>       .persist() // this will be reused in windowing operations
>
> firstStep.count() // just to trigger computation
>
> firstStep
>       .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 1800000, 1800000) // sum over the windows
>       .map(rec => ...) // convert data to other format
>       .foreachRDD{ (rdd, time) =>
>         rdd.saveAsTextFile(...) // save to text files
>       }

Re: Windowed stream operations -- These are too lazy for some use cases

Posted by Iulian Dragoș <iu...@typesafe.com>.
On Thu, Aug 20, 2015 at 6:58 PM, Justin Grimes <jg...@adzerk.com> wrote:

> We are aggregating real time logs of events, and want to do windows of 30
> minutes. However, since the computation doesn't start until 30 minutes have
> passed, there is a ton of data built up that processing could've already
> started on. When it comes time to actually process the data, there is too
> much for our cluster to handle at once.
>
> The basic idea is this:
>
>  val mergedMain = mergedStream
>       .flatMap(r => ....) // denormalize data for this particular output stream
>       .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
>
Could you add a dummy action at this point?

val firstStep = mergedStream
      .flatMap(r => ....) // denormalize data for this particular output stream
      .reduceByKey((x:Array[Int],y:Array[Int]) => sumAggregates(x,y)) // this would sum over the batches
      .persist() // this will be reused in windowing operations

firstStep.count() // just to trigger computation

firstStep
      .reduceByKeyAndWindow((x:Array[Int],y:Array[Int]) => sumAggregates(x,y), 1800000, 1800000) // sum over the windows
      .map(rec => ...) // convert data to other format
      .foreachRDD{ (rdd, time) =>
        rdd.saveAsTextFile(...) // save to text files
      }

--
Iulian Dragos

------
Reactive Apps on the JVM
www.typesafe.com