Posted to issues@beam.apache.org by "Anton Lytvynenko (JIRA)" <ji...@apache.org> on 2019/04/03 11:14:05 UTC

[jira] [Updated] (BEAM-6991) EOS: Streaming job fails on job restart with withEOS specified

     [ https://issues.apache.org/jira/browse/BEAM-6991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anton Lytvynenko updated BEAM-6991:
-----------------------------------
    Priority: Blocker  (was: Major)

> EOS: Streaming job fails on job restart with withEOS specified
> --------------------------------------------------------------
>
>                 Key: BEAM-6991
>                 URL: https://issues.apache.org/jira/browse/BEAM-6991
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-dataflow, runner-direct
>    Affects Versions: 2.9.0, 2.11.0
>            Reporter: Anton Lytvynenko
>            Priority: Blocker
>
> According to the [documentation|https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhatarePIDsandsequencenumbersandhowaretheyrelatedto%60transactional.id%60?], the *'transactional.id'* should remain the same across producer restarts.
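> For reference, a minimal plain-Kafka sketch of the pattern that FAQ describes (bootstrap servers, topic, and the id value below are placeholders, not from my job):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> Properties props = new Properties();
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");            // placeholder
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> // The same transactional.id must be reused on every restart so the broker can
> // fence the previous producer instance instead of treating this as a new one.
> props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-fixed-transactional-id");  // placeholder
>
> Producer<String, String> producer = new KafkaProducer<>(props);
> producer.initTransactions();
> producer.beginTransaction();
> producer.send(new ProducerRecord<>("my-topic", "key", "value"));                 // placeholder topic
> producer.commitTransaction();
> producer.close();
> {code}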
> In Beam, the *'transactional.id'* is set under the hood as follows, in *org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink.ExactlyOnceWriter#initShardWriter*:
>  
> {code:java}
> String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
> ...
> Map<String, Object> producerConfig = new HashMap<>(spec.getProducerConfig());
> producerConfig.putAll(
>     ImmutableMap.of(
>         ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG, true,
>         ProducerSpEL.TRANSACTIONAL_ID_CONFIG, producerName));
> ...
> {code}
>  
> So, to keep it consistent after a job restart (i.e. the same value this writer used on the previous run), I need to configure the KafkaIO writer with a constant *'sinkGroupId'*:
>  
> {code:java}
> .withEOS(numShards, "myWriterSinkGroupId");{code}
>  
> But when I restart the job after it has been canceled, I get the following exception:
> {code:java}
> java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
>  org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
>  org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
>  org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
>  org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
>  org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
>  org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
>  org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
>  org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>  org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
>  org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>  org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
>  org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>  org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
>  org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
>  org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
>  org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
>  org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
>  org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
>  org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
>  org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
>  org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
>  org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>  org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
>  org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.initShardWriter(KafkaExactlyOnceSink.java:574)
>  org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:294){code}
> That basically says I need to change *'sinkGroupId'* to something different, but if I change it and rerun the job, I get duplicated messages in the destination topic.
> In other words, it breaks the exactly-once message delivery guarantee.
> My project uses Beam 2.9.0, but I tried 2.11.0 as well and the behavior is the same.
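> For completeness, a sketch of how I wire the exactly-once sink (broker address, topic, and key/value types below are placeholders, not my actual configuration):
> {code:java}
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.kafka.common.serialization.StringSerializer;
>
> static void writeExactlyOnce(PCollection<KV<String, String>> records, int numShards) {
>   records.apply(
>       "WriteToKafkaEOS",
>       KafkaIO.<String, String>write()
>           .withBootstrapServers("broker-1:9092")      // placeholder
>           .withTopic("destination-topic")             // placeholder
>           .withKeySerializer(StringSerializer.class)
>           .withValueSerializer(StringSerializer.class)
>           // Constant sinkGroupId so the generated 'transactional.id' stays the same
>           // across job restarts; this is what triggers the IllegalStateException above.
>           .withEOS(numShards, "myWriterSinkGroupId"));
> }
> {code}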
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)