Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/08/10 17:08:13 UTC

[jira] [Commented] (BEAM-7995) IllegalStateException: TimestampCombiner moved element from to earlier time in Python

    [ https://issues.apache.org/jira/browse/BEAM-7995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174673#comment-17174673 ] 

Beam JIRA Bot commented on BEAM-7995:
-------------------------------------

This issue is P2 but has been unassigned without any comment for 60 days, so it has been labeled "stale-P2". If this issue is still affecting you, we care! Please comment and remove the label. Otherwise, in 14 days the issue will be moved to P3.

Please see https://beam.apache.org/contribute/jira-priorities/ for a detailed explanation of what these priorities mean.


> IllegalStateException: TimestampCombiner moved element from to earlier time in Python
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-7995
>                 URL: https://issues.apache.org/jira/browse/BEAM-7995
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Hai Lu
>            Priority: P2
>              Labels: stale-P2
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> I'm looking into a bug I found internally when using the Beam portable API (Python) with our own Samza runner.
>  
> The pipeline looks something like this:
>  
>     (p
>      | 'read' >> ReadFromKafka(cluster="tracking", topic="PageViewEvent")
>      | 'transform' >> beam.Map(lambda event: process_event(event))
>      | 'window' >> beam.WindowInto(FixedWindows(15))
>      | 'group' >> beam.CombinePerKey(beam.combiners.CountCombineFn())
>      ...
>  
> The problem comes from the combiners, which cause the following exception on the Java side:
>  
> Caused by: java.lang.IllegalStateException: TimestampCombiner moved element from 2019-08-15T03:34:45.000Z to earlier time 2019-08-15T03:34:44.999Z for window [2019-08-15T03:34:30.000Z..2019-08-15T03:34:45.000Z)
>     at org.apache.beam.runners.core.WatermarkHold.shift(WatermarkHold.java:117)
>     at org.apache.beam.runners.core.WatermarkHold.addElementHold(WatermarkHold.java:154)
>     at org.apache.beam.runners.core.WatermarkHold.addHolds(WatermarkHold.java:98)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:605)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
>  
> The exception is thrown here [https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java#L116], where the shifted timestamp is checked to ensure it is not earlier than the original timestamp:
>  
>     if (shifted.isBefore(timestamp)) {
>       throw new IllegalStateException(
>           String.format(
>               "TimestampCombiner moved element from %s to earlier time %s for window %s",
>               BoundedWindow.formatTimestamp(timestamp),
>               BoundedWindow.formatTimestamp(shifted),
>               window));
>     }
>  
> As the exception shows, "shifted" is "XXX 44.999" while "timestamp" is "XXX 45.000". The "44.999" comes from [TimestampCombiner.END_OF_WINDOW|https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/TimestampCombiner.java#L116], whose assign() delegates to merge():
>  
>     @Override
>     public Instant merge(BoundedWindow intoWindow, Iterable<? extends Instant> mergingTimestamps) {
>       return intoWindow.maxTimestamp();
>     }
>  
> where, for an IntervalWindow, intoWindow.maxTimestamp() is:
>  
>   /** Returns the largest timestamp that can be included in this window. */
>   @Override
>   public Instant maxTimestamp() {
>     // end not inclusive
>     return end.minus(1);
>   }
>  
> Hence the "44.999".
>  
> The "45.000", on the other hand, comes from the Python side, where the combiner outputs its results as part of the partial combine that runs before the GroupByKey (GBK): [operations.py#PGBKCVOperation#output_key|https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/operations.py#L889]
>  
>     if windows is 0:
>       self.output(_globally_windowed_value.with_value((key, value)))
>     else:
>       self.output(WindowedValue((key, value), windows[0].end, windows))
>  
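> Here, when the windowed value is created, its timestamp is set to the window's exclusive end (45.000), which lies outside the half-open window [30.000, 45.000). It is not set to the window's maximum timestamp (44.999), which is the last instant inside the window.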
>  
> Clearly the "end of window" definition is inconsistent between Python and Java. I have yet to try this on other runners, so I'm not sure whether this only affects our Samza runner. I tend to think this is a bug, but would like to confirm with you. If this has not been an issue for other runners, what might I have done wrong?
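
The FixedWindows(15) step in the pipeline quoted above assigns each element to a half-open 15-second window [start, start + 15s). The following is a minimal pure-Python sketch of that assignment. It is not the Beam API. It assumes timestamps are integer milliseconds, as on the Java side, and the helpers fixed_window and contains are hypothetical. The sketch shows that 45.000 is not part of the window [30.000, 45.000) from the exception, because it falls into the next window instead.

```python
from typing import Tuple

# Hypothetical model, not the Beam API: timestamps are integer milliseconds,
# and a window is a half-open interval (start_ms, end_ms) with end_ms excluded.
Window = Tuple[int, int]


def fixed_window(ts_ms: int, size_ms: int, offset_ms: int = 0) -> Window:
    """Return the fixed window [start, start + size) that contains ts_ms."""
    # Python's % is non-negative for a positive divisor, so this also
    # works for timestamps before offset_ms.
    start = ts_ms - (ts_ms - offset_ms) % size_ms
    return (start, start + size_ms)


def contains(window: Window, ts_ms: int) -> bool:
    """True if ts_ms lies inside the half-open window."""
    start, end = window
    return start <= ts_ms < end


# Milliseconds past 03:34:00; minute boundaries are multiples of 15 s,
# so window alignment is the same as with full epoch timestamps.
window = fixed_window(44_123, 15_000)
print(window)                        # (30000, 45000)
print(contains(window, 44_999))      # True: last millisecond in the window
print(contains(window, 45_000))      # False: the exclusive end
print(fixed_window(45_000, 15_000))  # (45000, 60000): the next window
```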

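The check in WatermarkHold.shift quoted above can be modeled in a few lines of Python. This is a hedged sketch, not Beam code. It assumes the windowing strategy uses END_OF_WINDOW, as the exception implies, so the element's hold is moved to the window's maximum timestamp (the end minus 1 ms). The names TimestampMovedEarlierError, end_of_window_assign and shift are hypothetical. Under this model, an element stamped at the window's exclusive end (45.000) is moved back to 44.999 and trips the check, while any timestamp inside the window passes.

```python
class TimestampMovedEarlierError(Exception):
    """Hypothetical stand-in for the IllegalStateException thrown in Java."""


def end_of_window_assign(window, timestamp_ms):
    # END_OF_WINDOW ignores the element timestamp and returns the window's
    # maximum timestamp: the exclusive end minus one millisecond.
    _start, end = window
    return end - 1


def shift(window, timestamp_ms):
    """Mirror the guard in WatermarkHold.shift for END_OF_WINDOW."""
    shifted = end_of_window_assign(window, timestamp_ms)
    if shifted < timestamp_ms:
        raise TimestampMovedEarlierError(
            f"TimestampCombiner moved element from {timestamp_ms} "
            f"to earlier time {shifted} for window {window}"
        )
    return shifted


window = (30_000, 45_000)  # [03:34:30.000, 03:34:45.000) as ms past 03:34:00
print(shift(window, 44_123))  # 44999: moving the hold later is allowed
try:
    shift(window, 45_000)  # the timestamp the Python side assigns
except TimestampMovedEarlierError as err:
    print(err)
```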

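The arithmetic behind the two timestamps in the exception can be reproduced with Python's datetime module alone. This sketch assumes, as Java's IntervalWindow does, that a window's end is exclusive and that timestamps have millisecond precision. The helpers to_millis, format_millis and max_timestamp are hypothetical. It rebuilds the 45.000 window end and the 44.999 maximum timestamp from the stack trace.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
ONE_MS = timedelta(milliseconds=1)


def to_millis(dt: datetime) -> int:
    """Milliseconds since the epoch (timedelta // timedelta gives an int)."""
    return (dt - EPOCH) // ONE_MS


def format_millis(ms: int) -> str:
    """ISO-8601 UTC string with millisecond precision, e.g. ...45.000Z."""
    dt = EPOCH + timedelta(milliseconds=ms)
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


def max_timestamp(end_ms: int) -> int:
    """Largest timestamp inside a window whose end is exclusive: end - 1 ms."""
    return end_ms - 1


window_end = to_millis(datetime(2019, 8, 15, 3, 34, 45, tzinfo=timezone.utc))
print(format_millis(window_end))                 # 2019-08-15T03:34:45.000Z
print(format_millis(max_timestamp(window_end)))  # 2019-08-15T03:34:44.999Z
```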

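The mismatch in PGBKCVOperation.output_key quoted above comes down to which timestamp the pre-combined output receives. The following pure-Python model keeps the Java side's millisecond precision and assumes the downstream combiner is END_OF_WINDOW. The names pgbk_output_timestamp, use_max_timestamp, passes_watermark_hold_check and in_window are hypothetical, not Beam's. If the output is stamped with the window's exclusive end, it lands outside its own window. If it is stamped with the window's maximum timestamp, it stays inside and satisfies the WatermarkHold check. This illustrates one way to make the two SDKs agree. It does not necessarily reflect how Beam resolved the issue.

```python
from typing import Tuple

# Hypothetical model: a window is (start_ms, end_ms) with end_ms excluded.
Window = Tuple[int, int]


def pgbk_output_timestamp(window: Window, use_max_timestamp: bool) -> int:
    """Timestamp attached to a pre-combined (key, value) for this window."""
    _start, end = window
    # windows[0].end in the quoted Python code is the exclusive end;
    # the maximum timestamp is the last millisecond inside the window.
    return end - 1 if use_max_timestamp else end


def passes_watermark_hold_check(window: Window, timestamp_ms: int) -> bool:
    """END_OF_WINDOW moves the hold to end - 1; Java rejects moving it earlier."""
    _start, end = window
    return end - 1 >= timestamp_ms


def in_window(window: Window, timestamp_ms: int) -> bool:
    """True if the timestamp lies inside the half-open window."""
    start, end = window
    return start <= timestamp_ms < end


window = (30_000, 45_000)
for use_max in (False, True):
    ts = pgbk_output_timestamp(window, use_max)
    print(use_max, ts, in_window(window, ts), passes_watermark_hold_check(window, ts))
# False 45000 False False
# True 44999 True True
```
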
--
This message was sent by Atlassian Jira
(v8.3.4#803005)