Posted to issues@spark.apache.org by "Tathagata Das (JIRA)" <ji...@apache.org> on 2017/03/20 10:54:41 UTC

[jira] [Resolved] (SPARK-19838) Adding Processing Time based timeout

     [ https://issues.apache.org/jira/browse/SPARK-19838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tathagata Das resolved SPARK-19838.
-----------------------------------
       Resolution: Fixed
    Fix Version/s: 2.2.0

https://github.com/apache/spark/pull/17179

> Adding Processing Time based timeout
> ------------------------------------
>
>                 Key: SPARK-19838
>                 URL: https://issues.apache.org/jira/browse/SPARK-19838
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>             Fix For: 2.2.0
>
>
> When a key receives no new data in mapGroupsWithState, the mapping function is never called for it. So we need a timeout feature that calls the function again in such cases, so that the user can decide whether to keep waiting or to clean up (remove the state, save data externally, etc.).
> Timeouts can be based on either processing time or event time. This JIRA covers processing time, but defines the high-level API design for both. The usage would look like this:
> {code}
> def stateFunction(key: K, values: Iterator[V], state: KeyedState[S]): U = {
>   ...
>   state.setTimeoutDuration(10000)
>   ...
> }
> dataset                        // type is Dataset[T]
>   .groupByKey[K](keyingFunc)   // generates KeyValueGroupedDataset[K, T]
>   .mapGroupsWithState[S, U](
>      func = stateFunction,
>      timeout = KeyedStateTimeout.withProcessingTime)  // returns Dataset[U]
> {code}
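The API above is a design proposal, so the example below does not call Spark. It is a minimal, self-contained sketch of what a user's stateFunction might do with a processing-time timeout: count events per key, re-arm the timeout on every call, and clean up when the key times out. ToyKeyedState is a hypothetical stand-in that only mimics the two methods named in this issue (setTimeoutDuration, isTimingOut); its update, remove and getOption methods, the SessionCounter object and the counting logic are all assumptions made for illustration.

```scala
// Hypothetical stand-in for the proposed KeyedState[S]; it is NOT Spark's class.
final class ToyKeyedState[S](initial: Option[S], val isTimingOut: Boolean) {
  private var value: Option[S] = initial
  private var timeoutMs: Option[Long] = None // no timeout until the function sets one

  def exists: Boolean = value.isDefined
  def getOption: Option[S] = value
  def update(newValue: S): Unit = { value = Some(newValue) }
  def remove(): Unit = { value = None }
  def setTimeoutDuration(durationMs: Long): Unit = { timeoutMs = Some(durationMs) }
  def timeoutDuration: Option[Long] = timeoutMs
}

// Hypothetical user function: counts values per key and cleans up on timeout.
object SessionCounter {
  def stateFunction(key: String, values: Iterator[Int], state: ToyKeyedState[Int]): String =
    if (state.isTimingOut) {
      // Called with no values: the key has been idle for the whole timeout duration.
      val total = state.getOption.getOrElse(0)
      state.remove() // clean up the state for this key
      s"$key timed out after $total events"
    } else {
      val total = state.getOption.getOrElse(0) + values.size
      state.update(total)
      state.setTimeoutDuration(10000) // must be set again on every call, per the design
      s"$key has $total events"
    }
}
```

In the proposed design the engine, not user code, creates the state object and decides when isTimingOut is true; the toy class only lets the function be exercised in isolation.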
> Note the following design aspects:
> - The timeout type is provided as a parameter of mapGroupsWithState and applies globally to all keys. This lets the planner know it at planning time and optimize execution accordingly, based on whether extra information (e.g. timeout durations or timestamps) must be saved in the state.
> - The exact timeout duration is set inside the function call so that it can be customized on a per-key basis.
> - When the timeout occurs for a key, the function is called with no values, and {{KeyedState.isTimingOut()}} returns {{true}}.
> - The timeout is reset for a key every time the function is called on that key, that is, when the key has new data or when it has timed out. So the user has to set the timeout duration every time the function is called; otherwise no timeout will be set.
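The engine-side behavior in these design points (a per-key duration set inside the function, a call with no values once the key times out, and the timeout being reset on every call) can be modeled with a small in-memory simulation. This is a toy sketch, not Spark's implementation: ToyProcessingTimeGroups, CallState and runBatch are hypothetical names, processing time is passed in explicitly as nowMs, and a key is assumed to time out at the first trigger where its deadline (time of its last call plus the duration) is less than or equal to nowMs and it has no new data.

```scala
import scala.collection.mutable

// Hypothetical per-call state handle, loosely mimicking the proposed KeyedState[S].
final class CallState[S](var value: Option[S], val isTimingOut: Boolean) {
  var timeoutMs: Option[Long] = None // unset at the start of every call
  def setTimeoutDuration(durationMs: Long): Unit = { timeoutMs = Some(durationMs) }
}

// Toy, single-process model of processing-time timeouts (not Spark's implementation).
// Assumption: a key times out at the first trigger where deadline <= nowMs and it has no new data.
final class ToyProcessingTimeGroups[K, V, S, U](func: (K, Iterator[V], CallState[S]) => U) {
  private val store = mutable.Map.empty[K, S]        // per-key state between triggers
  private val deadlines = mutable.Map.empty[K, Long] // per-key processing-time deadline

  /** Runs one trigger at processing time nowMs over this trigger's grouped data. */
  def runBatch(nowMs: Long, batch: Map[K, Seq[V]]): List[U] = {
    // 1. Keys with new data: normal call with isTimingOut = false.
    val withData = batch.toList.map { case (k, vs) => call(k, vs.iterator, nowMs, timingOut = false) }
    // 2. Idle keys whose deadline has passed: called once with no values, isTimingOut = true.
    val expired = deadlines.iterator.collect { case (k, d) if d <= nowMs && !batch.contains(k) => k }.toList
    withData ++ expired.map(k => call(k, Iterator.empty, nowMs, timingOut = true))
  }

  private def call(k: K, vs: Iterator[V], nowMs: Long, timingOut: Boolean): U = {
    val st = new CallState[S](store.get(k), timingOut)
    val out = func(k, vs, st)
    st.value match {
      case Some(s) => store(k) = s
      case None    => store.remove(k)
    }
    // Every call resets the timeout: only a duration set during this call is kept.
    st.timeoutMs match {
      case Some(ms) => deadlines(k) = nowMs + ms
      case None     => deadlines.remove(k)
    }
    out
  }
}
```

With a function that sets a 10000 ms timeout on every normal call, a key last seen at time 0 is left alone at 5000 and is called with no values at 10000; if that timeout call does not set a new duration, no further timeout is scheduled for the key.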



