Posted to user@flink.apache.org by "Slotterback, Chris" <Ch...@comcast.com> on 2019/02/13 01:15:08 UTC

ProducerFencedException when running 2 jobs with FlinkKafkaProducer

Hey all,

I am running into an issue where, if I run 2 Flink jobs (same jar, different configuration) that produce to different Kafka topics on the same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE semantics, both jobs go into a checkpoint exception loop every 15 seconds or so:

Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.

As soon as one of the jobs is cancelled, things go back to normal for the other job. I tried manually setting TRANSACTIONAL_ID_CONFIG in the producer to be unique for each of the jobs. My producer transaction timeout is set to 5 minutes, and Flink checkpointing is set to 1 minute. Is there some way to prevent these jobs from tripping over each other in execution while retaining exactly-once processing?
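
For reference, the producer setup in each job looks roughly like this (a
simplified sketch of our code; the broker address, topic names, and the
per-job values are placeholders):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
    import org.apache.kafka.clients.producer.ProducerConfig;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // Flink checkpointing at 1 minute

    Properties props = new Properties();
    // Both jobs point at the same broker
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // Producer transaction timeout of 5 minutes
    props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "300000");
    // Tried making this unique per job ("job-a" vs "job-b"); it did not help
    props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "job-a-producer");

    FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "job-a-output-topic", // each job writes to its own topic
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);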

Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I just saw a JIRA opened for this:
https://issues.apache.org/jira/browse/FLINK-11654.

The JIRA ticket's description matches what I had in mind, and I can confirm
the bug assessment. Unfortunately, I currently do not have the capacity to
provide a fix and a test for this.
In the meantime, I've made this a blocker for releasing 1.8.0. It would be
great if someone could try out the proposed fix mentioned in the JIRA, see if
it fixes the issue in your cases, and provide a PR for the patch.
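
Until a fix lands, one possible stopgap (untested, and based on the JIRA's
analysis that the transactional ids are derived from the task name and
subtask index) is to give each job's sink operator a unique name, so that
the generated ids no longer collide:

    // Hypothetical sketch: "jobId" stands for a per-job value from your own
    // configuration (e.g. "job-a" vs "job-b"), not a Flink API. A distinct
    // operator name changes the task name that the transactional ids are
    // derived from, so the two jobs should stop fencing each other.
    stream.addSink(producer)
          .name("kafka-sink-" + jobId)
          .uid("kafka-sink-" + jobId);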

Thanks,
Gordon

On Tue, Feb 19, 2019 at 9:46 AM Rohan Thimmappa <ro...@gmail.com>
wrote:

> Hi Tzu-Li,
>
> Any update on this? This is consistently reproducible.
>
> Same jar - separate source topic to separate destination topic.
>
> This is sort of a blocker for the Flink upgrade. I tried with 1.7.2 but no luck.
>
> org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Failed to send data to Kafka: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
> 	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
> 	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
> 	at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:98)
> 	at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:33)
> 	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> 	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
>
>
>
>
>
> Rohan
>
> On Wed, Feb 13, 2019 at 12:33 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
> wrote:
>
>> Hi,
>>
>> I think this is unexpected. The generated transactional ids should not be
>> clashing.
>> Looking at the FlinkKafkaProducer code, it seems like the generation is
>> only a function of the subtask id of the FlinkKafkaProducer, which could be
>> the same across 2 different jobs.
>>
>> I'm not completely certain about this. Piotr (in CC) might have more
>> insights for this.
>>
>> Cheers,
>> Gordon
>>
>> On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <
>> Chris_Slotterback@comcast.com> wrote:
>>
>>> Hey all,
>>>
>>>
>>>
>>> I am running into an issue where, if I run 2 Flink jobs (same jar,
>>> different configuration) that produce to different Kafka topics on the
>>> same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE
>>> semantics, both jobs go into a checkpoint exception loop every 15 seconds
>>> or so:
>>>
>>>
>>>
>>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>>> Producer attempted an operation with an old epoch. Either there is a newer
>>> producer with the same transactionalId, or the producer's transaction has
>>> been expired by the broker.
>>>
>>>
>>>
>>> As soon as one of the jobs is cancelled, things go back to normal for
>>> the other job. I tried manually setting TRANSACTIONAL_ID_CONFIG in the
>>> producer to be unique for each of the jobs. My producer transaction
>>> timeout is set to 5 minutes, and Flink checkpointing is set to 1 minute.
>>> Is there some way to prevent these jobs from tripping over each other in
>>> execution while retaining exactly-once processing?
>>>
>>
>
> --
> Thanks
> Rohan
>

Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

Posted by Rohan Thimmappa <ro...@gmail.com>.
Hi Tzu-Li,

Any update on this? This is consistently reproducible.

Same jar - separate source topic to separate destination topic.

This is sort of a blocker for the Flink upgrade. I tried with 1.7.2 but no luck.

org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception:
Failed to send data to Kafka: Producer attempted an operation with an
old epoch. Either there is a newer producer with the same
transactionalId, or the producer's transaction has been expired by the
broker.
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.checkErroneous(FlinkKafkaProducer011.java:994)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:615)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.invoke(FlinkKafkaProducer011.java:94)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:230)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:98)
	at com.comcast.ips.transformation.EMMFlatMap.flatMap(EMMFlatMap.java:33)
	at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:67)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)





Rohan

On Wed, Feb 13, 2019 at 12:33 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Hi,
>
> I think this is unexpected. The generated transactional ids should not be
> clashing.
> Looking at the FlinkKafkaProducer code, it seems like the generation is
> only a function of the subtask id of the FlinkKafkaProducer, which could be
> the same across 2 different jobs.
>
> I'm not completely certain about this. Piotr (in CC) might have more
> insights for this.
>
> Cheers,
> Gordon
>
> On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <
> Chris_Slotterback@comcast.com> wrote:
>
>> Hey all,
>>
>>
>>
>> I am running into an issue where, if I run 2 Flink jobs (same jar,
>> different configuration) that produce to different Kafka topics on the
>> same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE
>> semantics, both jobs go into a checkpoint exception loop every 15 seconds
>> or so:
>>
>>
>>
>> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
>> Producer attempted an operation with an old epoch. Either there is a newer
>> producer with the same transactionalId, or the producer's transaction has
>> been expired by the broker.
>>
>>
>>
>> As soon as one of the jobs is cancelled, things go back to normal for the
>> other job. I tried manually setting TRANSACTIONAL_ID_CONFIG in the
>> producer to be unique for each of the jobs. My producer transaction
>> timeout is set to 5 minutes, and Flink checkpointing is set to 1 minute.
>> Is there some way to prevent these jobs from tripping over each other in
>> execution while retaining exactly-once processing?
>>
>

-- 
Thanks
Rohan

Re: ProducerFencedException when running 2 jobs with FlinkKafkaProducer

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I think this is unexpected. The generated transactional ids should not be
clashing.
Looking at the FlinkKafkaProducer code, it seems like the generation is
only a function of the subtask id of the FlinkKafkaProducer, which could be
the same across 2 different jobs.
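
If that is the case, two jobs running the same jar would derive identical
ids, roughly along these lines (a simplified illustration of the suspected
scheme, not the actual generator code):

    // Simplified illustration (assumed scheme, not Flink's actual code):
    // every input here is identical for two jobs running the same jar.
    static String transactionalId(String taskName, int subtaskIndex, long counter) {
        return taskName + "-" + subtaskIndex + "-" + counter;
    }

    // Job A, sink subtask 0 -> "Sink: Unnamed-0-0"
    // Job B, sink subtask 0 -> "Sink: Unnamed-0-0"  => same id, so the broker
    // fences the older producer with ProducerFencedException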

I'm not completely certain about this. Piotr (in CC) might have more
insights for this.

Cheers,
Gordon

On Wed, Feb 13, 2019 at 9:15 AM Slotterback, Chris <
Chris_Slotterback@comcast.com> wrote:

> Hey all,
>
>
>
> I am running into an issue where, if I run 2 Flink jobs (same jar,
> different configuration) that produce to different Kafka topics on the
> same broker, using the 1.7 FlinkKafkaProducer set with EXACTLY_ONCE
> semantics, both jobs go into a checkpoint exception loop every 15 seconds
> or so:
>
>
>
> Caused by: org.apache.kafka.common.errors.ProducerFencedException:
> Producer attempted an operation with an old epoch. Either there is a newer
> producer with the same transactionalId, or the producer's transaction has
> been expired by the broker.
>
>
>
> As soon as one of the jobs is cancelled, things go back to normal for the
> other job. I tried manually setting TRANSACTIONAL_ID_CONFIG in the
> producer to be unique for each of the jobs. My producer transaction
> timeout is set to 5 minutes, and Flink checkpointing is set to 1 minute.
> Is there some way to prevent these jobs from tripping over each other in
> execution while retaining exactly-once processing?
>