Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2014/11/14 01:22:33 UTC

[jira] [Commented] (SPARK-4392) Event proration based on event timestamps

    [ https://issues.apache.org/jira/browse/SPARK-4392?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14211567#comment-14211567 ] 

Apache Spark commented on SPARK-4392:
-------------------------------------

User 'bijaybisht' has created a pull request for this issue:
https://github.com/apache/spark/pull/2633

> Event proration based on event timestamps
> -----------------------------------------
>
>                 Key: SPARK-4392
>                 URL: https://issues.apache.org/jira/browse/SPARK-4392
>             Project: Spark
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Bijay Singh Bisht
>
> Topic: Spark Streaming using Event TimeStamps
> Spark Streaming creates a batch (RDD) of events every T duration. The batch is based on a schedule, and the timestamp associated with the batch is the time at which it was scheduled by Spark. The timestamp applied by Spark may be less relevant than the timestamp for which the event was originally intended.
> The fundamental reasons for the event timestamp to differ from the Spark-applied timestamp are the delay in event generation in the upstream system and the delay in transporting the event to Spark after it is generated. The problem is compounded for events that have both a start and an end, with both timestamps packed into a single record that is generated only after the event ends, as illustrated in the following diagram.
> (upstream) -------s--------e----g--------------->
> (spark)    ----------------------------r-------->
> The horizontal axis is time. The event starts at s, ends at e, and the event record is generated at g, which is then received by Spark at r.
> So there is a need to create batches which contain only the relevant events, or the relevant proportion of the events, according to the original timestamps passed to Spark as part of the received tuples. Let's refer to a batch which holds only events occurring in the time window it represents as a bin. So a bin spanning T1 - T2 will 'only have events' which occurred in the period T1 - T2. The definition of the bin can be extended to include 'all the events' which occurred in a given period; however, this second constraint is harder to satisfy in practice, because events can be arbitrarily delayed.
> For the rest of the discussion the definition of the batch and the bin shall be as per the previous paragraph.
> Bin size determines the time-series granularity and is an independent consideration in itself, i.e. independent of the batch size, the events, and the delay.
> Let's say that the batch size is T, the bin size is n*T, and an event is delayed (for reception) by at most d*T. So in order to generate a complete bin, n + d batches of size T are required.
> Conversely, every batch contributes to the bins from the current one back to the last ceiling((n + d) / n) bins.
> For the batch @ t, the contents can be seen as T1 @ t (where the notation T1 @ t means the events corresponding to bin T1 carried by batch t), T1 - 1 @ t, T1 - 2 @ t ... T1 - m @ t (where T1 - 1 represents the bin previous to T1 and m = ceiling((n + d) / n)).
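> As a worked example (illustrative numbers only, not part of the proposal): with a bin of n = 3 batches and a maximum delay of d = 2 batches, a bin is final only after n + d = 5 batches, and each batch contributes to at most m = ceiling(5 / 3) = 2 bins:
> // Illustrative arithmetic only; n and d are assumed values.
> val n = 3                                      // bin spans 3 batches
> val d = 2                                      // events may arrive up to 2 batches late
> val batchesPerBin = n + d                      // 5 batches must be observed before a bin is final
> val m = math.ceil((n + d).toDouble / n).toInt  // 2: each batch feeds at most 2 bins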
> We can then de-multiplex the contributions from batch @ t into bins T1, T1 - 1, T1 - 2, T1 - 3, resulting in streams which represent partial bins relative to the batch stream. So stream i represents the partial bin T1 - i received at t. This way the Spark application can deliver incremental bins downstream in as close to real time as possible. Depending on how the downstream application can handle partial bins, the definition and generation of the streams need to be controlled.
> Cases:
> Case 1: The downstream application can handle incremental updates to the bin (i.e. partial bin = current partial bin + latest partial bin). For this, what is required is m streams which send updates every interval T, where
> Stream 1: T1 @ t
> Stream 2: T1 - 1 @ t
> …
> Stream m: T1 - m @ t.
> Case 2: The downstream application can only handle full updates to the bin (i.e. partial bin = latest partial bin). For this, what is required is m streams which send updates every interval T, where
> Stream 1: T1 @ t
> Stream 2: T1 - 1 @ t + @ t - 1
> ...
> Stream m: T1 - m @ t + … + @ t - m
> i.e. a bin is updated every T until the bin is final. The first stream represents the most current bin with the latest cumulative update, the next stream represents the previous bin with the latest cumulative update, and so on, until the last stream, which represents a final bin.
> Case 3: The downstream application cannot handle updates to a bin at all. This is basically the last stream from case 2 (Stream m), with the exception that it slides by n*T and not T. Note that the next bin after T1 @ t is T1 + 1 @ t + n*T, because the size of the bin is n*T.
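> The shape of the case-3 stream could be approximated with Spark Streaming's existing window operator (a minimal sketch; the Event record, batchStream, and the numbers are assumed here, and the per-bin filtering/proration it would still need is described under Implementation below): the window covers the n bin batches plus the d delay batches, and it slides one full bin (n*T) at a time.
> import org.apache.spark.streaming.Seconds
> import org.apache.spark.streaming.dstream.DStream
>
> case class Event(start: Long, end: Long)   // assumed record carrying its own timestamps
> val batchSize = Seconds(10)                // T (illustrative)
> val n = 3; val d = 2                       // bin = n batches, maximum delay = d batches
> // batchStream is the ordinary batch stream received by Spark.
> def finalBinWindow(batchStream: DStream[Event]): DStream[Event] =
>   batchStream.window(batchSize * (n + d), batchSize * n)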
> Typically each stream needs to be treated similarly because it represents the same kind of content; however, there can be use cases where the streams may need to be treated differently. This is a consideration for the API.
> Implementation:
> In order to transform a batch stream into partial bin streams, we can filter the events and put the prorated events into bin streams representing T1 @ t, T1 - 1 @ t, and so on.
> For this we can define a new DStream type which generates its output by prorating the data from each batch into the bin corresponding to the stream.
> Use case 2 requires progressively accumulating all the events for a bin. For this, a new DStream is required which generates a pulsating window whose length grows from (s*n + 1) to (s*n + n) batches, where s is the partial stream index. A stream index of 0 implies the most current partial bin stream.
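> As a small sketch of the pulsating window's length (k is an assumed name for the batch's offset within the current bin, counted from 0):
> // Window length in batches for partial-stream index s at offset k within the bin.
> def pulsatingWindowLength(s: Int, k: Int, n: Int): Int = s * n + k + 1
> // For stream index s the window grows from s*n + 1 batches (k = 0) up to s*n + n batches
> // (k = n - 1), then snaps back to s*n + 1 when the next bin starts.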
> APIs
> BinStreamer[T](stream: DStream[T], start: T => Time, end: T => Time)
> This will return a BinStreamer object.
> The BinStreamer object can be used to generate incremental bin streams (case 1), a final bin stream (case 3), or updated bin streams (case 2) using the following APIs.
> BinStreamer.incrementalStreams(sizeInNumBatches: Int, delayIndex: Int, numStreams: Int) : Seq[BinStream[(T,percentage)]]
> BinStreamer.finalStream(sizeInNumBatches: Int, delayIndex: Int) : BinStream[(T,percentage)]
> BinStreamer.updatedStreams(sizeInNumBatches: Int, delayIndex: Int, numStreams: Int) : Seq[BinStream[(T,percentage)]]
> DStream[T] : This is the batch stream.
> start : Closure to get the start time from a record.
> end : Closure to get the end time from a record.
> sizeInNumBatches : The size of a bin as a multiple of the batch size.
> delayIndex : The maximum delay between event relevance and event reception (the d in d*T).
> numStreams : The number of bin streams. Even though this is fixed by the batch size, the bin size and the delayIndex, it is an optional parameter to control the number of output streams; it does so by delaying the most current bin.
> Each BinStream will wrap a DStream.
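> To make the proposed API concrete, a hypothetical usage sketch (the Flow record, its timestamp accessors, and flowStream are assumed; percentage is taken as a Double here; none of this exists in Spark today):
> case class Flow(startTime: Time, endTime: Time, bytes: Long)   // assumed record type
>
> // flowStream: DStream[Flow] is the ordinary batch stream.
> val binStreamer = BinStreamer[Flow](flowStream, _.startTime, _.endTime)
> // Case 1: incremental partial-bin streams, bin = 6 batches, maximum delay = 2 batches.
> val incremental: Seq[BinStream[(Flow, Double)]] =
>   binStreamer.incrementalStreams(sizeInNumBatches = 6, delayIndex = 2, numStreams = 2)
> // Case 3: a single stream of final bins.
> val finals: BinStream[(Flow, Double)] =
>   binStreamer.finalStream(sizeInNumBatches = 6, delayIndex = 2)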
> def prorate(binStart: Time, binEnd: Time)(x: T) = {
>   val sx = startFunc(x)
>   val ex = endFunc(x)
>   // Even though binStart is not inclusive, binStart here implies limit x as x approaches binStart+
>   // Clamp the event's interval to the bin boundaries.
>   val s = if (sx > binStart) sx else binStart
>   val e = if (ex < binEnd) ex else binEnd
>   if (ex == sx) {
>     // A zero-duration event lies entirely inside the bin, so it counts fully.
>     (x, 1.0)
>   }
>   else {
>     // Fraction of the event's duration that falls inside this bin.
>     (x, (e - s) / (ex - sx))
>   }
> }
> def filter(binStart: Time, binEnd: Time)(x: T) = {
>   // The flow starts in a subsequent bin
>   if (startFunc(x) > binEnd) false
>   // The flow ended in a prior bin
>   else if (endFunc(x) <= binStart) false
>   // start(x) is taken as approaching binEnd from the right, so the flow belongs to the next bin
>   else if (startFunc(x) == binEnd && endFunc(x) > binEnd) false
>   else true
> }
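> To show how the two helpers would combine per batch, a minimal sketch (reusing the assumed Flow record from the earlier sketch, with startFunc/endFunc in scope as above; transform is the existing DStream operation, and the bin boundaries are taken as fixed for one bin stream):
> // Build one partial-bin stream: keep only the events relevant to the bin and
> // attach to each event the prorated fraction that falls inside (binStart, binEnd].
> def partialBinStream(batches: DStream[Flow], binStart: Time, binEnd: Time): DStream[(Flow, Double)] =
>   batches.transform { rdd =>
>     rdd.filter(filter(binStart, binEnd)).map(prorate(binStart, binEnd))
>   }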



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org