Posted to commits@samza.apache.org by "Sandeep Kathula (Jira)" <ji...@apache.org> on 2020/02/25 19:11:00 UTC

[jira] [Created] (SAMZA-2469) Out of order events throwing Duplicate key registration with same timer exception

Sandeep Kathula created SAMZA-2469:
--------------------------------------

             Summary: Out of order events throwing Duplicate key registration with same timer exception
                 Key: SAMZA-2469
                 URL: https://issues.apache.org/jira/browse/SAMZA-2469
             Project: Samza
          Issue Type: Bug
            Reporter: Sandeep Kathula


When using session windows with a gap duration and triggers in Beam code on the Samza runner, we see the following exception for out-of-order events:
{code:java}
Caused by: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@196f343c registration for the same timer
    at com.google.common.base.Preconditions.checkState(Preconditions.java:459)
    at org.apache.samza.task.EpochTimeScheduler.setTimer(EpochTimeScheduler.java:62)
    at org.apache.samza.scheduler.CallbackSchedulerImpl.scheduleCallback(CallbackSchedulerImpl.java:37)
    at org.apache.samza.operators.impl.OperatorImpl$1.schedule(OperatorImpl.java:446)
    at org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerInternals.setTimer(SamzaTimerInternalsFactory.java:214)
    at org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer(ReduceFnContextFactory.java:135)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$OnMergeContextImpl.setTimer(TriggerStateMachineContextFactory.java:478)
    at org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine.onMerge(AfterDelayFromFirstElementStateMachine.java:210)
    at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
    at org.apache.beam.runners.core.triggers.RepeatedlyStateMachine.onMerge(RepeatedlyStateMachine.java:62)
    at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineRunner.onMerge(TriggerStateMachineRunner.java:172)
    at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:510)
    at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
    at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
    at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
ERROR : Got callback failure for task Partition 0
{code}
 
The "Duplicate key ... registration for the same timer" exception is thrown from this specific line:
 
[https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java#L66]
 


*Window and trigger are as follows:*
PCollection.apply("UserSessions", Window.<KV<String, Event>>into(
        Sessions.< Event >withGapDuration(Duration.standardMinutes(30)))
        .triggering(Repeatedly
                        .forever(AfterProcessingTime
                                .pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(60)))
                //.orFinally(AfterWatermark.pastEndOfWindow())
        )
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(200))
)
Before windowing, we replace the processing timestamp with the event timestamp taken from the event itself [WithTimestamps.of(Event::getTimestamp)].
 
 
The exception is thrown in two cases, both occurring within the context of a single trigger:
  1.  Two events for the same key have the same timestamp, or
  2.  Events arrive out of order
 
 
 
For example, consider the following events from Kafka:
 
Event ID    Event Timestamp
Event 1  -  2019-12-06T01:00:00.000Z
Event 2  -  2019-12-06T01:05:00.000Z
Event 3  -  2019-12-06T01:10:00.000Z
Event 4  -  2019-12-06T01:15:00.000Z
Event 5  -  2019-12-06T01:14:00.000Z
Event 6  -  2019-12-06T01:20:00.000Z
 
 
Events 1 to 4 arrived in the correct order. Event 5 is out of order: its timestamp (01:14) is earlier than that of event 4 (01:15).
 
The setTimer function is called for every event:
[https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java#L60]
 
 
For the 5th event (out of order), we see the duplicate key registration exception at
[https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java#L66]
 
When I traced back the calls, I saw that the keys generated for event 4 and event 5 are exactly the same.
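
To illustrate why two equal keys lead to this failure, here is a minimal toy model of a scheduler that rejects a second registration while an equal key is still pending. It is a sketch under stated assumptions, not the Samza or Beam implementation: TimerKey, ToyScheduler, and all field names and values are hypothetical, and only the exception message is taken from the trace above.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only; not the Samza EpochTimeScheduler or Beam KeyedTimerData code.
class DuplicateTimerSketch {
    // Hypothetical timer key: grouping key, window, fire time, and time domain.
    record TimerKey(String groupingKey, String window, long fireAtMillis, String domain) {}

    static final class ToyScheduler {
        private final Map<TimerKey, Runnable> pending = new HashMap<>();

        // Rejects a registration whose key equals one that is still pending.
        void setTimer(TimerKey key, Runnable callback) {
            if (pending.containsKey(key)) {
                throw new IllegalStateException(
                        "Duplicate key " + key + " registration for the same timer");
            }
            pending.put(key, callback);
        }
    }

    // Registers two equal keys; returns true if the second registration is rejected.
    static boolean secondRegistrationFails() {
        ToyScheduler scheduler = new ToyScheduler();
        // Made-up values: same grouping key, same merged window, same fire time.
        TimerKey forEvent4 = new TimerKey("user-1", "[01:00..01:45)", 60_000L, "PROCESSING_TIME");
        TimerKey forEvent5 = new TimerKey("user-1", "[01:00..01:45)", 60_000L, "PROCESSING_TIME");
        scheduler.setTimer(forEvent4, () -> {});
        try {
            scheduler.setTimer(forEvent5, () -> {});
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Second registration rejected: " + secondRegistrationFails());
    }
}
```

In this model, any two events that produce equal key components hit the check, whether they arrive out of order or carry the same timestamp.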
 
Keys are created at
 
[https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java#L135]
 
 
For events 4 and 5, the timestamp and timeDomain used at
[https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java#L135]
are, based on what I understand, exactly the same. The timestamp is the system timestamp at which the next trigger has to execute. The timeDomain comes from the key we are grouping by in the code above and from the window of the event.
  
Windows for the events are as follows:
 
Event 1 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:30:00.000Z])
Event 2 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:35:00.000Z])
Event 3 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:40:00.000Z])
Event 4 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:45:00.000Z])
Event 5 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:45:00.000Z])
 
 
 
The windows for event 4 and event 5 are also exactly the same, which makes sense: up to event 5, the largest event timestamp is 2019-12-06T01:15:00.000Z, so the window end is that timestamp plus the 30-minute session gap configured in the code above, i.e. 2019-12-06T01:45:00.000Z.
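
The window arithmetic above can be reproduced with a small standalone sketch. It is a simplified model of session-window merging for a single key, not Beam's actual merging code: the Window record and the method windowsAfterEachEvent are hypothetical names, and windows are treated as half-open intervals [timestamp, timestamp + gap).

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative model only; this is not Beam's Sessions or MergingActiveWindowSet code.
class SessionWindowSketch {
    // 30-minute gap, as in the Sessions.withGapDuration(...) call in the report.
    static final Duration GAP = Duration.ofMinutes(30);

    // Events 1 to 5 from the report, in arrival order (event 5 is out of order).
    static final List<Instant> EVENTS = List.of(
            Instant.parse("2019-12-06T01:00:00.000Z"),
            Instant.parse("2019-12-06T01:05:00.000Z"),
            Instant.parse("2019-12-06T01:10:00.000Z"),
            Instant.parse("2019-12-06T01:15:00.000Z"),
            Instant.parse("2019-12-06T01:14:00.000Z"));

    // Hypothetical window type: half-open interval [start, end).
    record Window(Instant start, Instant end) {
        boolean overlaps(Window o) {
            return start.isBefore(o.end) && o.start.isBefore(end);
        }

        Window span(Window o) {
            return new Window(
                    start.isBefore(o.start) ? start : o.start,
                    end.isAfter(o.end) ? end : o.end);
        }
    }

    // For each arriving event, returns the merged session window it ends up in.
    static List<Window> windowsAfterEachEvent(List<Instant> arrivals) {
        List<Window> active = new ArrayList<>();
        List<Window> assigned = new ArrayList<>();
        for (Instant ts : arrivals) {
            // Each event starts as its own proto-window [ts, ts + GAP).
            Window merged = new Window(ts, ts.plus(GAP));
            Iterator<Window> it = active.iterator();
            while (it.hasNext()) {
                Window w = it.next();
                if (w.overlaps(merged)) {
                    merged = merged.span(w);
                    it.remove();
                }
            }
            active.add(merged);
            assigned.add(merged);
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<Window> windows = windowsAfterEachEvent(EVENTS);
        for (int i = 0; i < windows.size(); i++) {
            System.out.println("Event " + (i + 1) + " - " + windows.get(i));
        }
        System.out.println("Events 4 and 5 share a window: "
                + windows.get(3).equals(windows.get(4)));
    }
}
```

Because event 5 (01:14) falls inside the session already extended by event 4 (01:15), merging leaves the window unchanged, so both events end up with the same window.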
 
 


