Posted to user@flink.apache.org by Fuyao Li <fu...@oracle.com> on 2022/02/09 23:00:49 UTC

Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

Hello Community,


I have two questions regarding a Flink custom sink with EXACTLY_ONCE semantics.


  1.  I have an SDK that can publish messages over HTTP (backed by Oracle Streaming Service, which is very similar to Kafka). This will be my Flink application’s sink. Is it possible to use this SDK as a sink with EXACTLY_ONCE semantics? HTTP is stateless here… If it is possible, what would need to be added to the SDK to support EXACTLY_ONCE?
  2.  If it is possible per question 1, then I need to implement a custom sink for this. Which option should I use?
     *   Option 1: TwoPhaseCommitSinkFunction<https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
     *   Option 2: StatefulSink<https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSink.java> + TwoPhaseCommittingSink<https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/connector/sink2/TwoPhaseCommittingSink.java>

The legacy FlinkKafkaProducer seems to use option 1 (it will be removed from Flink in the future). The new KafkaSink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sink> seems to use option 2. Based on the comment in the code, it seems option 1 is recommended. Which one should I use? Please point out anything I am missing, or any better solution for my case.


Thanks,
Fuyao






Re: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

Posted by Fuyao Li <fu...@oracle.com>.
Hi Yun,

Thanks for the reply! This is very helpful.
For the Sink interface, I checked the ReducingUpsertSink<https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java> implementation. If I use the new Sink interface, I think I can create dummy placeholder classes like that one for the committable/transaction-related interfaces and leave them empty; that should effectively give me AT_LEAST_ONCE, right? Thanks.
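
Roughly, I am imagining something like the sketch below. All names and bodies are my own placeholders, written against the 1.13/1.14 org.apache.flink.api.connector.sink.Sink interface as I understand it, and publish() stands in for the OSS HTTP SDK call:

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Placeholder sink: the writer publishes eagerly and hands nothing to a
// committer, so there is no commit phase at all (effectively AT_LEAST_ONCE).
public class OssAtLeastOnceSink implements Sink<String, Void, Void, Void> {

    @Override
    public SinkWriter<String, Void, Void> createWriter(InitContext context, List<Void> states) {
        return new SinkWriter<String, Void, Void>() {
            @Override
            public void write(String element, Context context) throws IOException {
                // publish(element);  // eager write through the OSS HTTP SDK (placeholder)
            }

            @Override
            public List<Void> prepareCommit(boolean flush) throws IOException {
                // called on checkpoint: flush anything buffered so it is durable
                // before the checkpoint completes; no committables to return
                return Collections.emptyList();
            }

            @Override
            public List<Void> snapshotState() {
                return Collections.emptyList();  // stateless writer
            }

            @Override
            public void close() {}
        };
    }

    // The committable/transaction side stays empty, as with the dummy placeholders:
    @Override
    public Optional<Committer<Void>> createCommitter() {
        return Optional.empty();
    }

    @Override
    public Optional<GlobalCommitter<Void, Void>> createGlobalCommitter() {
        return Optional.empty();
    }

    @Override
    public Optional<SimpleVersionedSerializer<Void>> getCommittableSerializer() {
        return Optional.empty();
    }

    @Override
    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
        return Optional.empty();
    }

    @Override
    public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty();
    }
}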

Best regards,
Fuyao



Re: Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

Posted by Yun Gao <yu...@aliyun.com>.
Hi Fuyao,

Very sorry for the late reply. 

For question 1, I think it would not cause data corruption: in Flink, checkpoints are achieved by
inserting barriers into the stream of normal records, and the snapshot is taken in the same thread as
the record processing. Thus the snapshot of the operators is always taken at a record boundary.

For question 3, if the outside system does not support transactions, there might be two other ways to
implement exactly-once semantics:

1. If every record has a key and the external system supports deduplication, it might be possible to
use an AT_LEAST_ONCE sink and let the external system deduplicate the records (see the sketch after this list).
2. Another possible way to reduce the requirements on the external system is a WAL sink: records are
first written to some external system (like a file system) as a kind of log. Once a checkpoint succeeds, we
then write the records belonging to that checkpoint into the real external system. Note that writing these
records must itself be retriable: the Flink job might still fail during the write, and after a restart the
write must resume exactly at the next record. This requires the external system to offer some way to query
the offset of the currently written records.
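
For way 1, a rough sketch of the Flink side (the class and the id format are made up for illustration): give every outgoing record a deterministic id built from its key and a per-key sequence number kept in checkpointed state. After a failover the replayed records regenerate the same ids, so an AT_LEAST_ONCE sink plus deduplication on the id gives effective exactly-once:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DeterministicIdFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> seq;  // per-key counter, restored from the checkpoint

    @Override
    public void open(Configuration parameters) {
        seq = getRuntimeContext().getState(new ValueStateDescriptor<>("seq", Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long next = seq.value() == null ? 0L : seq.value();
        seq.update(next + 1);
        // replays after restore produce the same (key, seq) pair again,
        // so the downstream system can deduplicate on it
        out.collect(ctx.getCurrentKey() + "|" + next + "|" + value);
    }
}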

For an AT_LEAST_ONCE sink, RichSinkFunction should also work, but if possible we still recommend the new sink API~

Best,
Yun



Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

Posted by Fuyao Li <fu...@oracle.com>.
Hi Yun,

Please ignore my question 2. I think the Sink part is the decisive factor in ensuring end-to-end exactly once.

If I want to implement an AT_LEAST_ONCE sink, which interface should I implement? Maybe RichSinkFunction<https://github.com/apache/flink/blob/release-1.14/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java> would be enough? Any suggestions on this?
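
Something like this rough sketch is what I have in mind (names and the publish() call are my own placeholders for the OSS HTTP SDK): buffer the records and flush them when the state is snapshotted, so every record is durable before the checkpoint that covers it completes, which should give AT_LEAST_ONCE:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class OssAtLeastOnceSinkFunction extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);  // records arriving since the last checkpoint
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // flush before the checkpoint completes; throwing here fails the checkpoint
        for (String record : buffer) {
            publish(record);
        }
        buffer.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // nothing to restore: unflushed records are replayed by the source,
        // which is exactly where the AT_LEAST_ONCE duplicates come from
    }

    private void publish(String record) throws Exception {
        // placeholder for the OSS HTTP SDK publish call
    }
}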

For question 3, maybe I can add some deduplication code on the consumer side to handle the messages produced by the AT_LEAST_ONCE sink. If OSS doesn’t support exactly-once semantics, it seems impossible for me to handle it purely on the Flink side.

Thanks,
Fuyao

Re: [External] : Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

Posted by Fuyao Li <fu...@oracle.com>.
Hello Yun,

Thanks for the quick response. This is really helpful.

I have confirmed with Oracle Streaming Service (OSS) that they currently don’t support EXACTLY_ONCE semantics; only AT_LEAST_ONCE works. They suggest adding some deduplication mechanism at the sink to mitigate the issue.

Question 1:
So the scenario should look like this:
When the Flink application restarts after a failure, it will start from the checkpoint offset. Messages that were processed after that checkpoint and before the failure will be processed a second time after the restart. Is there any chance of data corruption here, for example breaking a window and sending out incomplete records? I am using some session windows based on DataStream event-time timers.

Question 2:
For the KafkaSource<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-source>, I noticed that there is no place to configure the semantic. Maybe enabling checkpointing with EXACTLY_ONCE guarantees the source’s exactly-once semantics here? Please correct me if I am wrong.
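
In other words, I assume the setup below is all that is needed on the source side (broker, topic, and group names are placeholders):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // no semantic knob on the source itself: its offsets are stored in the
        // checkpoint state, so exactly-once checkpointing covers the source side
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder
                .setTopics("input-topic")             // placeholder
                .setGroupId("my-group")               // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("kafka-source-demo");
    }
}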

Question 3:
To guarantee end-to-end exactly once, I think we must make sure the sink is exactly once, right? Since OSS has this limitation, is it possible to achieve effective EXACTLY_ONCE semantics through additional logic on the Flink side, given that I can’t do much on the OSS side? Or is it technically impossible?
If it is possible, I think I should implement the Sink<https://github.com/apache/flink/blob/release-1.14/flink-core/src/main/java/org/apache/flink/api/connector/sink/Sink.java> you mentioned.

Thank you very much for the help!
Fuyao


Re: Use TwoPhaseCommitSinkFunction or StatefulSink+TwoPhaseCommittingSink to achieve EXACTLY_ONCE sink?

Posted by Yun Gao <yu...@aliyun.com>.
Hi Fuyao,

Logically, if a system wants to support end-to-end exactly once,
it should support transactions (a sketch mapping these conditions onto the sink API follows the list):
1. Transactions hold the records; a transaction is pre-committed on snapshot state
and committed once the checkpoint succeeds.
2. A transaction must still be abortable after it has been pre-committed.
3. Once pre-committed, a transaction must always be committable: even if
the Flink job fails between pre-commit and commit, after the job restarts
these transactions must be able to be committed again.
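
To make the conditions concrete, here is a rough skeleton against the released TwoPhaseCommitSinkFunction (option 1); option 2 splits the same duties between a writer and a committer. OssTransaction and the SDK calls are placeholders, and it assumes the external system can re-open a transaction by id after a restart:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class OssTwoPhaseCommitSink
        extends TwoPhaseCommitSinkFunction<String, OssTwoPhaseCommitSink.OssTransaction, Void> {

    // placeholder transaction handle; it is checkpointed, so it must carry enough
    // information to re-open the transaction on the external system after a restart
    public static class OssTransaction {
        public String transactionId;
    }

    public OssTwoPhaseCommitSink() {
        super(new KryoSerializer<>(OssTransaction.class, new ExecutionConfig()),
                VoidSerializer.INSTANCE);
    }

    @Override
    protected OssTransaction beginTransaction() {
        // condition 1: open a transaction that will hold the records
        return new OssTransaction();
    }

    @Override
    protected void invoke(OssTransaction txn, String value, Context context) {
        // append value to txn through the SDK (placeholder)
    }

    @Override
    protected void preCommit(OssTransaction txn) {
        // condition 1: called on snapshot state; flush so the transaction is complete
    }

    @Override
    protected void commit(OssTransaction txn) {
        // conditions 1 and 3: called when the checkpoint succeeds, and called again
        // for recovered transactions after a restart, so it must be idempotent
    }

    @Override
    protected void abort(OssTransaction txn) {
        // condition 2: a pre-committed transaction can still be aborted
    }
}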

If the external system meets these conditions, then to implement an exactly-once sink,
option 2 is the more recommended one. However, these interfaces are newly added
in the upcoming 1.15, and it might be about 1.5 months before they are released.

An early version of option 2 is org.apache.flink.api.connector.sink.Sink. It is very
similar to option 2 and has been supported since 1.13. It will still be supported in the
next several releases, and it can also be migrated to option 2 easily.

Best,
Yun


