Posted to user@spark.apache.org by daniel williams <da...@gmail.com> on 2018/01/12 22:36:27 UTC

flatMapGroupsWithState not timing out (spark 2.2.1)

Hi,

I’m attempting to leverage flatMapGroupsWithState to handle some arbitrary
aggregations and am noticing a couple of things:

   - *ProcessingTimeTimeout* + *setTimeoutDuration*: timeout not honored
   - *EventTimeTimeout* + watermark: timeout not honored
   - *EventTimeTimeout* + *setTimeoutTimestamp*: timeout not honored

I’ve come to this conclusion because a conditional check (with log output)
on the *hasTimedOut* property is never hit. Each of these scenarios was
tested in isolation, and all three exhibited the same behavior: no timeout
event is ever reached, and Spark introduces huge delays between batches.

The test was 2000 messages read from a Kafka topic with two distinct groups
(1000 messages / group).

To give an idea of what I’m attempting to do: aggregate all events into a
single bucket given some timeout expiry.

Also, it should be noted that in this example I’m attempting to get the
*final* value of the GroupState object once it has timed out. This is why I
attempt a second pass on the timeout, but that doesn’t really matter since
I’m not even getting the timeout event.

My code is here:

    val stream = reader
      .load()
      .selectExpr(
        "CAST(key AS STRING)",
        "topic",
        "CAST(value AS BINARY)",
        "timestamp"
      )
      .as[KafkaLoadType]
      .map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
      .withWatermark("when", "10 seconds")
      .groupByKey(f => (f.name, f.when))
      .flatMapGroupsWithState[SessionInfo, Result](
        OutputMode.Append, GroupStateTimeout.EventTimeTimeout()) {
      case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) => {

        state.setTimeoutTimestamp(DateTime.now.plusMinutes(1).getMillis)

        info("Starting flatMapGroupsWithState func")

        val asList = events.toList

        info(s"${name} iterator size: ${asList.size}")

        if (state.exists) {
          info(s"State exists: ${state.get}")
        }

        var session = state.getOption.getOrElse(SessionInfo.zero(when, name))

        asList.foreach(e => {
          session = session.add(e.value)
        })

        info(s"Updating value to ${session}")

        state.update(session)

        val result = if (state.hasTimedOut && !state.get.finalized) {
          info("State has timedout ... finalizing")

          state.update(state.get.copy(finalized = true))

          Iterator(Option(state.get).map(r => Result(r.when, r.name, r.value)).get)
        } else if (state.hasTimedOut && state.get.finalized) {
          info("State has timedout AND is finalized")

          val r = state.get

          state.remove()

          Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
        } else {
          val result = state.get

          info(s"Returning ${result}")

          //          state.remove()

          Iterator(Option(result).map(r => Result(r.when, r.name, r.value)).get)
        }

        info("Exiting flatMapGroupsWithState func")

        result
      }
    }.writeStream.trigger(Trigger.ProcessingTime(500))
      .format("console").option("truncate", false)
      .outputMode(OutputMode.Append)
      .start()

Thanks for any help.

dan

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

Posted by Tathagata Das <ta...@gmail.com>.
Aah okay!

How are you testing whether there is a timeout? The situation that would
lead to the *EventTimeTimeout* is the following.
1. Send a bunch of data to group1, to set the timeout timestamp using
event time.
2. Then send more data to group2 only, to advance the watermark (since it's
based on event time across all the groups), and watch the timeout occur.
Note that you have to keep sending some data to the other groups so
that microbatches are triggered continuously and the watermark is
recalculated. If you send a bunch of data and then stop sending and just
wait, the watermark will not advance (as there is no data from which to
recalculate it), and therefore the condition watermark > timeout timestamp
may never be hit.
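A rough, untested sketch of such a test driver. The topic name, broker
address, and JSON shape here are made up; adjust them to the actual Data
schema:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // Hypothetical JSON matching a Data(name, when, value) shape.
    def event(group: String) =
      s"""{"name":"$group","when":${System.currentTimeMillis},"value":1}"""

    // Phase 1: one burst to group1; its event-time timeout gets set here.
    (1 to 1000).foreach(_ =>
      producer.send(new ProducerRecord("events", "group1", event("group1"))))

    // Phase 2: keep feeding group2 only. Every microbatch recalculates the
    // watermark, which eventually passes group1's timeout timestamp.
    while (true) {
      producer.send(new ProducerRecord("events", "group2", event("group2")))
      Thread.sleep(500)
    }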

For *ProcessingTimeTimeout* the situation is different: it relies solely on
wall-clock time, not on any watermark. Even so, you still have to keep
sending data to continuously trigger microbatches, because without any data
no microbatches are triggered and therefore no timeouts are processed. This
is a known issue that we will fix. It should work fine if you keep pushing
data to group2; group1 should then time out.
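A hedged sketch of that variant, reusing the SessionInfo/Result/Data types
and (name, when) key from the code above; the "1 minute" duration is just an
example. The duration is measured against the wall clock, but a microbatch
still has to fire for the timeout call to be delivered:

      .flatMapGroupsWithState[SessionInfo, Result](
        OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout()) {
      case ((name, when), events, state) =>
        if (state.hasTimedOut) {
          // Reached on a later microbatch, once at least 1 minute of
          // wall-clock time has passed since the last call for this group.
          val r = state.get
          state.remove()
          Iterator(Result(r.when, r.name, r.value))
        } else {
          var session = state.getOption.getOrElse(SessionInfo.zero(when, name))
          events.foreach(e => session = session.add(e.value))
          state.update(session)
          state.setTimeoutDuration("1 minute") // wall-clock duration
          Iterator.empty
        }
    }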

Did that make sense?

TD

On Fri, Jan 12, 2018 at 3:43 PM, daniel williams <da...@gmail.com>
wrote:

> Hi Tathagata,
>
> Thanks for the response and consideration. As noted in the points in my
> email, that was actually one of the tests I did (EventTimeTimeout solely
> with the watermark), and the group again never timed out. The code I
> posted was from a later test where I was trying to use some of the
> additional GroupState methods to force a timeout. I suppose I could
> create an additional test with when.plus(1 minute) and see what happens.
>
> Thanks and let me know if you have any more thoughts.
>
> dan

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

Posted by Tathagata Das <ta...@gmail.com>.
Hello Dan,

From your code, it seems like you are setting the timeout timestamp based
on the current processing time / wall-clock time, while the watermark is
being calculated on the event time (the "when" column). The semantics of
EventTimeTimeout are that when the last set timeout timestamp of a group
becomes older than the watermark (which is calculated across all groups)
because that group did not get any new data for a while, there is a
timeout and the function is called with hasTimedOut set to true. However,
in this case the timeout timestamp is being set from a different source of
time (the wall clock) than the watermark (event time), so the two may not
correlate correctly. For example, if the event time in the test data is
always one hour behind the wall-clock time, the watermark will be at least
1 hour older than the set timeout timestamp, and the group would have to
receive no data for more than an hour before it times out.

So I would verify the gap between the event time in the data and the
wall-clock time being used to set the timeout, to understand what is going
on. Or, even better, just use the event time in the data to calculate the
timeout timestamp, and do not use processing time anywhere.
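A minimal, untested sketch of that approach, reusing Dan's
SessionInfo/Result/Data types and assuming the key is (name: String,
when: Joda DateTime) as in his groupByKey; the "1 minute" expiry is just an
example:

    import org.apache.spark.sql.streaming.GroupState
    import org.joda.time.DateTime

    val timeoutFunc: ((String, DateTime), Iterator[Data], GroupState[SessionInfo]) => Iterator[Result] = {
      case ((name, when), events, state) =>
        if (state.hasTimedOut) {
          // Invoked with an empty iterator once the watermark passes the
          // timestamp set below; emit the final value and drop the state.
          val r = state.get
          state.remove()
          Iterator(Result(r.when, r.name, r.value))
        } else {
          var session = state.getOption.getOrElse(SessionInfo.zero(when, name))
          events.foreach(e => session = session.add(e.value))
          state.update(session)
          // Event-time based: fires once watermark > group's event time + 1
          // minute. (Must not be older than the current watermark.)
          state.setTimeoutTimestamp(when.getMillis, "1 minute")
          Iterator.empty
        }
    }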

Let me know how it goes.

TD


