You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Edward Rojas <ed...@gmail.com> on 2018/12/18 13:33:32 UTC

How to migrate Kafka Producer ?

Hi,

I'm planning to migrate from kafka connector 0.11 to the new universal kafka
connector 1.0.0+ but I'm having some troubles.

The kafka consumer seems to be compatible but when trying to migrate the
kafka producer I get an incompatibility error for the state migration. 
It looks like the producer uses a list state of type
"NextTransactionalIdHint", but this class is specific for each Producer
(FlinkKafkaProducer011.NextTransactionalIdHint  vs
FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are not
compatible.


I would like to know what is the recommended way to perform this kind of
migration without losing the state ?

Thanks in advance,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to migrate Kafka Producer ?

Posted by Edward Alexander Rojas Clavijo <ed...@gmail.com>.
Hi Dawid, Piotr,

I see that for the kafka consumer base there are some migration tests here:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java

As the kafka consumer state is managed on the FlinkKafkaConsumerBase class
I assumed this would cover also migration of connectors versions, but maybe
I'm missing something (?)

I performed some tests on my own and the migration of the kafka consumer
connector worked.


Regarding the kafka producer I am just updating the job with the new
connector and removing the previous one and upgrading the job by using a
savepoint and the --allowNonRestoredState.
So far my tests with this option are successful.

I appreciate any help here to clarify my understanding.

Regards,
Edward

El mié., 19 dic. 2018 a las 10:28, Dawid Wysakowicz (<dw...@apache.org>)
escribió:

> Hi Edward,
>
> AFAIK we do not support migrating state from one connector to another
> one, which is in fact the case for kafka 0.11 and the "universal" one.
>
> You might try to use the project bravo[1] to migrate the state manually,
> but unfortunately you have to understand the internals of both of the
> connectors. I pull also Piotr to the thread, maybe he can provide more
> straightforward workaround.
>
> Best,
>
> Dawid
>
> [1] https://github.com/king/bravo
>
> On 18/12/2018 14:33, Edward Rojas wrote:
> > Hi,
> >
> > I'm planning to migrate from kafka connector 0.11 to the new universal
> kafka
> > connector 1.0.0+ but I'm having some troubles.
> >
> > The kafka consumer seems to be compatible but when trying to migrate the
> > kafka producer I get an incompatibility error for the state migration.
> > It looks like the producer uses a list state of type
> > "NextTransactionalIdHint", but this class is specific for each Producer
> > (FlinkKafkaProducer011.NextTransactionalIdHint  vs
> > FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are
> not
> > compatible.
> >
> >
> > I would like to know what is the recommended way to perform this kind of
> > migration without losing the state ?
> >
> > Thanks in advance,
> > Edward
> >
> >
> >
> > --
> > Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>

Re: How to migrate Kafka Producer ?

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Edward,

AFAIK we do not support migrating state from one connector to another
one, which is in fact the case for kafka 0.11 and the "universal" one.

You might try to use the project bravo[1] to migrate the state manually,
but unfortunately you have to understand the internals of both of the
connectors. I pull also Piotr to the thread, maybe he can provide more
straightforward workaround.

Best,

Dawid

[1] https://github.com/king/bravo

On 18/12/2018 14:33, Edward Rojas wrote:
> Hi,
>
> I'm planning to migrate from kafka connector 0.11 to the new universal kafka
> connector 1.0.0+ but I'm having some troubles.
>
> The kafka consumer seems to be compatible but when trying to migrate the
> kafka producer I get an incompatibility error for the state migration. 
> It looks like the producer uses a list state of type
> "NextTransactionalIdHint", but this class is specific for each Producer
> (FlinkKafkaProducer011.NextTransactionalIdHint  vs
> FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are not
> compatible.
>
>
> I would like to know what is the recommended way to perform this kind of
> migration without losing the state ?
>
> Thanks in advance,
> Edward
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: How to migrate Kafka Producer ?

Posted by Edward Rojas <ed...@gmail.com>.
Hi Piotr,

Thank you for looking into this. 
Do you have an idea when next version (1.7.2) will be available ?

Also, could you validate / invalidate the approach I proposed in the
previous comment ?


Edward Rojas wrote
> Regarding the kafka producer I am just updating the job with the new
> connector and removing the previous one and upgrading the job by using a
> savepoint and the --allowNonRestoredState. 
> So far my tests with this option are successful.

Is there any risk of using this approach and just ignore the state of the
previous Producer ?


Thanks again for your help.

Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: How to migrate Kafka Producer ?

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi Edward,

Sorry for coming back so late (because of holiday season).

You are unfortunately right. Our FlinkKafkaProducer should have been upgrade-able, but it is not. I have created a bug for this [1]. For the time being, until we fix the issue, you should be able to stick to 0.11 producer without noticeable negative effects. Our FlinkKafkaProducer011 has the same forward & backward compatibility as the universal FlinkKakfaProducer (The biggest change between two of them was just changing the naming convention), so you can use either of them with the same versions of Kafka brokers (0.10+).

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11249

> On 18 Dec 2018, at 14:33, Edward Rojas <ed...@gmail.com> wrote:
> 
> Hi,
> 
> I'm planning to migrate from kafka connector 0.11 to the new universal kafka
> connector 1.0.0+ but I'm having some troubles.
> 
> The kafka consumer seems to be compatible but when trying to migrate the
> kafka producer I get an incompatibility error for the state migration. 
> It looks like the producer uses a list state of type
> "NextTransactionalIdHint", but this class is specific for each Producer
> (FlinkKafkaProducer011.NextTransactionalIdHint  vs
> FlinkKafkaProducer.NextTransactionalIdHint) and therefore the states are not
> compatible.
> 
> 
> I would like to know what is the recommended way to perform this kind of
> migration without losing the state ?
> 
> Thanks in advance,
> Edward
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/