Posted to dev@beam.apache.org by Hai Lu <lh...@apache.org> on 2019/08/15 16:16:28 UTC

IllegalStateException: TimestampCombiner moved element from to earlier time in Python

Hi,

This is Hai from LinkedIn.

I'm looking into a bug I found internally when using the Beam portable API
(Python) with our own Samza runner.

The pipeline looks something like this:

    (p
     | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
     | 'transform' >> beam.Map(process_event)
     | 'window' >> beam.WindowInto(FixedWindows(15))
     | 'group' >> beam.CombinePerKey(beam.combiners.CountCombineFn())
     ...
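For reference, FixedWindows(15) puts each element into the 15-second window whose start is the element's timestamp rounded down to a multiple of the window size (zero offset here). A minimal pure-Python sketch, with integer milliseconds and a hypothetical helper name, shows how an event at 03:34:37 ends up in the window [03:34:30, 03:34:45) that appears in the exception below:

```python
def assign_fixed_window(timestamp_ms, size_ms, offset_ms=0):
    """Return (start, end) of the fixed window that contains timestamp_ms.

    The window is half-open: start is included, end is excluded.
    """
    start = timestamp_ms - (timestamp_ms - offset_ms) % size_ms
    return start, start + size_ms


# Hypothetical event at 03:34:37, written as milliseconds past 03:34:00.
event_ms = 37_000
print(assign_fixed_window(event_ms, size_ms=15_000))  # (30000, 45000)
```

Note that a timestamp of exactly 45_000 would fall into the next window, [45000, 60000), not this one.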

The problem comes from the combiner, which causes the following exception on
the Java side:

Caused by: java.lang.IllegalStateException: TimestampCombiner moved element from 2019-08-15T03:34:45.000Z to earlier time 2019-08-15T03:34:44.999Z for window [2019-08-15T03:34:30.000Z..2019-08-15T03:34:45.000Z)
    at org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:117)
    at org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:154)
    at org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:98)
    at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:605)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)

The exception is thrown here
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java#L116
when we check that the shifted timestamp is not earlier than the original
element timestamp:

    if (shifted.isBefore(timestamp)) {
      throw new IllegalStateException(
          String.format(
              "TimestampCombiner moved element from %s to earlier time %s for window %s",
              BoundedWindow.formatTimestamp(timestamp),
              BoundedWindow.formatTimestamp(shifted),
              window));
    }
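The check can be modeled in a few lines of Python. This is a simplified sketch, not Beam code: the function and exception names are hypothetical, timestamps are integer milliseconds, and it only covers the END_OF_WINDOW case, where the hold is moved to the window's max timestamp (end - 1 ms):

```python
class TimestampCombinerError(Exception):
    """Stand-in for Java's IllegalStateException in this sketch."""


def shift_hold_end_of_window(element_ts_ms, window_end_ms):
    """Simplified model of WatermarkHold.shift under END_OF_WINDOW.

    END_OF_WINDOW moves the hold to the window's max timestamp, which for a
    half-open window [start, end) at 1 ms granularity is end - 1. The hold
    may move later than the element's timestamp, but never earlier.
    """
    shifted = window_end_ms - 1
    if shifted < element_ts_ms:
        raise TimestampCombinerError(
            f"TimestampCombiner moved element from {element_ts_ms} "
            f"to earlier time {shifted}"
        )
    return shifted


print(shift_hold_end_of_window(37_000, 45_000))  # 44999
try:
    shift_hold_end_of_window(45_000, 45_000)  # timestamp equals window end
except TimestampCombinerError as err:
    print(err)  # TimestampCombiner moved element from 45000 to earlier time 44999
```

Any element timestamp inside [30000, 45000) passes; only a timestamp at or beyond the exclusive end trips the check.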

As you can see from the exception, the "shifted" value is 03:34:44.999 while
"timestamp" is 03:34:45.000. The 44.999 comes from
TimestampCombiner.END_OF_WINDOW
<https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L116>:

    @Override
    public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
      return intoWindow.maxTimestamp();
    }

where intoWindow.maxTimestamp() is:

  /** Returns the largest timestamp that can be included in this window. */
  @Override
  public Instant maxTimestamp() {
    // end not inclusive
    return end.minus(1);
  }

Hence the 44.999.
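The same arithmetic, modeled in Python with integer milliseconds (the class and function below are hypothetical stand-ins for the Java IntervalWindow and END_OF_WINDOW, not Beam's Python classes):

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class IntervalWindow:
    """Simplified half-open window [start, end), in milliseconds."""

    start: int
    end: int  # not inclusive

    def max_timestamp(self) -> int:
        # Mirrors Java's end.minus(1): the last millisecond inside the window.
        return self.end - 1

    def contains(self, ts: int) -> bool:
        return self.start <= ts < self.end


def end_of_window_merge(into_window: IntervalWindow,
                        merging_timestamps: Iterable[int]) -> int:
    """Model of END_OF_WINDOW.merge: the input timestamps are ignored."""
    return into_window.max_timestamp()


window = IntervalWindow(start=30_000, end=45_000)
print(end_of_window_merge(window, [31_000, 40_000]))  # 44999
print(window.contains(45_000))  # False
```

Whatever timestamps are merged, the result is always the last millisecond of the window, and the window end itself is not part of the window.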

The 45.000, on the other hand, comes from the Python side, where the combiner
emits its partial results ahead of the GroupByKey (the pre-GBK combine in
operations.py, PGBKCVOperation.output_key):
<https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/operations.py#L889>

    if windows is 0:
      self.output(_globally_windowed_value.with_value((key, value)))
    else:
      self.output(WindowedValue((key, value), windows[0].end, windows))

Here, when we build the WindowedValue, its timestamp is set to the window's
exclusive end (45.000) rather than to the largest timestamp the window
actually contains (44.999).
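To make the mismatch concrete, here is a small sketch (hypothetical helper names, integer milliseconds, 1 ms granularity as on the Java side) comparing the timestamp the quoted code assigns with the window's max timestamp, and whether each one survives the END_OF_WINDOW check:

```python
def pgbk_output_timestamp(window_end_ms, use_max_timestamp):
    """Timestamp given to a pre-GBK combiner output for a single window.

    use_max_timestamp=False mirrors the quoted code (windows[0].end);
    use_max_timestamp=True uses the last millisecond inside the window.
    """
    return window_end_ms - 1 if use_max_timestamp else window_end_ms


def survives_end_of_window(element_ts_ms, window_end_ms):
    """True if shifting to the window's max timestamp does not move ts earlier."""
    return window_end_ms - 1 >= element_ts_ms


window_end = 45_000  # end of [30_000, 45_000)
for use_max in (False, True):
    ts = pgbk_output_timestamp(window_end, use_max)
    print(ts, survives_end_of_window(ts, window_end))
# 45000 False
# 44999 True
```

Using the max timestamp keeps the pre-combined output inside its own window, which is what the Java-side hold logic assumes.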

So the "end of window" timestamp is defined inconsistently between Python and
Java. I have yet to try this on other runners, so I'm not sure whether the
issue is specific to our Samza runner. I believe this is a bug, but I'd like
to confirm with you. If it hasn't been an issue for other runners, what might
I be doing wrong?

Right now I can work around this by using GroupByKey directly (instead of a
combiner) and doing the reduction myself, but it would be much more
convenient for us to use combiners.
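What the workaround computes can be sketched without Beam: group the (key, timestamp) elements by key and fixed window, then reduce each group by hand (here, counting, to match CountCombineFn). This is a plain-Python illustration with hypothetical names and data; in the real pipeline the grouping is done by beam.GroupByKey and the reduction by a Map over the grouped values.

```python
from collections import defaultdict


def window_start(ts_ms, size_ms):
    # Start of the fixed window containing ts_ms (zero offset).
    return ts_ms - ts_ms % size_ms


def group_then_count(events, size_ms):
    """Group (key, timestamp_ms) events per key and fixed window, then count.

    Mirrors GroupByKey followed by a hand-written reduction, in place of
    CombinePerKey(CountCombineFn()).
    """
    groups = defaultdict(list)
    for key, ts_ms in events:
        start = window_start(ts_ms, size_ms)
        groups[(key, (start, start + size_ms))].append(ts_ms)
    # The manual "reduce" step: count the grouped values.
    return {group: len(values) for group, values in groups.items()}


events = [("home", 31_000), ("home", 44_999), ("jobs", 37_000), ("home", 45_000)]
print(group_then_count(events, 15_000))
# {('home', (30000, 45000)): 2, ('jobs', (30000, 45000)): 1, ('home', (45000, 60000)): 1}
```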

Any advice would be extremely helpful. Thank you in advance!

-Hai

Re: IllegalStateException: TimestampCombiner moved element from to earlier time in Python

Posted by Hai Lu <lh...@apache.org>.
I made a simple fix for this issue here:
https://github.com/apache/beam/pull/9364

I tested it locally and it fixes the problem. Could someone take a look?

Thanks,
Hai
