Posted to user@spark.apache.org by Tobias Pfeiffer <tg...@preferred.jp> on 2015/07/03 06:05:29 UTC

Re: reduceByKeyAndWindow, but using log timestamps instead of clock seconds

Hi,

On Thu, Jan 29, 2015 at 9:52 AM, Tobias Pfeiffer <tg...@preferred.jp> wrote:

> Hi,
>
> On Thu, Jan 29, 2015 at 1:54 AM, YaoPau <jo...@gmail.com> wrote:
>>
>> My thinking is to maintain state in an RDD and update it and persist it
>> with each 2-second pass, but this also seems like it could get messy.  Any
>> thoughts or examples that might help me?
>>
>
> I have just implemented some timestamp-based windowing on DStreams (can't
> share the code now, but it will be published in a couple of months),
> although with the assumption that items are in correct order. The main
> challenge (rather technical) was to keep proper state across RDD boundaries
> and to tell the state "you can mark this partial window from the last
> interval as 'complete' now" without shuffling too much data around. For
> example, if there are some empty intervals, you don't know when the next
> item to go into the partial window will arrive, or if there will be one at
> all. I guess if you want to have out-of-order tolerance, that will become
> even trickier, as you need to define and think about some timeout for
> partial windows in your state...
>

Sorry, it took ages to get the code published, and I am not involved in
that project any more and cannot provide a lot of support, but if you are
interested in the code, here it is:

https://github.com/jubatus/jubaql-server/blob/master/processor/src/main/scala/us/jubat/jubaql_server/processor/SlidingWindow.scala

The usage can be seen in

https://github.com/jubatus/jubaql-server/blob/master/processor/src/test/scala/us/jubat/jubaql_server/processor/SlidingStreamSpec.scala

and it basically boils down to

  val inputStream: DStream[(Long, T)] =
    ssc.queueStream(itemsQueue, oneAtATime = true)
  val windowStream: DStream[(Long, (Long, T))] =
    SlidingWindow.byTimestamp(inputStream, length, step)

where the first Long in the result is a window ID and the second Long is
the original timestamp, so now you can use any of the groupByKey,
reduceByKey etc. functions.
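
To make the (window ID, (timestamp, item)) shape concrete, here is a minimal
plain-Scala sketch that runs on ordinary collections, without Spark. It is
not the code from SlidingWindow.scala: the object TimestampWindows, its
methods, and the numbering scheme (window k covers timestamps from k * step
up to, but excluding, k * step + length, with IDs starting at 0) are
assumptions made up for illustration. It also ignores the hard part described
above, namely keeping partial windows as state across RDD boundaries.

```scala
object TimestampWindows {
  // Hypothetical helper, not part of the linked project.
  // Assumed scheme: window k covers [k * step, k * step + length), k >= 0.
  // Returns the IDs of all windows that contain the timestamp ts.
  def windowIds(ts: Long, length: Long, step: Long): Seq[Long] = {
    require(length > 0 && step > 0, "length and step must be positive")
    // Smallest k with k * step + length > ts, clamped so that IDs start at 0.
    val first = math.max(0L, Math.floorDiv(ts - length, step) + 1)
    // Largest k with k * step <= ts.
    val last = Math.floorDiv(ts, step)
    first to last
  }

  // Tags each (timestamp, item) pair with every window ID it falls into,
  // giving the same element shape as windowStream: (windowId, (timestamp, item)).
  def assign[T](items: Seq[(Long, T)], length: Long, step: Long): Seq[(Long, (Long, T))] =
    for {
      (ts, value) <- items
      id <- windowIds(ts, length, step)
    } yield (id, (ts, value))

  // Collection-level analogue of a reduceByKey on the window ID.
  def sumByWindow(items: Seq[(Long, Int)], length: Long, step: Long): Map[Long, Int] =
    assign(items, length, step)
      .groupBy(_._1)
      .map { case (id, entries) => id -> entries.map(_._2._2).sum }
}
```

With length = 4 and step = 2, an item with timestamp 3 lands in windows 0 and
1, so a per-window reduction counts it once in each. On a real DStream the
same keying would be followed by reduceByKey or groupByKey instead of the
groupBy used here.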

Hope this helps someone,
Tobias