Posted to issues@beam.apache.org by "Alexey Romanenko (JIRA)" <ji...@apache.org> on 2019/04/09 17:18:00 UTC

[jira] [Commented] (BEAM-6991) EOS: Streaming job fails on job restart with withEOS specified

    [ https://issues.apache.org/jira/browse/BEAM-6991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16813636#comment-16813636 ] 

Alexey Romanenko commented on BEAM-6991:
----------------------------------------

[~anton.lytvynenko] Thank you for reporting this and for the detailed explanation!

I can confirm that there is an issue with restarting a pipeline when {{withEOS()}} is used. It happens when {{SinkGroupId}} stays the same between two different pipeline runs. At the same time, since {{SinkGroupId}} is part of {{transactional.id}}, it *has to be* the same according to the Kafka documentation cited in the issue description:

{{A `transactional.id` is a user config and thus on producer restart, the same `transactional.id` is used. This allows brokers to identify the same producer across producer restarts. This identification is required to guarantee consistency in case of a failure: if a producer has an open transaction and crashed, on producer restart the brokers can detect the open transaction and abort it automatically.}}
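
To illustrate why a constant {{SinkGroupId}} yields a constant {{transactional.id}}, here is a minimal sketch. It reuses only the format string quoted in the issue description; the class and method names are hypothetical and are not part of Beam.

```java
// Hypothetical helper, not Beam code: it mirrors the format string
// "producer_%d_for_%s" quoted in the issue description.
final class TransactionalIdSketch {

  // Builds the producer name that is used as transactional.id for one shard.
  static String producerName(int shard, String sinkGroupId) {
    return String.format("producer_%d_for_%s", shard, sinkGroupId);
  }

  public static void main(String[] args) {
    // The same shard and the same group id give the same name on every run.
    System.out.println(producerName(0, "myWriterSinkGroupId"));
    // prints "producer_0_for_myWriterSinkGroupId"
  }
}
```

Since the name depends only on the shard number and the group id, changing the group id between runs is the only way to get a different {{transactional.id}}.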

On the other hand, when we restart the job with the same group ID, {{writerId}} seems to be null while {{committed.metadata}} still exists for this {{SinkGroupId}}. That is why {{IllegalStateException}} is thrown in [KafkaExactlyOnceSink|https://github.com/apache/beam/blob/9f43c115c519633b30290677ba2ba49a6bfd65a6/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L568], as shown in the stack trace in the issue description.
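
The condition described above can be sketched roughly as follows. This is a simplified illustration under my own assumptions, not the actual {{KafkaExactlyOnceSink}} code: the class, the method, its parameters and the {{freshWriterId}} fallback are all hypothetical, and only the first sentence of the error message is taken from the reported stack trace.

```java
// Hypothetical simplification, not the actual KafkaExactlyOnceSink logic.
final class ShardStateCheckSketch {

  /**
   * @param storedWriterId    writer id kept in pipeline state; null after a fresh job start
   * @param committedMetadata metadata committed to Kafka for this shard; null if none exists
   * @param freshWriterId     id to use when neither state nor metadata exists (assumption)
   */
  static String resolveWriterId(
      int shard, String storedWriterId, String committedMetadata, String freshWriterId) {
    if (storedWriterId != null) {
      // State survived, so the writer continues under its previous id.
      return storedWriterId;
    }
    if (committedMetadata != null) {
      // The restart case from this issue: Kafka remembers the shard, the new job does not.
      throw new IllegalStateException(
          String.format(
              "Kafka metadata exists for shard %d, but there is no stored state for it.", shard));
    }
    // First run for this group id: nothing stored anywhere.
    return freshWriterId;
  }

  public static void main(String[] args) {
    try {
      resolveWriterId(0, null, "{\"seq\":238}", "new-writer");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this sketch, a restarted job that reuses the group ID always lands in the second branch, because its state is empty while the metadata committed by the earlier run is still present in Kafka.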

[~rangadi] Do you think we have a bug here, or did I miss something?

> EOS: Streaming job fails on job restart with withEOS specified
> --------------------------------------------------------------
>
>                 Key: BEAM-6991
>                 URL: https://issues.apache.org/jira/browse/BEAM-6991
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>    Affects Versions: 2.9.0, 2.11.0
>            Reporter: Anton Lytvynenko
>            Assignee: Alexey Romanenko
>            Priority: Critical
>
> According to the [documentation|https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhatarePIDsandsequencenumbersandhowaretheyrelatedto%60transactional.id%60?], the *'transactional.id'* should be the same on producer restart.
> In Beam, the *'transactional.id'* is defined under the hood as follows in *org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink.ExactlyOnceWriter#initShardWriter*:
>  
> {code:java}
> String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
> ...
> Map<String, Object> producerConfig = new HashMap<>(spec.getProducerConfig());
> producerConfig.putAll(
>     ImmutableMap.of(
>         ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG, true,
>         ProducerSpEL.TRANSACTIONAL_ID_CONFIG, producerName));
> ...{code}
>  
> So to keep it consistent after a job restart (with the same value that this writer used on the previous job run), I need to configure the KafkaIO writer with a constant *'sinkGroupId'*:
>  
> {code:java}
> .withEOS(numShards, "myWriterSinkGroupId");{code}
>  
> When I then restart the job after it was canceled, I get the following exception:
> {code:java}
> java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
>  org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
>  org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
>  org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
>  org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
>  org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
>  org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
>  org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
>  org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>  org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
>  org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>  org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown Source)
>  org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>  org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:237)
>  org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)
>  org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1057)
>  org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:438)
>  org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:125)
>  org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1060)
>  org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:930)
>  org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:368)
>  org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:94)
>  org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement(StreamingGroupAlsoByWindowViaWindowSetFn.java:42)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:115)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
>  org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>  org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:134)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>  org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
>  org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Kafka metadata exists for shard 0, but there is no stored state for it. This mostly indicates groupId 'myWriterSinkGroupId' is used else where or in earlier runs. Try another group id. Metadata for this shard on Kafka : '{"seq":238,"id":"B5E7EED - 2019-04-02 16:05:26"}'
>  org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.initShardWriter(KafkaExactlyOnceSink.java:574)
>  org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter.processElement(KafkaExactlyOnceSink.java:294){code}
> That basically says that I need to change *'sinkGroupId'* to something different, but if I change it and rerun the job, I get duplicated messages in the destination topic.
> In other words, it breaks the exactly-once delivery guarantee.
> My project uses Beam 2.9.0, but I tried 2.11.0 as well and the behavior is the same.
>  


