Posted to user@beam.apache.org by "Wu, Huijun" <hu...@ebay.com> on 2021/09/22 05:46:07 UTC

[Question] How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution

Hi All,
I have run into a problem that seems common, but I couldn't find a working solution online (Stack Overflow, Google search), so I am asking for help here.

I have created a simple Apache Beam streaming pipeline that reads data from Kafka, does some processing, and persists the result by calling an external service's API. I want to make sure no data is lost during a pipeline restart or failure, so I want to manually commit the record offset to Kafka after the API call succeeds, at the end of a specific DoFn execution.

From my previous Kafka experience, I know that the Kafka consumer API below lets me commit record offsets manually:

consumer.commitSync(currentOffsets);

KafkaIO has a setting to turn off auto-commit, but I didn't find any working solution or exposed interface for committing offsets manually in Apache Beam, since there seems to be no way to access the consumer from a DoFn. The project I am working on uses Apache Beam 2.16.0 for historical reasons, but I am happy to upgrade if a later release provides a working solution.

I would really appreciate it if an expert could share some hints with sample code.

Best Regards,

Re: [Question] How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution

Posted by Jan Lukavský <je...@seznam.cz>.
On 9/22/21 11:42 AM, Wu, Huijun wrote:
>
> Hi Lukavský,
>
> Thank you for your kind help.
>
> I want to understand more about the commitOffsetsInFinalize method 
> and checkpoints in Apache Beam.
>
> (1) Regarding commitOffsetsInFinalize
>
> The doc says “Finalized offsets are committed to Kafka.” I want to 
> know exactly when the offset is committed to Kafka.
>
> For instance, suppose our pipeline has 4 steps:
>
> Read data from Kafka -> transform the raw data -> send data to an 
> external service via HTTP call -> persist the HTTP response into a DB
>
> If I enable commitOffsetsInFinalize, is the offset committed 
> immediately after the data is read from Kafka, or only after the 
> HTTP response is successfully persisted into the DB?
>
> If, at the third step, the external service is down and the HTTP call 
> fails, how can we pause the whole streaming process until the 
> external service recovers? (That is what led me to look for a way to 
> manually commit offsets back to Kafka.)
>
> If we restart the dataflow, will data that has completed steps 1 and 
> 2 but not yet step 3 be lost?
>
The Apache Beam model uses a concept called "bundles". A bundle is a set 
of records that is processed atomically: either all elements are 
processed or none are. commitOffsetsInFinalize commits offsets once a 
bundle is "finalized", that is, once it is safely persisted. Where it is 
persisted is not defined by the Beam model; it is runner-dependent. What 
you can rely on is that no data loss should be possible.
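
To make the ordering concrete, here is a toy model in plain Java. It is not Beam code and does not show how any runner is implemented; the class BundleCommitSketch and its methods are hypothetical names made up for illustration. It only shows the rule described above: a bundle either completes as a whole or not at all, and the offset advances only after the bundle's output has been made durable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Toy model (hypothetical, not Beam): commit the offset only when a bundle is finalized. */
final class BundleCommitSketch {
    /** Index of the next element to read after a restart; stands in for the committed Kafka offset. */
    private long committedOffset = 0;
    /** Results that have been durably stored; stands in for the runner's persisted bundle output. */
    private final List<String> persisted = new ArrayList<>();

    long committedOffset() { return committedOffset; }
    List<String> persisted() { return persisted; }

    /**
     * Reads up to bundleSize elements starting at the committed offset and processes them
     * as one bundle. If the (hypothetical) sink rejects any element, the whole bundle fails:
     * nothing is persisted and the offset stays where it was, so a retry re-reads the bundle.
     */
    boolean processBundle(List<String> log, int bundleSize, Predicate<String> sinkAccepts) {
        int start = (int) committedOffset;
        int end = Math.min(log.size(), start + bundleSize);
        List<String> staged = new ArrayList<>();
        for (int i = start; i < end; i++) {
            String element = log.get(i);
            if (!sinkAccepts.test(element)) {
                return false; // bundle failed: staged results are dropped, offset unchanged
            }
            staged.add(element.toUpperCase());
        }
        persisted.addAll(staged); // the bundle's output becomes durable
        committedOffset = end;    // "finalize": only now does the offset advance
        return true;
    }

    public static void main(String[] args) {
        BundleCommitSketch sketch = new BundleCommitSketch();
        List<String> log = List.of("a", "b", "c");
        // First attempt: the sink is "down" for element b, so the bundle fails.
        System.out.println(sketch.processBundle(log, 2, e -> !e.equals("b")));
        System.out.println(sketch.committedOffset());
        // Retry after the sink recovers: the same elements are read again.
        System.out.println(sketch.processBundle(log, 2, e -> true));
        System.out.println(sketch.committedOffset());
    }
}
```

One difference from this toy model matters in practice: a real call to an external service cannot be rolled back when a bundle fails, so elements of a retried bundle may be sent to the service more than once. Making that call idempotent is the usual way to cope with this.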
>
> (2) Regarding checkpoint
>
> I am using Spark as the underlying runner for my Apache Beam pipeline. 
> In Spark, I can create a checkpoint like this:
>
> spark.sparkContext.setCheckpointDir(path)
>
> rdd1.checkpoint()
>
> But in Apache Beam I never deal with checkpoints explicitly. When and 
> how are checkpoints made behind the scenes, and how do they affect 
> commitOffsetsInFinalize?
>
As mentioned above, _how_ the atomicity is achieved depends on the 
runner. I'm not a SparkRunner expert, so I cannot say for sure how it 
works in Spark, but the good news is that it should not matter to the 
user code.

  Jan


Re: [Question] How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution

Posted by "Wu, Huijun" <hu...@ebay.com>.
Hi Lukavský,
Thank you for your kind help.

I want to understand more about the commitOffsetsInFinalize method and checkpoints in Apache Beam.

(1) Regarding commitOffsetsInFinalize
The doc says “Finalized offsets are committed to Kafka.” I want to know exactly when the offset is committed to Kafka.
For instance, suppose our pipeline has 4 steps:
Read data from Kafka -> transform the raw data -> send data to an external service via HTTP call -> persist the HTTP response into a DB

If I enable commitOffsetsInFinalize, is the offset committed immediately after the data is read from Kafka, or only after the HTTP response is successfully persisted into the DB?
If, at the third step, the external service is down and the HTTP call fails, how can we pause the whole streaming process until the external service recovers? (That is what led me to look for a way to manually commit offsets back to Kafka.)
If we restart the dataflow, will data that has completed steps 1 and 2 but not yet step 3 be lost?

(2) Regarding checkpoint
I am using Spark as the underlying runner for my Apache Beam pipeline. In Spark, I can create a checkpoint like this:

spark.sparkContext.setCheckpointDir(path)

rdd1.checkpoint()


But in Apache Beam I never deal with checkpoints explicitly. When and how are checkpoints made behind the scenes, and how do they affect commitOffsetsInFinalize?

Best Regards,

Re: [Question] How to manually commit Kafka offset in Apache Beam at the end of specific doFun execution

Posted by Jan Lukavský <je...@seznam.cz>.
Hi,

Are you using KafkaIO? If so, you can enable committing offsets at 
bundle finalization via [1]. Note, on the other hand, that KafkaIO stores 
offsets in the checkpoint, so, provided you run your Beam pipeline on a 
runner with checkpointing enabled, it should not be necessary to commit 
offsets to Kafka just for the sake of exactly-once processing. That 
should be guaranteed even without it.

Please don't hesitate to ask if you have any more questions.

Best,

  Jan

[1] 
https://beam.apache.org/releases/javadoc/2.32.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
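
For readers looking for sample code, the following is a minimal configuration fragment showing how the option in [1] might be enabled on a KafkaIO read. It is a sketch under assumptions, not a tested pipeline: the broker address, topic name, and consumer group id are placeholders, the class name is made up, and it assumes a Beam release that provides withConsumerConfigUpdates (present in the 2.32.0 Javadoc that [1] points to) with the beam-sdks-java-io-kafka and kafka-clients dependencies on the classpath. As far as I know, commitOffsetsInFinalize needs a group.id in the consumer config, since that is what the committed offsets are stored under.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaCommitInFinalizeSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Consumer settings. All values here are placeholders for illustration.
    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
    // Turn off Kafka's periodic auto-commit so offsets are committed only on finalize.
    consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    PCollection<KV<String, String>> records =
        pipeline.apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092") // placeholder broker
                .withTopic("input-topic")               // placeholder topic
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withConsumerConfigUpdates(consumerConfig)
                // Commit offsets back to Kafka when the runner finalizes a checkpoint.
                .commitOffsetsInFinalize()
                .withoutMetadata());

    // Downstream transforms (processing, the external API call) would be applied to `records`.

    pipeline.run().waitUntilFinish();
  }
}
```

With this setup the user code never touches the Kafka consumer directly; when offsets are committed is decided by the runner's checkpoint finalization, as described in the reply above.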
