Posted to dev@samza.apache.org by "Kathula, Sandeep" <Sa...@intuit.com.INVALID> on 2020/02/19 19:54:13 UTC

Samza throwing exception: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@5a5eb850 registration for the same timer

Hi,
   We are building sessionization: we read clickstream hits from Kafka and generate sessions from them. Our code is written with Apache Beam and runs on the Samza runner. We have a PCollection<KV<String, Event>> where the key is the user id and the value is a clickstream hit. We group by user id and calculate sessions.

We are using the following windowing strategy:
PCollection.apply("UserSessions", Window.<KV<String, SegmentEvent>>into(
        Sessions.<SegmentEvent>withGapDuration(Duration.standardMinutes(30)))
        .triggering(Repeatedly
                        .forever(AfterProcessingTime
                                .pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(60)))
        )
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(200))
)
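
As a rough illustration of the trigger above, the sketch below models when panes fire for a single key and window under Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...)). It is plain Java, not Beam code: the class and method names are hypothetical, times are simplified to whole seconds of processing time, and window merging is ignored. An element arriving exactly at the fire time is assumed to start the next pane.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of a repeated "first element in pane + delay" processing-time trigger. */
final class ProcessingTimeTriggerSketch {

    /**
     * @param arrivalSeconds processing-time arrival of each element, in ascending order
     * @param delaySeconds   delay after the first element of a pane (60 in the thread)
     * @return processing times at which a pane fires
     */
    static List<Long> firingTimes(List<Long> arrivalSeconds, long delaySeconds) {
        List<Long> firings = new ArrayList<>();
        Long pendingFire = null; // fire time of the currently open pane, if any
        for (long arrival : arrivalSeconds) {
            // A pending timer that is due fires before this element is processed.
            if (pendingFire != null && arrival >= pendingFire) {
                firings.add(pendingFire);
                pendingFire = null;
            }
            // The first element of a new pane arms a timer at arrival + delay.
            if (pendingFire == null) {
                pendingFire = arrival + delaySeconds;
            }
        }
        if (pendingFire != null) {
            firings.add(pendingFire); // the last open pane eventually fires as well
        }
        return firings;
    }

    public static void main(String[] args) {
        // Elements at 0 s, 10 s and 50 s share one pane; 70 s and 200 s each open a new one.
        System.out.println(firingTimes(Arrays.asList(0L, 10L, 50L, 70L, 200L), 60L)); // prints [60, 130, 260]
    }
}
```

Elements that arrive while a timer is pending belong to the same pane, so several hits for one user can be processed under one pending timer.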

However, the events arrive out of order, so we take the timestamp from each hit and assign it as the element's event timestamp so that the hit lands in the correct session. We use WithTimestamps.of() for that.

We save intermediate state in Kafka topics. We are getting a "Duplicate key ... registration for the same timer" exception. After trying different scenarios, we figured out that it happens when events arrive out of order: for a given user, when a hit arrives and a hit with an earlier timestamp arrives later, the exception is thrown. All of these events are written to the intermediate Kafka topic, and the exception is thrown while that topic is being processed. As soon as the first out-of-order event is written to the topic, the job fails with the duplicate key timer error.

Stack trace:

ERROR o.a.b.r.samza.SamzaPipelineResult - org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
    at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
    at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
    at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
    at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
    ... 5 common frames omitted
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:96)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
    at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    ... 7 common frames omitted
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:84)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:179)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$processElement$1(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$withMetrics$4(DoFnRunnerWithMetrics.java:80)
    at org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper.wrap(FnWithMetricsWrapper.java:42)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:78)
    at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
    at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:82)
    at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
    at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
    at java.util.Collections$SingletonList.forEach(Collections.java:4822)
    at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
    at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
    at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
    at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
    at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
    at com.google.common.base.Preconditions.checkState(Preconditions.java:459)
    at org.apache.samza.task.EpochTimeScheduler.setTimer(EpochTimeScheduler.java:62)
    at org.apache.samza.scheduler.CallbackSchedulerImpl.scheduleCallback(CallbackSchedulerImpl.java:37)
    at org.apache.samza.operators.impl.OperatorImpl$1.schedule(OperatorImpl.java:446)
    at org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerInternals.setTimer(SamzaTimerInternalsFactory.java:214)
    at org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer(ReduceFnContextFactory.java:135)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$OnMergeContextImpl.setTimer(TriggerStateMachineContextFactory.java:478)
    at org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine.onMerge(AfterDelayFromFirstElementStateMachine.java:210)
    at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
    at org.apache.beam.runners.core.triggers.RepeatedlyStateMachine.onMerge(RepeatedlyStateMachine.java:62)
    at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
    at org.apache.beam.runners.core.triggers.TriggerStateMachineRunner.onMerge(TriggerStateMachineRunner.java:172)
    at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:510)
    at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
    at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
    at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)



Has anyone experienced a similar issue, or does anyone have suggestions?

Thanks,
Sandeep


Re: Samza throwing exception: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@5a5eb850 registration for the same timer

Posted by "Kathula, Sandeep" <Sa...@intuit.com.INVALID>.
Hi Bharath,


Thanks for the reply. I have a few observations about your email and want to make sure we are talking about the same issue. I put breakpoints in the removeReadyTimers method, but our code never reaches that point.



We are getting the duplicate key registration exception from this specific line:

 https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java#L66



I figured out the exact scenario in which we see this issue.



Our window and trigger are as follows:





PCollection.apply("UserSessions", Window.<KV<String, Event>>into(
        Sessions.< Event >withGapDuration(Duration.standardMinutes(30)))
        .triggering(Repeatedly
                        .forever(AfterProcessingTime
                                .pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(60)))
                //.orFinally(AfterWatermark.pastEndOfWindow())
        )
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(200))
)



Before windowing, we replace the processing timestamp with the event timestamp that we get from the event [WithTimestamps.of(Event::getTimestamp)].



The exception is thrown in two cases, both occurring within the context of one trigger:

  1.  Two events for the same key have the same timestamp, or
  2.  Events arrive out of order



For example, consider 6 events from Kafka:



Event ID    Event Timestamp



Event 1 -   2019-12-06T01:00:00.000Z

Event 2 -   2019-12-06T01:05:00.000Z

Event 3 -   2019-12-06T01:10:00.000Z

Event 4 -   2019-12-06T01:15:00.000Z

Event 5 -   2019-12-06T01:14:00.000Z

Event 6 -   2019-12-06T01:20:00.000Z



Events 1 to 4 came in correct order. Event 5 is out of order.



The setTimer function is called for every event:

https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java#L60





For the 5th event (out of order), we see the duplicate key registration exception at

https://github.com/apache/samza/blob/master/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java#L66
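
The failing check can be pictured with a small stand-alone model. This is a sketch of the behavior described above, not the Samza source: TimerRegistrySketch, TimerKey and their fields are hypothetical names, and the real scheduler tracks scheduled callbacks rather than a plain set of keys. It only shows that registering a second timer under a key that is equal to a pending one raises IllegalStateException with the message seen in the stack trace, and that clearing the pending entry first avoids it.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for the timer key (the real class in the trace is KeyedTimerData). */
final class TimerKey {
    final String namespace;   // assumption: identifies the window the timer belongs to
    final long fireAtMillis;  // assumption: time at which the timer should fire
    final String timeDomain;  // assumption: e.g. "PROCESSING_TIME"

    TimerKey(String namespace, long fireAtMillis, String timeDomain) {
        this.namespace = namespace;
        this.fireAtMillis = fireAtMillis;
        this.timeDomain = timeDomain;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof TimerKey)) return false;
        TimerKey k = (TimerKey) o;
        return namespace.equals(k.namespace)
            && fireAtMillis == k.fireAtMillis
            && timeDomain.equals(k.timeDomain);
    }

    @Override public int hashCode() { return Objects.hash(namespace, fireAtMillis, timeDomain); }
}

/** Simplified model of a scheduler that refuses to register the same key twice. */
final class TimerRegistrySketch {
    private final Set<TimerKey> pending = new HashSet<>();

    void setTimer(TimerKey key) {
        if (pending.contains(key)) {
            throw new IllegalStateException(
                "Duplicate key " + key + " registration for the same timer");
        }
        pending.add(key);
    }

    void deleteTimer(TimerKey key) { pending.remove(key); }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        TimerRegistrySketch registry = new TimerRegistrySketch();
        // Two registrations that carry identical field values, as for events 4 and 5.
        TimerKey first = new TimerKey("session[01:00..01:45]", 1_000L, "PROCESSING_TIME");
        TimerKey second = new TimerKey("session[01:00..01:45]", 1_000L, "PROCESSING_TIME");
        registry.setTimer(first);
        try {
            registry.setTimer(second);
        } catch (IllegalStateException e) {
            System.out.println("second registration rejected");
        }
        registry.deleteTimer(first);  // clearing the pending entry first ...
        registry.setTimer(second);    // ... lets the re-registration succeed
    }
}
```

Whether the caller should clear the timer before setting it again, or the scheduler should tolerate re-registration, is a design question this model does not answer.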



When I traced back the calls, I saw that the keys generated for event 4 and event 5 are exactly the same.

The keys are created at

https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java#L135



For events 4 and 5, within

https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java#L135



As far as I understand, the timestamp and timeDomain are exactly the same. The timestamp is the system timestamp at which the next trigger has to fire. The timeDomain comes from the key we are grouping by in the code above and from the window of the event.



Windows for the events are as follows:



Event 1 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:30:00.000Z])

Event 2 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:35:00.000Z])

Event 3 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:40:00.000Z])

Event 4 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:45:00.000Z])

Event 5 - Window([2019-12-06T01:00:00.000Z..2019-12-06T01:45:00.000Z])



The windows are also exactly the same for event 4 and event 5, which makes sense: up to event 5, the largest event timestamp is 2019-12-06T01:15:00.000Z, so the window end is that timestamp plus 30 minutes (the session gap duration in the code above), i.e. 2019-12-06T01:45:00.000Z.
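
The window arithmetic above can be reproduced with a small plain-Java sketch. It does not use Beam: SessionWindowSketch and its methods are hypothetical names, and the merge rule (each event opens a window of [timestamp, timestamp + gap) that is merged with every overlapping window) is a simplified model of session windowing. It returns the merged window each event ends up in, in arrival order.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

final class SessionWindowSketch {

    /** Simple interval with value equality, so two windows can be compared directly. */
    static final class Window {
        final Instant start;
        final Instant end;

        Window(Instant start, Instant end) { this.start = start; this.end = end; }

        boolean intersects(Window o) { return start.isBefore(o.end) && o.start.isBefore(end); }

        Window span(Window o) {
            return new Window(start.isBefore(o.start) ? start : o.start,
                              end.isAfter(o.end) ? end : o.end);
        }

        @Override public boolean equals(Object o) {
            return o instanceof Window
                && start.equals(((Window) o).start) && end.equals(((Window) o).end);
        }

        @Override public int hashCode() { return Objects.hash(start, end); }

        @Override public String toString() { return "[" + start + ".." + end + "]"; }
    }

    /** Returns, for each arriving event, the merged session window it belongs to. */
    static List<Window> mergedWindowPerArrival(List<Instant> arrivals, Duration gap) {
        List<Window> active = new ArrayList<>();
        List<Window> perArrival = new ArrayList<>();
        for (Instant t : arrivals) {
            Window merged = new Window(t, t.plus(gap));
            boolean changed = true;
            while (changed) { // repeat until no active window overlaps the merged one
                changed = false;
                for (Iterator<Window> it = active.iterator(); it.hasNext();) {
                    Window w = it.next();
                    if (w.intersects(merged)) {
                        merged = merged.span(w);
                        it.remove();
                        changed = true;
                    }
                }
            }
            active.add(merged);
            perArrival.add(merged);
        }
        return perArrival;
    }

    public static void main(String[] args) {
        List<Instant> arrivals = Arrays.asList(
            Instant.parse("2019-12-06T01:00:00.000Z"), Instant.parse("2019-12-06T01:05:00.000Z"),
            Instant.parse("2019-12-06T01:10:00.000Z"), Instant.parse("2019-12-06T01:15:00.000Z"),
            Instant.parse("2019-12-06T01:14:00.000Z"), Instant.parse("2019-12-06T01:20:00.000Z"));
        List<Window> windows = mergedWindowPerArrival(arrivals, Duration.ofMinutes(30));
        for (int i = 0; i < windows.size(); i++) {
            System.out.println("Event " + (i + 1) + " - " + windows.get(i));
        }
    }
}
```

With the six events from the example, events 4 and 5 both end up in the window from 01:00 to 01:45, matching the list above: the out-of-order event merges into the session without changing its bounds.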



On the whole, it seems that the Samza runner for Beam does not correctly handle out-of-order events when we replace the processing time with the event time.



I just wanted to confirm whether this is the same issue you are talking about. If not, can we have a bug created?





Thanks,

Sandeep


________________________________
From: Bharath Kumara Subramanian <co...@gmail.com>
Sent: Thursday, February 20, 2020 2:29 PM
To: dev@samza.apache.org <de...@samza.apache.org>
Cc: Vora, Jainik <Ja...@intuit.com>; Upadhyay, Tapan <Ta...@intuit.com>
Subject: Re: Samza throwing exception: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@5a5eb850 registration for the same timer


Hi Sandeep,

Thank you for reporting this issue. We noticed this problem during our
testing for the upcoming Samza release.
You can find more details about the bug here:
https://issues.apache.org/jira/browse/SAMZA-2463

We have a fix, and it should be included in our upcoming Samza 1.4 release,
which is slated for the end of this month.
However, we also need to update the Beam Samza runner to pick up the fix, which
should take roughly another 2 weeks, including testing and verification.

Hopefully, we should have a fix by mid-March.

Thanks,
Bharath


Re: Samza throwing exception: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@5a5eb850 registration for the same timer

Posted by Bharath Kumara Subramanian <co...@gmail.com>.
Hi Sandeep,

Thank you for reporting this issue. We noticed this problem during our
testing for the upcoming Samza release.
You can find more details about the bug here:
https://issues.apache.org/jira/browse/SAMZA-2463

We have a fix, and it should be included in our upcoming Samza 1.4 release,
which is slated for the end of this month.
However, we also need to update the Beam Samza runner to pick up the fix, which
should take roughly another 2 weeks, including testing and verification.

Hopefully, we should have a fix by mid-March.

Thanks,
Bharath

On Wed, Feb 19, 2020 at 11:54 AM Kathula, Sandeep
<Sa...@intuit.com.invalid> wrote:

> Hi,
>    We are trying to build sessionization where we get clickstream hits
> from Kafka and generate sessions from the hits. We are using Apache Beam
> for our code and it runs on Samza runner. We have a PCollection<String,
> Event> where key is user id and value is clickstream hit. We are grouping
> by user id and calculating sessions.
>
> We are using following windowing strategy:
> PCollection.apply("UserSessions", Window.<KV<String, SegmentEvent>>into(
>
> Sessions.<SegmentEvent>withGapDuration(Duration.standardMinutes(30)))
>         .triggering(Repeatedly
>                         .forever(AfterProcessingTime
>                                 .pastFirstElementInPane()
>                                 .plusDelayOf(Duration.standardSeconds(60)))
>         )
>         .discardingFiredPanes()
>         .withAllowedLateness(Duration.standardDays(200))
> )
>
> But events we are getting are out of order. So, we are getting timestamp
> from the hit and adding it as event timestamp in order to have it as part
> of correct session. We are using WithTimestamps.of()   for that.
>
> We are saving intermediate state in Kafka topics. We are getting duplicate
> key registered for the same timer exception. When I tried with different
> scenarios when this issue is happening, we figured out that when events are
> coming out of order. For a user when a hit comes and later some hit of
> earlier timestamp comes then it is throwing duplicate key timer exception.
> It is writing all these events into intermediate Kafka topic from which
> duplicate key timer exception is being thrown. First out of order event is
> being written into this Kafka topic and very next moment this process is
> failing with duplicate key timer issue.
>
> Stack trace:
>
> ERROR o.a.b.r.samza.SamzaPipelineResult - org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
> org.apache.samza.SamzaException: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
>     at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:150)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:778)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.samza.SamzaException: Callback failed for task Partition 8, ssp SystemStreamPartition [kafka, cg-mint-session-take27-8c06b298-1091-4322-8da8-2f55d4f617d4-partition_by-gbk-8, 8], offset 825.
>     at org.apache.samza.task.TaskCallbackImpl.failure(TaskCallbackImpl.java:89)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:75)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
>     at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
>     ... 5 common frames omitted
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>     at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:96)
>     at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
>     at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.Collections$SingletonList.forEach(Collections.java:4822)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
>     ... 7 common frames omitted
> Caused by: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:84)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
>     at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:214)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:179)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$processElement$1(DoFnRunnerWithMetrics.java:55)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.lambda$withMetrics$4(DoFnRunnerWithMetrics.java:80)
>     at org.apache.beam.runners.samza.metrics.FnWithMetricsWrapper.wrap(FnWithMetricsWrapper.java:42)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.withMetrics(DoFnRunnerWithMetrics.java:78)
>     at org.apache.beam.runners.samza.metrics.DoFnRunnerWithMetrics.processElement(DoFnRunnerWithMetrics.java:55)
>     at org.apache.beam.runners.samza.runtime.GroupByKeyOp.processElement(GroupByKeyOp.java:191)
>     at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:82)
>     at org.apache.beam.runners.samza.runtime.OpAdapter.apply(OpAdapter.java:37)
>     at org.apache.samza.operators.impl.StreamOperatorImpl.handleMessage(StreamOperatorImpl.java:55)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:178)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.ArrayList.forEach(ArrayList.java:1257)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$null$1(OperatorImpl.java:194)
>     at java.lang.Iterable.forEach(Iterable.java:75)
>     at org.apache.samza.operators.impl.OperatorImpl.lambda$onMessage$2(OperatorImpl.java:193)
>     at java.util.Collections$SingletonList.forEach(Collections.java:4822)
>     at org.apache.samza.operators.impl.OperatorImpl.onMessage(OperatorImpl.java:192)
>     at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:101)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
>     at org.apache.samza.task.AsyncStreamTaskAdapter.access$000(AsyncStreamTaskAdapter.java:33)
>     at org.apache.samza.task.AsyncStreamTaskAdapter$1.run(AsyncStreamTaskAdapter.java:58)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Duplicate key org.apache.beam.runners.samza.runtime.KeyedTimerData@dffeb6cc registration for the same timer
>     at com.google.common.base.Preconditions.checkState(Preconditions.java:459)
>     at org.apache.samza.task.EpochTimeScheduler.setTimer(EpochTimeScheduler.java:62)
>     at org.apache.samza.scheduler.CallbackSchedulerImpl.scheduleCallback(CallbackSchedulerImpl.java:37)
>     at org.apache.samza.operators.impl.OperatorImpl$1.schedule(OperatorImpl.java:446)
>     at org.apache.beam.runners.samza.runtime.SamzaTimerInternalsFactory$SamzaTimerInternals.setTimer(SamzaTimerInternalsFactory.java:214)
>     at org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer(ReduceFnContextFactory.java:135)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$TriggerTimers.setTimer(TriggerStateMachineContextFactory.java:188)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory$OnMergeContextImpl.setTimer(TriggerStateMachineContextFactory.java:478)
>     at org.apache.beam.runners.core.triggers.AfterDelayFromFirstElementStateMachine.onMerge(AfterDelayFromFirstElementStateMachine.java:210)
>     at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
>     at org.apache.beam.runners.core.triggers.RepeatedlyStateMachine.onMerge(RepeatedlyStateMachine.java:62)
>     at org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.invokeOnMerge(ExecutableTriggerStateMachine.java:129)
>     at org.apache.beam.runners.core.triggers.TriggerStateMachineRunner.onMerge(TriggerStateMachineRunner.java:172)
>     at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:510)
>     at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
>     at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
>     at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
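
The innermost frames show a state check failing inside EpochTimeScheduler.setTimer when the trigger sets a timer during a window merge. As a rough illustration of that failure mode, here is a toy timer registry in plain Java. Everything in it (the class TimerRegistrySketch, its methods, and the string keys) is hypothetical and only models the behaviour suggested by the trace: a second registration under a key that is still pending is rejected. It is not Samza's or Beam's code, and it does not claim to identify the root cause.

```java
import java.util.HashMap;
import java.util.Map;

public class TimerRegistrySketch {
    // Pending timers: key -> fire time in epoch millis (hypothetical model).
    private final Map<String, Long> pending = new HashMap<>();

    /** Registers a timer; rejects a key that is already pending. */
    void setTimer(String key, long fireAtMillis) {
        if (pending.containsKey(key)) {
            throw new IllegalStateException(
                "Duplicate key " + key + " registration for the same timer");
        }
        pending.put(key, fireAtMillis);
    }

    /** Removes a pending timer; does nothing if the key is unknown. */
    void deleteTimer(String key) {
        pending.remove(key);
    }

    /** Replaces any pending timer for the key instead of failing. */
    void resetTimer(String key, long fireAtMillis) {
        deleteTimer(key);
        setTimer(key, fireAtMillis);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        TimerRegistrySketch timers = new TimerRegistrySketch();
        String key = "user-42/merged-session"; // hypothetical timer key

        timers.setTimer(key, 60_000L);
        try {
            // Setting the same key again while it is pending fails.
            timers.setTimer(key, 60_000L);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Deleting before setting avoids the duplicate registration.
        timers.resetTimer(key, 90_000L);
        System.out.println("pending timers: " + timers.pendingCount());
    }
}
```

If the merged session ends up with the same timer key as a timer that is still pending, a registry with this kind of check would throw on the second setTimer call, which matches the shape of the exception above. Whether that is what actually happens in the runner would need to be confirmed against the Samza runner source.
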
>
>
>
> Does anyone have suggestions, or has anyone experienced a similar issue
> before?
>
> Thanks,
> Sandeep
>
>