Posted to user@flink.apache.org by Timothy Victor <vi...@gmail.com> on 2019/03/08 12:28:09 UTC

Flink 1.7.1 KafkaProducer error using Exactly Once semantic

Yesterday I came across a weird problem when attempting to run 2 nearly
identical jobs on a cluster.  I was able to solve it (or rather work
around it), but am sharing it here so we can consider a potential fix in
Flink's KafkaProducer code.

My scenario is as follows.  I have a Flink program that reads from a Kafka
topic, does a simple Map() operation, and writes to a Kafka topic with
exactly-once semantics.  The source and sink topics are configurable, and
the Map() operation is also configurable (i.e. based on CLI arguments it
can choose between a set of Map() operations).  The job does not name any
of its operators (this part sounds trivial, right?...but read on).   I run
an instance of this program (job) on a Flink standalone cluster running
1.7.1.  The cluster has 12 TMs, each with 1 slot.   So each job runs in
its own task slot/TM, and hence in its own JVM process.  The job runs
fine, checkpointing regularly with no errors.   However, if I start
another instance of the program (with different source/sink topics), then
within a few seconds the first one fails and enters recovery.   The first
one will eventually fail for good (all retries exhausted).   If I try to
start the failed job again, then the second job fails within a few
seconds.   So basically it looked like one job was tripping over the
other.   This was especially odd since each job was running in essentially
its own JVM process (i.e. its own Task Manager / Task Slot).
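
For reference, the job is shaped roughly like the sketch below (a minimal
sketch assuming String records; the broker address, topic names, and the
toUpperCase() map are placeholders for the CLI-configured values):

    import java.util.Properties;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

    public class MirrorJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(10_000); // EXACTLY_ONCE needs checkpointing

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
            props.setProperty("group.id", "mirror");               // placeholder

            env.addSource(new FlinkKafkaConsumer011<>(
                        "source-topic", new SimpleStringSchema(), props))
               // stand-in for the configurable Map() operation
               .map((MapFunction<String, String>) String::toUpperCase)
               // note: no .name() anywhere in the chain
               .addSink(new FlinkKafkaProducer011<>(
                        "sink-topic",
                        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                        props,
                        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
            env.execute();
        }
    }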

Looking at the Flink logs, I saw this error message:

"org.apache.kafka.common.errors.ProducerFencedException:
Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has
been expired by the broker."

So I looked at the transaction IDs and saw that they were of the form
"transaction.id = Source: Custom Source -> Map -> Sink: Unnamed-<guid>";
essentially the transaction.id is set to the description of the chained
operator followed by some GUID.   It is not clear to me how the GUID is
generated -- but essentially BOTH my jobs were using the same
transaction.id!
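
As a rough sketch of why that collides (paraphrased, not Flink's actual
code): if the transactional ID prefix is derived only from the chained
task name plus a per-subtask counter, then two jobs with identical unnamed
chains generate identical IDs:

    // Hedged sketch of the failure mode, not Flink's actual generator.
    static String transactionalId(String taskName, int subtask,
                                  int poolSize, int offset) {
        // Both jobs chain to "Source: Custom Source -> Map -> Sink: Unnamed",
        // so every generated ID is identical across the two jobs.
        return taskName + "-" + (subtask * poolSize + offset);
    }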

If I understand correctly, Flink uses a pool of KafkaProducers.  Each
KafkaProducer within the pool has a transaction.id associated with it.   I
think what is happening is that each of my jobs has its own pool of
KafkaProducers, but each producer in one pool has the same ID as its
counterpart in the other pool -- like JobA.Pool: {P1, P2, P3} and
JobB.Pool: {P1, P2, P3}.   This sounds like it should not be a problem,
since each pool lives in its own JVM process.  But since it does break, my
conjecture is this -- with the way 2-phase commit works, in the _commit_
phase, I believe the JM sends a signal to each operator to commit its
state.   My guess is that since the IDs collide, the producer in one pool
is told to commit a transaction with an epoch that belongs to the producer
in the other pool and happens to be less than its own last epoch.   For
example, P1 (in Job A) gets a message to commit with epoch 0 that is
actually meant for P1 (in Job B).   The only other explanation I can think
of is that these pools are in fact shared between task managers -- but
that's really hard to believe.
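
The fencing itself is easy to reproduce outside Flink with two plain Kafka
producers sharing a transactional.id (a standalone sketch; the broker
address and topic name are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class FencingDemo {
        static KafkaProducer<String, String> producer(String txId) {
            Properties p = new Properties();
            p.put("bootstrap.servers", "broker:9092");
            p.put("transactional.id", txId); // same ID for both -> fencing
            p.put("key.serializer", StringSerializer.class.getName());
            p.put("value.serializer", StringSerializer.class.getName());
            return new KafkaProducer<>(p);
        }

        public static void main(String[] args) {
            KafkaProducer<String, String> first = producer("shared-id");
            first.initTransactions();  // registers epoch 0
            first.beginTransaction();
            first.send(new ProducerRecord<>("demo-topic", "k", "v"));

            KafkaProducer<String, String> second = producer("shared-id");
            second.initTransactions(); // broker bumps the epoch, fencing `first`

            first.commitTransaction(); // throws ProducerFencedException
        }
    }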

Is my understanding correct?

I was able to solve this by simply naming one of my operators so that the
transaction.id would be unique for each job.   For example:
JobA: transaction.id = "Source: Custom Source -> (JobA) -> Sink:
Unnamed-<guid>"
JobB: transaction.id = "Source: Custom Source -> (JobB) -> Sink:
Unnamed-<guid>"

After I did this - both jobs run successfully.
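
In code, the fix is just an explicit .name() on the map operator (sketch;
"stream", "kafkaSink", and "jobName" stand in for the actual stream, sink,
and whatever CLI argument distinguishes the two jobs):

    // Naming the operator changes the chained task name, and with it the
    // transaction.id prefix.
    stream.map((MapFunction<String, String>) String::toUpperCase)
          .name(jobName) // e.g. "JobA" vs "JobB"
          .addSink(kafkaSink);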

I think a good improvement would be to _not_ use the job graph description
as the transaction ID.   Maybe a simple approach is to require the user to
provide a pool identifier when using Exactly Once with Kafka.  At least
that would make the uniqueness requirement explicit.
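
Something like the following, purely hypothetical API (this setter does
not exist in Flink 1.7; it is only meant to show the shape of the
suggestion):

    FlinkKafkaProducer011<String> sink = new FlinkKafkaProducer011<>(
            "sink-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            props,
            FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
    // Hypothetical -- not a real Flink 1.7 method:
    sink.setTransactionalIdPrefix("job-a");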

Thanks

Tim

Re: [EXTERNAL] Flink 1.7.1 KafkaProducer error using Exactly Once semantic

Posted by "Slotterback, Chris" <Ch...@comcast.com>.
Hi Timothy,

I recently faced a similar issue that spawned a bug discussion from the devs: https://issues.apache.org/jira/browse/FLINK-11654

As far as I can tell, your understanding is correct. We also renamed the UID using the job name to force uniqueness across identical jobs writing to the same broker. The downside for us is keeping the job names unique while dealing with things like multiple datacenters and multiple dev/qa/prod environments.

As for your idea of requiring the identifier to be set when using Semantic.EXACTLY_ONCE, I agree with you, as it was not immediately obvious to us that there was a collision occurring between the jobs.

Chris
