Posted to user@flink.apache.org by Morgan Geldenhuys <mo...@tu-berlin.de> on 2021/02/28 14:34:34 UTC

Issues running multiple Jobs using the same JAR

Greetings all,

I am having an issue instantiating multiple Flink jobs using the same
JAR in the same Flink native cluster (all 1.12.1).

When processing events, the jobs fail with the following trace:

org.apache.kafka.common.KafkaException: Cannot perform send because at
least one previous transactional or idempotent request has failed with
errors.
     at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:356)
     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:926)
     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
     at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.send(FlinkKafkaInternalProducer.java:133)
     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:915)
     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
     at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:187)
     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:395)
     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:609)
     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573)
     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
     at java.base/java.lang.Thread.run(Unknown Source)
     Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException:
Failed to send data to Kafka: Producer attempted an operation with an old
epoch. Either there is a newer producer with the same transactionalId,
or the producer's transaction has been expired by the broker.
         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965)
         at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
         ... 3 more
     Caused by: org.apache.kafka.common.errors.ProducerFencedException:
Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has
been expired by the broker.
         Suppressed: java.lang.IllegalStateException: Pending record count
must be zero at this point: 1
             at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
             at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
             at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
             at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
             at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
             at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
             at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
             at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
             at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
             at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
             at java.base/java.lang.Thread.run(Unknown Source)
         Suppressed: java.lang.IllegalStateException: Pending record count
must be zero at this point: 1
             at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1090)
             at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925)
             at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
             at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)
             at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:783)
             at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:762)
             at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:681)
             at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:585)
             at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
             at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
             at java.base/java.lang.Thread.run(Unknown Source)

Concerning configuration, I have set transaction.max.timeout.ms on the
Kafka brokers to one hour, as advised here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-producers-and-fault-tolerance.
Additionally, I set the producer's TRANSACTIONAL_ID_CONFIG to a random
value, i.e. UUID.randomUUID().toString().
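For concreteness, here is a minimal sketch of the kind of sink setup
described above; the topic, broker address, and class names are
illustrative placeholders, not the actual job code:

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import java.util.UUID;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    final class ExactlyOnceSinkSketch {

        static void attachSink(DataStream<String> events) {
            // Producer properties as described above: keep transaction.timeout.ms
            // below the broker-side transaction.max.timeout.ms (raised to one hour)
            // and set a random transactional.id. The 15-minute value is illustrative.
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
            props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000));
            props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());

            // Write each String record to the (illustrative) "events" topic.
            KafkaSerializationSchema<String> schema =
                    (element, timestamp) ->
                            new ProducerRecord<>("events", element.getBytes(StandardCharsets.UTF_8));

            // Exactly-once Kafka sink, as in the failing jobs.
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                    "events", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

            events.addSink(producer).name("events-sink");
        }
    }

(I am not sure whether the connector overrides transactional.id internally
when EXACTLY_ONCE is used, which might explain why randomising it in the
properties changed nothing.)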
Any ideas why this would be the case?

Regards,
Morgan.

Re: Issues running multiple Jobs using the same JAR

Posted by Morgan Geldenhuys <mo...@tu-berlin.de>.
That solved it, thank you very much Kezhu :)

On 28.02.21 16:12, Kezhu Wang wrote:
> Hi Morgan,
>
> You could check FLINK-11654; from its description, I think it is the
> problem you encountered.
>
> > We run multiple jobs on a cluster which write a lot to the same 
> Kafka topic from identically named sinks. When EXACTLY_ONCE semantic 
> is enabled for the KafkaProducers we run into a lot of 
> ProducerFencedExceptions and all jobs go into a restart cycle.
>
> FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654
>
>
> Best,
> Kezhu Wang
>
>
> On February 28, 2021 at 22:35:02, Morgan Geldenhuys
> (morgan.geldenhuys@tu-berlin.de) wrote:
>
>> Greetings all,
>>
>> I am having an issue instantiating multiple Flink jobs using the same
>> JAR in the same Flink native cluster (all 1.12.1).


Re: Issues running multiple Jobs using the same JAR

Posted by Kezhu Wang <ke...@gmail.com>.
Hi Morgan,

You could check FLINK-11654; from its description, I think it is the
problem you encountered.

> We run multiple jobs on a cluster which write a lot to the same Kafka
topic from identically named sinks. When EXACTLY_ONCE semantic is enabled
for the KafkaProducers we run into a lot of ProducerFencedExceptions and
all jobs go into a restart cycle.

FLINK-11654: https://issues.apache.org/jira/browse/FLINK-11654
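
Roughly speaking, with EXACTLY_ONCE the FlinkKafkaProducer generates its
transactional ids from the sink's task name and subtask index, so
identically named sinks in different jobs can end up with the same ids and
fence each other. As a minimal sketch of one possible workaround (the
names and the --job-tag parameter are illustrative, not taken from your
job), you can make the sink's name and uid unique per submission:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    final class UniqueSinkNames {

        // Illustrative helper: "--job-tag" is a hypothetical per-submission
        // parameter used only to keep the sink's name and uid (and hence the
        // derived transactional id prefix) distinct for each job built from
        // the same JAR.
        static void attachSink(
                DataStream<String> events,
                FlinkKafkaProducer<String> producer,
                ParameterTool params) {

            String jobTag = params.getRequired("job-tag"); // e.g. --job-tag job-a

            events.addSink(producer)
                    .name("events-sink-" + jobTag)
                    .uid("events-sink-" + jobTag);
        }
    }

Each submission would then pass a different --job-tag value, so the jobs no
longer share a transactional id space.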


Best,
Kezhu Wang


On February 28, 2021 at 22:35:02, Morgan Geldenhuys (morgan.geldenhuys@tu-berlin.de) wrote:

Greetings all,

I am having an issue instantiating multiple Flink jobs using the same JAR
in the same Flink native cluster (all 1.12.1).
