You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/07 09:46:21 UTC

[GitHub] yanghua commented on a change in pull request #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

yanghua commented on a change in pull request #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#discussion_r245596796
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
 ##########
 @@ -813,8 +820,18 @@ public void initializeState(FunctionInitializationContext context) throws Except
 			semantic = Semantic.NONE;
 		}
 
-		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
+		ListState<NextTransactionalIdHint> oldNextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(
 			NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR);
+		nextTransactionalIdHintState = context.getOperatorStateStore().getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);
+
+		//migrate and let the new state can be compatible with old state
+		if (oldNextTransactionalIdHintState != null && oldNextTransactionalIdHintState.get() != null) {
 
 Review comment:
   @pnowojski Yes, you are right. I thought that after the `State#clear` method was called, the next time getting the state would get `null`. Will fix it. Let us listen to @tzulitai 's option first and then see if we still use this solution.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services