Posted to user@flink.apache.org by kla <la...@gmail.com> on 2017/09/14 09:56:35 UTC

Change Kafka cluster without losing Flink's state

Hi guys, 

We have a running Apache Flink streaming job which interacts with Apache
Kafka (as both consumer and producer).
Now we would like to change the Kafka cluster without losing Flink's state.

Is it possible to do it? If so, what is the right way to do it?

Thanks in advance! 
Best, 
Konstantin 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Change Kafka cluster without losing Flink's state

Posted by kla <la...@gmail.com>.
Hi Gordon, 

Thanks again for your answer. 

But I am not sure if I understood this part:
"The workaround, for now, would be to explicitly disable chaining of the
consumer source with any stateful operators before taking the savepoint and
changing the operator UID."


So my code looks like this:

        DataStream<Model2> stream2 = env
                .addSource(new FlinkKafkaConsumer(...))
                .uid("some-unique-id");
        DataStream<Model12> stream12 = stream2
                .connect(stream1)
                .keyBy(...)
                .flatMap(new CoFlatMapFunction()); // this is a stateful operator

I have added uid() as you have suggested, but I got the following message:

Cannot map savepoint state for operator 5167ac19a7ef76538ac5476b6094bc29 to
the new program, because the operator is not available in the new program.
If you want to allow to skip this, you can set the --allowNonRestoredState
option on the CLI.
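
For reference, the option named in the error is passed to the `flink run` command when resubmitting the job from a savepoint. A sketch of the command; the savepoint path and jar name below are placeholders:

```shell
# Resubmit from the savepoint, skipping state that no longer maps to any
# operator in the new program (here, the old consumer's offset state).
# Savepoint path and jar name are placeholders.
flink run -s hdfs:///flink/savepoints/savepoint-abc123 \
    --allowNonRestoredState \
    my-streaming-job.jar
```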

What should I do here?


Thanks,
Konstantin




Re: Change Kafka cluster without losing Flink's state

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Simply like this:
env.addSource(new FlinkKafkaConsumer(...)).uid("some-unique-id")

The same goes for any other operator.

However, do keep in mind this bug that was just recently uncovered: https://issues.apache.org/jira/browse/FLINK-7623.
What I described in my previous reply would not work as expected if your Kafka consumer was chained to some stateful operator.
The workaround, for now, would be to explicitly disable chaining of the consumer source with any stateful operators before taking the savepoint and changing the operator UID.
If your consumer source is not chained with any stateful operator, then you can ignore this and safely proceed.
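
Concretely, the workaround might look like the sketch below. The topic name, deserializer, and UID value are placeholders, and `FlinkKafkaConsumer010` matches the connector version used earlier in this thread:

```java
// Sketch of the FLINK-7623 workaround: set an explicit UID on the source and
// break the operator chain there, then take the savepoint before changing
// the UID. Topic, deserializer, and UID values are placeholders.
DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer010<>("my-topic", deserializer, properties))
        .uid("kafka-source")        // explicit UID on the source operator
        .disableChaining();         // keep the source out of any stateful chain
```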

Cheers,
Gordon

On 14 September 2017 at 3:49:31 PM, kla (lalafaryan@gmail.com) wrote:


Re: Change Kafka cluster without losing Flink's state

Posted by kla <la...@gmail.com>.
Hi Gordon,

Thanks for your quick reply.

I have the following consumer:

jobConfiguration.getEnv().addSource(
        new FlinkKafkaConsumer010<>(
                properties.getProperty(TOPIC), deserializer, properties));

How can I set the UID for the consumer?

Thanks again for help!

Regards,
Konstantin




Re: Change Kafka cluster without losing Flink's state

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Konstantin,

After migrating and connecting to the new Kafka cluster, do you want the Kafka consumer to start fresh without any partition offset state (and therefore will re-establish its partition-to-subtask assignments), while keeping all other operator state in the pipeline intact?
If so, that is definitely possible. Simply set a new, different operator UID for the FlinkKafkaConsumer. When the savepoint is restored, the consumer will not be restored with any state, since Flink will recognize it as a new operator in the pipeline without any state.
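
As a sketch (broker address, topic, deserializer, and UID below are placeholders):

```java
// Sketch: point the consumer's properties at the new Kafka cluster and use a
// fresh operator UID, so on restore the consumer is treated as a brand-new
// operator with no offset state, while all other operators keep their state.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "new-kafka-host:9092"); // new cluster
props.setProperty("group.id", "my-consumer-group");

DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer010<>("my-topic", deserializer, props))
        .uid("kafka-source-new-cluster"); // different from the previous UID
```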

Cheers,
Gordon

On 14 September 2017 at 11:56:38 AM, kla (lalafaryan@gmail.com) wrote:
