Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/02/08 15:54:54 UTC

[GitHub] EAlexRojas commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

URL: https://github.com/apache/flink/pull/7405#issuecomment-461849076
 
 
   Hi @pnowojski, @yanghua, @tzulitai, @tvielgouarin 
   
   I took a look at this PR and I think the proposed changes are not enough to support the migration from `FlinkKafkaProducer011` to `FlinkKafkaProducer`.
   
   
   
   - The first problem is that, since `NextTransactionalIdHintSerializer` extends `TypeSerializerSingleton`, `ensureCompatibility` returns "compatible" only if the class name is the same, which is not the case here: we have `FlinkKafkaProducer011.NextTransactionalIdHintSerializer` vs `FlinkKafkaProducer.NextTransactionalIdHintSerializer`.
   Now, `ensureCompatibility` is deprecated and the new migration system should be used instead, by implementing the `snapshotConfiguration` method. With that, one could create a class implementing `TypeSerializerSnapshot` and use its `resolveSchemaCompatibility` method to take the class-name difference into account and return "compatible after migration", so that the states become compatible (see the first sketch after this list).
   
   - Another problem is that the `context.getOperatorStateStore().getUnionListState()` method creates an empty state and saves the state descriptor even if the state is empty. So when trying to migrate, even if the old state is empty, Flink would still try to migrate it, which ends up going through the old migration system and results in incompatible state.
   For this, one could first check whether there is an old state to recover, and only try to migrate it when there is one (see the second sketch after this list).
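   
   A minimal sketch of the first fix, heavily hedged: the `TypeSerializerSnapshot` methods below are Flink's real interface, but the snapshot class itself, its wiring, and the assumption that the hint's binary layout is identical in both connectors are illustrative, not code from this PR:
   
   ```java
   import java.io.IOException;
   
   import org.apache.flink.api.common.typeutils.TypeSerializer;
   import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
   import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
   import org.apache.flink.core.memory.DataInputView;
   import org.apache.flink.core.memory.DataOutputView;
   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
   import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.NextTransactionalIdHint;
   
   // Hypothetical snapshot class for the NextTransactionalIdHint state.
   public class NextTransactionalIdHintSnapshot
           implements TypeSerializerSnapshot<NextTransactionalIdHint> {
   
       @Override
       public int getCurrentVersion() {
           return 1;
       }
   
       @Override
       public void writeSnapshot(DataOutputView out) throws IOException {
           // The hint is a fixed two-field class; there is no configuration to write.
       }
   
       @Override
       public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader)
               throws IOException {
           // Nothing was written, so there is nothing to read.
       }
   
       @Override
       public TypeSerializer<NextTransactionalIdHint> restoreSerializer() {
           // Assumes the hint's binary layout is unchanged, so the new serializer
           // can read bytes written by the 0.11 connector.
           return new FlinkKafkaProducer.NextTransactionalIdHintSerializer();
       }
   
       @Override
       public TypeSerializerSchemaCompatibility<NextTransactionalIdHint> resolveSchemaCompatibility(
               TypeSerializer<NextTransactionalIdHint> newSerializer) {
           if (newSerializer instanceof FlinkKafkaProducer.NextTransactionalIdHintSerializer) {
               return TypeSerializerSchemaCompatibility.compatibleAsIs();
           }
           // Same serializer under a different enclosing class (011 vs. universal):
           // request migration instead of failing as incompatible.
           if ("NextTransactionalIdHintSerializer".equals(
                   newSerializer.getClass().getSimpleName())) {
               return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
           }
           return TypeSerializerSchemaCompatibility.incompatible();
       }
   }
   ```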
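   
   And a minimal sketch of the second fix, as a fragment of the universal producer's `initializeState()`. The legacy descriptor name `"next-transactional-id-hint"` is illustrative, and it assumes the state store reports restored states through `getRegisteredStateNames()`:
   
   ```java
   // Only touch the legacy state when restoring AND the checkpoint actually
   // contains it; calling getUnionListState() unconditionally would register
   // an empty legacy descriptor and write it into the next snapshot.
   if (context.isRestored()
           && context.getOperatorStateStore()
                   .getRegisteredStateNames()
                   .contains("next-transactional-id-hint")) {
       ListState<FlinkKafkaProducer.NextTransactionalIdHint> oldState =
               context.getOperatorStateStore()
                       .getUnionListState(
                               new ListStateDescriptor<>(
                                       "next-transactional-id-hint",
                                       new FlinkKafkaProducer.NextTransactionalIdHintSerializer()));
       for (FlinkKafkaProducer.NextTransactionalIdHint hint : oldState.get()) {
           nextTransactionalIdHintState.add(hint); // the state under the new descriptor
       }
       oldState.clear();
   }
   ```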
   
   
   I managed to solve these two problems, but the bigger problem here is that the Kafka producer does not only have the `NextTransactionalIdHint` state; it also has the `ListState` inherited from `TwoPhaseCommitSinkFunction`, which includes the `KafkaTransactionState` and `KafkaTransactionContext`.
   
   Taking the version from the master branch, which includes updated versions of `ContextStateSerializer` and `TransactionStateSerializer` implementing the `snapshotConfiguration` method, one could adapt the `TypeSerializer` classes to override `resolveSchemaCompatibility`, take the difference in class-name prefixes into account as well, and return "compatible after migration" when needed.
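   
   As a sketch of that adaptation (a purely illustrative helper, not code from this PR), the compatibility check could compare serializer classes by simple name, so that the 0.11 and universal variants of the same serializer are treated as one logical serializer:
   
   ```java
   // E.g. FlinkKafkaProducer011.TransactionStateSerializer and
   // FlinkKafkaProducer.TransactionStateSerializer differ only in their
   // enclosing class, so their simple names and packages match.
   static boolean sameSerializerModuloEnclosingClass(Class<?> restored, Class<?> current) {
       return restored.getSimpleName().equals(current.getSimpleName())
               && restored.getPackage().getName().equals(current.getPackage().getName());
   }
   ```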
   
   With this, we would no longer get the "org.apache.flink.util.StateMigrationException: The new state serializer for operator state must not be incompatible" errors.
   
   ---
   
   **BUT**, even if we solve this part, the problem is that each Kafka producer class has, in its method signatures, its own independent version of `KafkaTransactionState` and `KafkaTransactionContext`, and they cannot be cast to one another.
   
   To clarify: when `initializeState` is called on the Kafka producer, it calls the same method in the parent class, which tries to recover the transactions and context. Here it recovers the transactions from `FlinkKafkaProducer011` and tries to call the `recoverAndCommit` method of the universal `FlinkKafkaProducer`, but since the transaction class cannot be cast, this fails:
   ```
   java.lang.ClassCastException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$KafkaTransactionState cannot be cast to org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer$KafkaTransactionState
   	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.recoverAndCommit(FlinkKafkaProducer.java:99)
   	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.recoverAndCommitInternal(TwoPhaseCommitSinkFunction.java:393)
   	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:351)
   ```
   
   I think the only way to be able to migrate this would be to have a common class for `KafkaTransactionState` and `KafkaTransactionContext` that would be used by both Flink Kafka producers... but I'm not sure this could be done without breaking compatibility...
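   
   For illustration only (the class name and shape here are hypothetical, not the actual refactoring in the commits linked at the end): the idea is a single producer-independent state class that both connectors reference, so state recovered from either one has the same runtime class and no cross-cast is needed:
   
   ```java
   // Hedged sketch of a shared, producer-independent transaction state.
   public final class CommonKafkaTransactionState {
   
       // Null for non-transactional (at-least-once / none) deliveries.
       private final String transactionalId;
       private final long producerId;
       private final short epoch;
   
       public CommonKafkaTransactionState(String transactionalId, long producerId, short epoch) {
           this.transactionalId = transactionalId;
           this.producerId = producerId;
           this.epoch = epoch;
       }
   
       public String getTransactionalId() {
           return transactionalId;
       }
   
       public long getProducerId() {
           return producerId;
       }
   
       public short getEpoch() {
           return epoch;
       }
   }
   ```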
   
   
   I performed the necessary refactoring to have a common class for each of these states, and it seems to solve the migration issue. I only ran limited tests, though, so it could still have problems...
   
   The final thing is that, for this to work, one has to include both connector dependencies, `flink-connector-kafka-0.11_2.11` and `flink-connector-kafka_2.11`, which I expected to work without any issue...
   But when I include both connectors in the job jar, I get an error on the Kafka consumer side:
   ```
   java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
   	at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.<init>(KafkaFetcher.java:109)
   	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.createFetcher(FlinkKafkaConsumer.java:240)
   	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:647)
   ```
   
   Worth noting: I get this error even without the modifications in this PR...
   Maybe I'm doing something wrong... but if this latter issue can be solved and the refactoring I made is correct, the migration may be possible...
   
   Please take a look at my analysis and correct me if I'm wrong.
   
   Also, you can take a look at the work I've done in https://github.com/EAlexRojas/flink/commit/3ef8328dffb076199776698add231684363615eb and https://github.com/EAlexRojas/flink/commit/8f6685a50c03c2a43cb3046b04f7190edbd3fe75
   
   Regards, 
   Edward
