Posted to dev@spark.apache.org by "Zhang, Lubo" <lu...@intel.com> on 2017/07/26 08:54:05 UTC

Questions about Stateful Operations in SS

Hi all

I have a question about the stateful operations [map/flatMap]GroupsWithState in Structured Streaming. The issue is as follows.

Take the StructuredSessionization example. First I input two words, apache and spark, in batch 0, then input another word, Hadoop, in batch 1 once the timeout has elapsed (the timeout type here is ProcessingTimeTimeout). As expected, both apache and spark are output, since each group's state has timed out. But if the word I input in batch 1 is apache, which already existed in batch 0, the result shows that only spark has expired.

I dug into this and found that the code at https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L131 handles the group state update. It first processes the groups that have new data, and the update function sets the hasTimedOut flag to false for them, so a key that has already timed out is simply updated instead of being expired. I know the timeout function call will not occur until new data arrives to trigger a batch, but I am wondering why we don't process the timed-out keys first, so that the user-given function can retrieve the expired data from batch 0:

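def statefunc(key: K, values: Iterator[V], state: GroupState[S]): U = {
  if (state.hasTimedOut) {
    // Called because the key timed out: handle the expired state, then remove it.
    // TODO: process the expired state here
    state.remove()
    ???
  } else if (state.exists) {
    // Called with new data for a key that already has state.
    ???
  } else {
    // Called with new data for a key that has no state yet.
    ???
  }
}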


Thanks
Lubo
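
The ordering described in this message can be sketched with a small model that does not depend on Spark. This is only an illustration under simplifying assumptions: `TimeoutOrderSketch`, `KeyState`, and `runBatch` are hypothetical names, not Spark APIs, and the model always re-arms the deadline when a key receives data, whereas in Spark the user function decides that.

```scala
// Toy model of one micro-batch; none of these names are Spark APIs.
object TimeoutOrderSketch {
  // Per-key state: a running count plus a processing-time deadline in ms.
  final case class KeyState(count: Int, deadlineMs: Long)

  /** Runs one batch: keys with new data first, timed-out keys second.
    * Returns the new state store and the keys reported as expired. */
  def runBatch(
      store: Map[String, KeyState],
      input: Seq[String],
      nowMs: Long,
      timeoutMs: Long): (Map[String, KeyState], Seq[String]) = {
    // Step 1: update every key that received data and push its deadline out.
    val afterData = input.foldLeft(store) { (s, key) =>
      val count = s.get(key).map(_.count).getOrElse(0) + 1
      s.updated(key, KeyState(count, nowMs + timeoutMs))
    }
    // Step 2: only now expire the keys whose deadline has passed.
    val expired = afterData.collect {
      case (key, st) if st.deadlineMs <= nowMs => key
    }.toSeq.sorted
    (afterData -- expired, expired)
  }
}
```

With a 10-second timeout, feeding apache and spark at time 0 and then Hadoop at time 20000 reports both apache and spark as expired. Feeding apache again at time 20000 instead reports only spark, because apache's deadline was moved forward in step 1 before the expiry check ran.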


RE: Questions about Stateful Operations in SS

Posted by "Zhang, Lubo" <lu...@intel.com>.
Got you, thanks for your reply.

Best regards
Lubo

From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Thursday, July 27, 2017 3:08 AM
To: Zhang, Lubo <lu...@intel.com>
Cc: dev@spark.apache.org
Subject: Re: Questions about Stateful Operations in SS

Hello Lubo,

The idea of timeouts is to make a best-effort, last-resort attempt to process a key when it has not received data for a while. With a processing-time timeout of 1 minute, the system guarantees that the key will not time out unless at least 1 minute has passed. Defining precisely when the timeout is triggered is really hard for many reasons (distributed system, lack of precise clock synchronization, need for deterministic re-execution for fault tolerance, etc.). We made a design decision to process timed-out data after processing the data in a batch, but that choice should not affect your business logic if it is constructed the right way. So your business logic should set loosely defined timeout durations and not depend on the exact timing of when the timeouts are hit.

TD




Re: Questions about Stateful Operations in SS

Posted by Tathagata Das <ta...@gmail.com>.
Hello Lubo,

The idea of timeouts is to make a best-effort, last-resort attempt to
process a key when it has not received data for a while. With a
processing-time timeout of 1 minute, the system guarantees that the key
will not time out unless at least 1 minute has passed. Defining precisely
when the timeout is triggered is really hard for many reasons (distributed
system, lack of precise clock synchronization, need for deterministic
re-execution for fault tolerance, etc.). We made a design decision to
process timed-out data after processing the data in a batch, but that
choice should not affect your business logic if it is constructed the
right way. So your business logic should set loosely defined timeout
durations and not depend on the exact timing of when the timeouts are hit.

TD
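
The "at least" guarantee in this reply can be illustrated with a small Spark-free sketch. It assumes, as a simplification, that expiry is only checked when a batch runs. `TimeoutLowerBoundSketch` and `observedExpiry` are hypothetical names used for illustration, not Spark APIs.

```scala
// Toy model: a timeout is only observed when a batch runs at or after the deadline.
object TimeoutLowerBoundSketch {
  /** Returns the time of the first batch at which a key last seen at
    * `lastSeenMs` would be reported as timed out, or None if no batch
    * runs late enough. */
  def observedExpiry(
      lastSeenMs: Long,
      timeoutMs: Long,
      batchTimesMs: Seq[Long]): Option[Long] = {
    val deadlineMs = lastSeenMs + timeoutMs
    // The deadline is a lower bound: the first batch at or after it reports the expiry.
    batchTimesMs.sorted.find(_ >= deadlineMs)
  }
}
```

For a key last seen at time 0 with a 60000 ms timeout and batches at 30000, 59000, and 95000 ms, the expiry is observed at 95000 ms, well after the nominal deadline. This is why the timeout duration is best treated as a loose lower bound in business logic.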
