Posted to issues@flink.apache.org by "Jürgen Kreileder (JIRA)" <ji...@apache.org> on 2019/02/20 22:45:00 UTC
[jira] [Comment Edited] (FLINK-11654) ProducerFencedExceptions from Kafka in EXACTLY_ONCE mode due to identical transactional IDs in multiple jobs
[ https://issues.apache.org/jira/browse/FLINK-11654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16773458#comment-16773458 ]
Jürgen Kreileder edited comment on FLINK-11654 at 2/20/19 10:44 PM:
--------------------------------------------------------------------
I could probably provide a fix and a test case for this if somebody can tell me whether adding the JobID to the prefix is actually the right way to fix it (something like [https://github.com/jkreileder/flink/commit/81100d07e03e3ec107e389ab35c59bd7cc9a6378]).
JobID is a 16-byte random value, so there's no need to add something like a cluster ID. But is JobID the right concept here?
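For illustration, here is a minimal sketch of the idea rather than the actual patch (the linked commit has that). It assumes the JobID is reachable via the runtime context and that the remaining TransactionalIdsGenerator constructor arguments stay as they are in the current code:
{code:java}
// Sketch only: prepend the per-job random 16-byte JobID to the prefix so that
// identically named sinks in different jobs no longer generate colliding
// transactional IDs.
StreamingRuntimeContext ctx = (StreamingRuntimeContext) getRuntimeContext();
String transactionalIdPrefix =
    ctx.getJobId() + "-" +        // new: ties the prefix to this job
    ctx.getTaskName() + "-" +
    ctx.getOperatorUniqueID();    // unchanged components of the prefix
transactionalIdsGenerator = new TransactionalIdsGenerator(
    transactionalIdPrefix,
    ctx.getIndexOfThisSubtask(),
    ctx.getNumberOfParallelSubtasks(),
    kafkaProducersPoolSize,       // assumed: existing pool-size field
    SAFE_SCALE_DOWN_FACTOR);      // assumed: existing scale-down constant
{code}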
> ProducerFencedExceptions from Kafka in EXACTLY_ONCE mode due to identical transactional IDs in multiple jobs
> ------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-11654
> URL: https://issues.apache.org/jira/browse/FLINK-11654
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.7.1
> Reporter: Jürgen Kreileder
> Priority: Blocker
> Fix For: 1.8.0
>
>
> We run multiple jobs on a cluster that write heavily to the same Kafka topic from identically named sinks. When the EXACTLY_ONCE semantic is enabled for the KafkaProducers, we run into a lot of ProducerFencedExceptions and all jobs go into a restart cycle.
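> For context, a hedged sketch of how such a sink is typically wired up with the Flink 1.7 universal Kafka connector (the topic name is taken from the log below; the broker address and timeout value are placeholders):
>
> {code:java}
> import java.util.Properties;
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
> import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
>
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "kafka:9092");  // placeholder broker
> props.setProperty("transaction.timeout.ms", "900000"); // must not exceed the broker's transaction.max.timeout.ms
>
> // EXACTLY_ONCE makes the sink use Kafka transactions; every producer then
> // needs a unique transactional.id, which is where the clash below arises.
> FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
>     "finding-commands-dev-1",
>     new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
>     props,
>     FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
> {code}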
> Example exception from the Kafka log:
>
> {code:java}
> [2019-02-18 18:05:28,485] ERROR [ReplicaManager broker=1] Error processing append operation on partition finding-commands-dev-1-0 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no longer valid. There is probably another producer with a newer epoch. 483 (request epoch), 484 (server epoch)
> {code}
> The reason for this is the way FlinkKafkaProducer initializes the TransactionalIdsGenerator:
> the generated transactional IDs are only guaranteed to be unique within a single job, but they can clash between different jobs (and clusters). The sketch after the diff below makes the clash concrete.
>
>
> {code:java}
> --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
> @@ -819,6 +819,7 @@ public class FlinkKafkaProducer<IN>
> nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
> NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
> transactionalIdsGenerator = new TransactionalIdsGenerator(
> + // the prefix probably should include job id and maybe cluster id
> getRuntimeContext().getTaskName() + "-" + ((StreamingRuntimeContext) getRuntimeContext()).getOperatorUniqueID(),
> getRuntimeContext().getIndexOfThisSubtask(),
> getRuntimeContext().getNumberOfParallelSubtasks(),
> {code}
>
>
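> To make the clash concrete, a minimal, self-contained sketch that mimics the prefix/ID construction above in simplified form (the class and helper names are illustrative, not Flink API):
>
> {code:java}
> import java.util.UUID;
>
> public class TransactionalIdCollisionDemo {
>
>     // Mirrors prefix = taskName + "-" + operatorUniqueID: two jobs with the
>     // same job graph get the same task name and operator ID, so the prefix
>     // (and therefore the transactional ID) is identical across jobs.
>     static String transactionalId(String prefix, int subtaskIndex) {
>         return prefix + "-" + subtaskIndex; // simplified ID scheme
>     }
>
>     public static void main(String[] args) {
>         String sharedPrefix = "Sink: kafka" + "-" + "op-uid-42";
>
>         // Job A and job B with identically named sinks:
>         System.out.println(transactionalId(sharedPrefix, 0)); // job A
>         System.out.println(transactionalId(sharedPrefix, 0)); // job B: identical, so one producer fences the other
>
>         // With a per-job random 16-byte ID (which is what JobID provides) in
>         // the prefix, the IDs diverge and both jobs can commit transactions:
>         System.out.println(transactionalId(UUID.randomUUID() + "-" + sharedPrefix, 0)); // job A
>         System.out.println(transactionalId(UUID.randomUUID() + "-" + sharedPrefix, 0)); // job B
>     }
> }
> {code}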
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)