You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Imran Alam <im...@newscred.com> on 2015/07/10 11:07:30 UTC

reduceByKeyAndWindow with initial state

We have a streaming job that makes use of reduceByKeyAndWindow
<https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L334-L341>.
We want this to work with an initial state. The idea is to avoid losing
state if the streaming job is restarted, also to take historical data into
account for the windows. But reduceByKeyAndWindow doesn't accept any
"initialRDD" parameter like updateStateByKey
<https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445>
does.

The plan is to extend reduceByKeyAndWindow to accept an "initalRDDs"
parameter, so that the DStream starts with those RDDs as initial value of
"generatedRDD" rather than an empty map. But the "generatedRDD" is a
private variable, so I'm bit confused on how to proceed with the plan.

Re: reduceByKeyAndWindow with initial state

Posted by Imran Alam <im...@newscred.com>.
I'm talking about the variant with inverseReduce. For example, if my batch
duration is 1s and window duration is 10s, when I start the streaming job
I'd want to start with a complete window instead of empty window, given I
already have the RDDs for the batches that are missing at startup. After
1s, I'd want the oldest batch to be dropped off, and the inverse reduce
being applied to all RDDs as usual.

On Sat, Jul 11, 2015 at 6:50 AM, Tathagata Das <td...@databricks.com> wrote:

> Are you talking about reduceByKeyAndWindow with or without inverse reduce?
>
> TD
>
> On Fri, Jul 10, 2015 at 2:07 AM, Imran Alam <im...@newscred.com> wrote:
>
>> We have a streaming job that makes use of reduceByKeyAndWindow
>> <https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L334-L341>.
>> We want this to work with an initial state. The idea is to avoid losing
>> state if the streaming job is restarted, also to take historical data into
>> account for the windows. But reduceByKeyAndWindow doesn't accept any
>> "initialRDD" parameter like updateStateByKey
>> <https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445>
>> does.
>>
>> The plan is to extend reduceByKeyAndWindow to accept an "initalRDDs"
>> parameter, so that the DStream starts with those RDDs as initial value of
>> "generatedRDD" rather than an empty map. But the "generatedRDD" is a
>> private variable, so I'm bit confused on how to proceed with the plan.
>>
>>
>

Re: reduceByKeyAndWindow with initial state

Posted by Tathagata Das <td...@databricks.com>.
Are you talking about reduceByKeyAndWindow with or without inverse reduce?

TD

On Fri, Jul 10, 2015 at 2:07 AM, Imran Alam <im...@newscred.com> wrote:

> We have a streaming job that makes use of reduceByKeyAndWindow
> <https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L334-L341>.
> We want this to work with an initial state. The idea is to avoid losing
> state if the streaming job is restarted, also to take historical data into
> account for the windows. But reduceByKeyAndWindow doesn't accept any
> "initialRDD" parameter like updateStateByKey
> <https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445>
> does.
>
> The plan is to extend reduceByKeyAndWindow to accept an "initalRDDs"
> parameter, so that the DStream starts with those RDDs as initial value of
> "generatedRDD" rather than an empty map. But the "generatedRDD" is a
> private variable, so I'm bit confused on how to proceed with the plan.
>
>