You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Piotr Nowojski (JIRA)" <ji...@apache.org> on 2017/06/22 17:16:00 UTC

[jira] [Comment Edited] (FLINK-6988) Add Apache Kafka 0.11 connector

    [ https://issues.apache.org/jira/browse/FLINK-6988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16059699#comment-16059699 ] 

Piotr Nowojski edited comment on FLINK-6988 at 6/22/17 5:15 PM:
----------------------------------------------------------------

Unfortunately KafkaProducer's API is very limited. Especially it doesn't allow to implement two phase commit protocol like it is done in BucketingSink, because it doesn't allow for neither resuming nor committing transactions from different workers after crash (last bullet point above). This is because every time user calls `Producer::initTransactions()`, all pending (not committed) transactions are being automatically aborted by Kafka Server. Calling `Producer::initTransactions()` is neccessary to obtain `producerId` and `epoch` values from the Kafka server, which are crucial for manipulating transactions.

Fortunately there is a walk around for this issue. It seems like Kafka's REST API is more flexible and we should be possible to resume transactions. Every time we begin transaction we can store `producerId` and `epoch` on the state. In case we need to commit pending transaction on another worker (after crash), instead of calling `KafkaProducer::initTransactions()` we can restore `producerId` and `epoch` from the state and commit this pending transaction using those restored values.

"Hacky" part is that  `producerId` and `epoch` values are hidden behind private fields in package private classes. That means we can not overload `KafkaProducer` to obtain or set them. That leaves as with two options. We either reimplement KafkaProducer using Kafka's REST API (we could copy/paste most of their code) or we use JVM reflection to manually manipulate official KafkaProducer class.


was (Author: pnowojski):
Unfortunately KafkaProducer's API is very limited. Especially it doesn't allow to implement two phase commit protocol like it is done in BucketingSink, because it doesn't allow for neither resuming nor committing transactions from different workers after crash (last bullet point above). This is because every time user calls `Producer::initTransactions()`, all pending (not committed) transactions are being automatically aborted by Kafka Server. Calling `Producer::initTransactions()` is neccessary to obtain `producerId` and `epoch` values from the Kafka server, which are crucial for manipulating transactions.

Fortunately there is a walk around this issue. It seems like Kafka's REST API is more flexible and we should be possible to resume transactions. Every time we begin transaction we can store `producerId` and `epoch` on the state. In case we need to commit pending transaction on another worker (after crash), instead of calling `KafkaProducer::initTransactions()` we can restore `producerId` and `epoch` from the state and commit this pending transaction using those restored values.

"Hacky" part is that  `producerId` and `epoch` values are hidden behind private fields in package private classes. That means we can not overload `KafkaProducer` to obtain or set them. That leaves as with two options. We either reimplement KafkaProducer using Kafka's REST API (we could copy/paste most of their code) or we use JVM reflection to manually manipulate official KafkaProducer class.

> Add Apache Kafka 0.11 connector
> -------------------------------
>
>                 Key: FLINK-6988
>                 URL: https://issues.apache.org/jira/browse/FLINK-6988
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.3.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> Kafka 0.11 (it will be released very soon) add supports for transactions. Thanks to that, Flink might be able to implement Kafka sink supporting "exactly-once" semantic. API changes and whole transactions support is described in [KIP-98|https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging].
> The goal is to mimic implementation of existing BucketingSink. New FlinkKafkaProducer011 would 
> * upon creation begin transaction, store transaction identifiers into the state and would write all incoming data to an output Kafka topic using that transaction
> * on `snapshotState` call, it would flush the data and write in state information that current transaction is pending to be committed
> * on `notifyCheckpointComplete` we would commit this pending transaction
> * in case of crash between `snapshotState` and `notifyCheckpointComplete` we either abort this pending transaction (if not every participant successfully saved the snapshot) or restore and commit it. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)