Posted to issues@flink.apache.org by "freezhan (Jira)" <ji...@apache.org> on 2020/07/16 10:49:00 UTC

[jira] [Comment Edited] (FLINK-11654) Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs

    [ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159108#comment-17159108 ] 

freezhan edited comment on FLINK-11654 at 7/16/20, 10:48 AM:
-------------------------------------------------------------

[~becket_qin] 

Are you still working on this issue?

What's next?

 

I agree with the outcome of the discussion:
 # Expose {{transactionIdPrefix}} ({color:#FF0000}and add a length check{color})
 # Fail fast when overriding Kafka properties (both points are sketched below)
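
To make the two points concrete, here is a minimal sketch of what the exposed setter and the fail-fast check could look like. {{setTransactionalIdPrefix}}, {{validateProducerConfig}}, and the length bound are hypothetical, for illustration only, not existing FlinkKafkaProducer API:

{code:java}
import java.util.Properties;

// Sketch only: hypothetical API shape for the two points above,
// not the actual FlinkKafkaProducer surface.
public class TransactionalIdPrefixSketch {

    // Assumed bound: leave room for the "-<subtaskIndex>-<counter>" suffix
    // appended to every generated transactional ID.
    private static final int MAX_PREFIX_LENGTH = 200;

    private String transactionalIdPrefix;

    // Point 1: expose the prefix and reject values that would push the
    // final transactional.id past the assumed length bound.
    public void setTransactionalIdPrefix(String prefix) {
        if (prefix == null) {
            throw new NullPointerException("transactionalIdPrefix must not be null");
        }
        if (prefix.length() > MAX_PREFIX_LENGTH) {
            throw new IllegalArgumentException(
                    "transactionalIdPrefix is " + prefix.length()
                            + " characters, maximum is " + MAX_PREFIX_LENGTH);
        }
        this.transactionalIdPrefix = prefix;
    }

    // Point 2: fail fast instead of silently overriding a user-supplied
    // transactional.id that the sink manages itself.
    public static void validateProducerConfig(Properties props) {
        if (props.containsKey("transactional.id")) {
            throw new IllegalArgumentException(
                    "transactional.id is managed by the sink; "
                            + "configure a transactionalIdPrefix instead");
        }
    }
}
{code}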

 

I ran into a similar problem:

https://issues.apache.org/jira/browse/FLINK-17691

[https://github.com/apache/flink/pull/12157]

 

If you don't plan to continue working on the fix, I'd like to take it over.


was (Author: freezhan):
[~becket_qin] 

Are you still working on this issue?

What's next?

 

I agree with the outcome of the discussion:
 # Expose {{transactionIdPrefix}}
 # Fail fast when overriding Kafka properties

 

I ran into a similar problem:

https://issues.apache.org/jira/browse/FLINK-17691

[https://github.com/apache/flink/pull/12157]

 

If you don't plan to continue working on the fix, I'd like to take it over.

> Multiple transactional KafkaProducers writing to same cluster have clashing transaction IDs
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11654
>                 URL: https://issues.apache.org/jira/browse/FLINK-11654
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.7.1
>            Reporter: Jürgen Kreileder
>            Assignee: Jiangjie Qin
>            Priority: Major
>
> We run multiple jobs on a cluster that write heavily to the same Kafka topic from identically named sinks. When EXACTLY_ONCE semantics are enabled for the KafkaProducers, we run into a lot of ProducerFencedExceptions, and all jobs go into a restart cycle.
> Example exception from the Kafka log:
>  
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the TransactionalIdsGenerator: the IDs are only guaranteed to be unique within a single job, but they can clash across different jobs (and clusters).
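>
> For illustration, a simplified, self-contained sketch of the ID derivation (not the actual TransactionalIdsGenerator code): because the prefix is built only from the task name and the operator ID, two jobs with identically named sinks derive the same transactional IDs.
>
> {code:java}
> // Simplified illustration, not Flink source: the prefix depends only on
> // the task name and the operator's unique ID, so two jobs with identically
> // named sinks end up with identical transactional IDs.
> public class TransactionalIdClashDemo {
>
>     static String transactionalId(String taskName, String operatorUniqueId,
>                                   int subtaskIndex, long counter) {
>         String prefix = taskName + "-" + operatorUniqueId;
>         return prefix + "-" + subtaskIndex + "-" + counter;
>     }
>
>     public static void main(String[] args) {
>         // Two different jobs, same sink name and operator topology:
>         String idFromJobA = transactionalId("Sink: kafka-out", "op-1", 0, 0);
>         String idFromJobB = transactionalId("Sink: kafka-out", "op-1", 0, 0);
>         // prints true -> the second producer fences the first
>         System.out.println(idFromJobA.equals(idFromJobB));
>     }
> }
> {code}
>
> The relevant initialization in FlinkKafkaProducer: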
>  
>  
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
>                 nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
>                         NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
>                 transactionalIdsGenerator = new TransactionalIdsGenerator(
> +                       // the prefix probably should include job id and maybe cluster id
>                         getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
>                         getRuntimeContext().getIndexOfThisSubtask(),
>                         getRuntimeContext().getNumberOfParallelSubtasks(),
> {code}
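>
> A short sketch of the fix the inline comment hints at: folding the job ID (and, if available, a cluster ID) into the prefix makes the generated IDs distinct across jobs. Names and values here are illustrative, not a proposed patch.
>
> {code:java}
> // Illustrative only: a job-scoped prefix removes the cross-job clash.
> public class JobScopedPrefixDemo {
>
>     static String prefix(String jobId, String taskName, String operatorUniqueId) {
>         return jobId + "-" + taskName + "-" + operatorUniqueId;
>     }
>
>     public static void main(String[] args) {
>         // Identical sinks in two different jobs now get distinct prefixes:
>         System.out.println(prefix("jobA", "Sink: kafka-out", "op-1"));
>         System.out.println(prefix("jobB", "Sink: kafka-out", "op-1"));
>     }
> }
> {code}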
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)