Posted to dev@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/10/23 13:48:00 UTC

[jira] [Created] (FLINK-7902) TwoPhaseCommitSinkFunctions should use custom TypeSerializer

Aljoscha Krettek created FLINK-7902:
---------------------------------------

             Summary: TwoPhaseCommitSinkFunctions should use custom TypeSerializer
                 Key: FLINK-7902
                 URL: https://issues.apache.org/jira/browse/FLINK-7902
             Project: Flink
          Issue Type: Bug
          Components: Kafka Connector
    Affects Versions: 1.4.0
            Reporter: Aljoscha Krettek
            Assignee: Piotr Nowojski
            Priority: Blocker
             Fix For: 1.4.0


Currently, the {{FlinkKafkaProducer011}} uses {{TypeInformation.of(new TypeHint<State<KafkaTransactionState, KafkaTransactionContext>>() {})}} to create a {{TypeInformation}} which in turn is used to create a {{StateDescriptor}} for the state that the Kafka sink stores.

Behind the scenes, this is analysed roughly as a {{PojoType(GenericType<KafkaTransactionState>, GenericType<KafkaTransactionContext>)}}, which means that we don't have explicit control over the serialisation format and that we also use Kryo (the default for {{GenericTypeInfo}}). This can be problematic if we want to evolve the state schema in the future or change Kryo versions, because state written in the old format must still be readable when a job is restored.
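To illustrate what "explicit control over the serialisation format" buys, here is a minimal, Flink-free sketch under stated assumptions: {{TxnState}} and {{TxnStateFormat}} are hypothetical, simplified stand-ins (not Flink's {{KafkaTransactionState}}), and the "version 1 stored only the id" history is invented for the example. Because every record begins with a version tag that we define ourselves, a newer reader can still decode bytes written by an older layout, which is hard to guarantee when the bytes come from Kryo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, simplified transaction state (not Flink's KafkaTransactionState).
final class TxnState {
    final String transactionalId;
    final long producerId;
    final short epoch;

    TxnState(String transactionalId, long producerId, short epoch) {
        this.transactionalId = transactionalId;
        this.producerId = producerId;
        this.epoch = epoch;
    }
}

// Hypothetical hand-written format: every record starts with a version tag,
// so a newer reader can still decode bytes written with an older layout.
final class TxnStateFormat {
    static final int VERSION_1 = 1; // transactionalId only (hypothetical old layout)
    static final int VERSION_2 = 2; // transactionalId, producerId, epoch
    static final int CURRENT_VERSION = VERSION_2;

    static byte[] write(TxnState state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(CURRENT_VERSION);
            out.writeUTF(state.transactionalId);
            out.writeLong(state.producerId);
            out.writeShort(state.epoch);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes the older VERSION_1 layout; used here only to simulate old state.
    static byte[] writeVersion1(String transactionalId) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(VERSION_1);
            out.writeUTF(transactionalId);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static TxnState read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            String id = in.readUTF();
            switch (version) {
                case VERSION_1:
                    // Fields absent in the old layout get sentinel defaults.
                    return new TxnState(id, -1L, (short) -1);
                case VERSION_2:
                    return new TxnState(id, in.readLong(), in.readShort());
                default:
                    throw new IOException("Unknown state version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The design choice is simply that the byte layout is owned by our code: adding a field means adding a new version branch, not hoping that a third-party serializer keeps its wire format stable.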

We should change {{TwoPhaseCommitSinkFunction}} to only have this constructor:
{code}
public TwoPhaseCommitSinkFunction(TypeSerializer<State<TXN, CONTEXT>> stateSerializer) { /* ... */ }
{code}
and we should then change the {{FlinkKafkaProducer011}} to hand in a custom-made {{TypeSerializer}} for the state.
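A custom state serializer of this kind would typically be composed from serializers for {{TXN}} and {{CONTEXT}}. The following is a standalone sketch under explicit assumptions: {{SimpleSerializer}}, the simplified {{State}} class, {{StateSerializer}} and {{StringSerializer}} are all hypothetical names invented for illustration. {{SimpleSerializer}} only mirrors the serialize/deserialize pair of Flink's {{TypeSerializer}}; it is not Flink's API, and the real {{State}} may hold different fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal serializer contract (NOT Flink's TypeSerializer API).
interface SimpleSerializer<T> {
    void serialize(T value, DataOutput out) throws IOException;
    T deserialize(DataInput in) throws IOException;
}

// Hypothetical, simplified stand-in for TwoPhaseCommitSinkFunction's State.
final class State<TXN, CONTEXT> {
    final TXN pendingTransaction;
    final List<TXN> pendingCommitTransactions;
    final CONTEXT context; // may be null

    State(TXN pendingTransaction, List<TXN> pendingCommitTransactions, CONTEXT context) {
        this.pendingTransaction = pendingTransaction;
        this.pendingCommitTransactions = pendingCommitTransactions;
        this.context = context;
    }
}

// Composes element serializers into one explicit, Kryo-free format for State.
final class StateSerializer<TXN, CONTEXT> implements SimpleSerializer<State<TXN, CONTEXT>> {
    private final SimpleSerializer<TXN> txnSerializer;
    private final SimpleSerializer<CONTEXT> contextSerializer;

    StateSerializer(SimpleSerializer<TXN> txnSerializer, SimpleSerializer<CONTEXT> contextSerializer) {
        this.txnSerializer = txnSerializer;
        this.contextSerializer = contextSerializer;
    }

    @Override
    public void serialize(State<TXN, CONTEXT> state, DataOutput out) throws IOException {
        txnSerializer.serialize(state.pendingTransaction, out);
        out.writeInt(state.pendingCommitTransactions.size()); // length prefix for the list
        for (TXN txn : state.pendingCommitTransactions) {
            txnSerializer.serialize(txn, out);
        }
        out.writeBoolean(state.context != null); // presence flag for the optional context
        if (state.context != null) {
            contextSerializer.serialize(state.context, out);
        }
    }

    @Override
    public State<TXN, CONTEXT> deserialize(DataInput in) throws IOException {
        TXN pending = txnSerializer.deserialize(in);
        int count = in.readInt();
        List<TXN> commits = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            commits.add(txnSerializer.deserialize(in));
        }
        CONTEXT context = in.readBoolean() ? contextSerializer.deserialize(in) : null;
        return new State<>(pending, commits, context);
    }

    // Convenience helper for this sketch: serialize to bytes and read them back.
    State<TXN, CONTEXT> roundTrip(State<TXN, CONTEXT> state) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            serialize(state, out);
            out.flush();
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

// Example element serializer, used for both TXN and CONTEXT in the usage below.
final class StringSerializer implements SimpleSerializer<String> {
    @Override
    public void serialize(String value, DataOutput out) throws IOException { out.writeUTF(value); }

    @Override
    public String deserialize(DataInput in) throws IOException { return in.readUTF(); }
}
```

Usage: {{new StateSerializer<>(new StringSerializer(), new StringSerializer())}} yields a serializer for {{State<String, String>}} that would be handed to the sink's constructor. A real Flink {{TypeSerializer}} has a larger contract (for example {{duplicate()}} and {{copy(...)}}, plus hooks for compatibility checks on restore), which this sketch omits.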



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)