Posted to user@beam.apache.org by 郑 洁锋 <zj...@hotmail.com> on 2019/11/28 03:32:41 UTC

How to store&read offset with kafkaio

Hi,
       I want to store the offset in Oracle/MySQL, and then every time I start the real-time streaming task, I want the task to start consuming from the offset stored in Oracle/MySQL.
       After reading the documentation, I haven't seen how to manually store the offset or how to specify the offset at which to start consuming.

       Can anyone help me?
________________________________
zjfplayer@hotmail.com

Re: Re: How to store&read offset with kafkaio

Posted by 郑 洁锋 <zj...@hotmail.com>.
Hi!
        The following is explained in the official Kafka documentation:
        [inline image: excerpt from the Kafka documentation, not preserved in this plain-text archive]

            I use the following data structure to store the consumption status of the corresponding Kafka topics:
          [inline image: offset table schema, not preserved in this plain-text archive]

         Then in Spark Streaming I can start consuming from the specified position with the following code:
         [inline image: Spark Streaming code snippet, not preserved in this plain-text archive]

        So when I switch to Apache Beam, I need to know how to implement this functionality in a way Beam supports. This is functionality that our company currently implements with the Spark Streaming framework.
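Since the inline Spark snippet did not survive the plain-text archive, here is a rough, self-contained sketch of the "resume from stored offsets" pattern being described. Everything in it is an illustrative assumption (the row layout `{topic, partition, untilOffset}`, the class and method names); in the real Spark job the resulting map would be passed to `KafkaUtils.createDirectStream` via `ConsumerStrategies.Assign`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: builds the topic-partition -> starting-offset map from
// rows as they might be loaded from the Oracle/MySQL offset table.
public class StoredOffsets {

    /** Turns offset rows {topic, partition, untilOffset} into a resume map. */
    public static Map<String, Long> toStartingOffsets(List<Object[]> rows) {
        Map<String, Long> fromOffsets = new HashMap<>();
        for (Object[] row : rows) {
            String topic = (String) row[0];
            int partition = (Integer) row[1];
            long untilOffset = (Long) row[2];
            // Keyed like Kafka's TopicPartition; consumption resumes here.
            fromOffsets.put(topic + "-" + partition, untilOffset);
        }
        return fromOffsets;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = toStartingOffsets(List.of(
                new Object[]{"orders", 0, 1200L},
                new Object[]{"orders", 1, 987L}));
        System.out.println(offsets.get("orders-0")); // prints 1200
        System.out.println(offsets.get("orders-1")); // prints 987
    }
}
```

In the Spark job, this map (converted to `Map<TopicPartition, Long>`) is exactly the `fromOffsets` argument of `ConsumerStrategies.Assign`, which is what makes the restart resume at the stored position.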
________________________________
zjfplayer@hotmail.com

From: Pablo Estrada<ma...@google.com>
Date: 2019-12-05 03:04
To: user<ma...@beam.apache.org>
Subject: Re: How to store&read offset with kafkaio
Hi!
What do you mean by offset? Is 'offset' a field in a database table? Or maybe it's an offset in the database binlog?
Best
-P.

On Wed, Nov 27, 2019 at 7:32 PM 郑 洁锋 <zj...@hotmail.com>> wrote:
Hi,
       I want to store the offset in Oracle/MySQL, and then every time I start the real-time streaming task, I want the task to start consuming from the offset stored in Oracle/MySQL.
       After reading the documentation, I haven't seen how to manually store the offset or how to specify the offset at which to start consuming.

       Can anyone help me?
________________________________
zjfplayer@hotmail.com<ma...@hotmail.com>

Re: How to store&read offset with kafkaio

Posted by Alexey Romanenko <ar...@gmail.com>.
Thanks for creating a Jira.

I know this is not exactly what you are looking for, but did you try "commitOffsetsInFinalize()"? It does not provide an exactly-once guarantee for consumption, but it minimises the gap between new and already-processed records when a pipeline restarts. Perhaps it could be helpful.
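For reference, wiring `commitOffsetsInFinalize()` into a KafkaIO read looks roughly like the following. This is a configuration sketch, not a runnable job: the bootstrap servers, topic, and group id are placeholders, and actually running it requires a Beam runner, a Kafka cluster, and the beam-sdks-java-io-kafka plus kafka-clients dependencies.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommitOffsetsExample {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("broker:9092")            // placeholder
                .withTopic("my-topic")                          // placeholder
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // A group.id is needed so committed offsets have a home.
                .withConsumerConfigUpdates(
                        Map.<String, Object>of("group.id", "my-group"))
                // Commit offsets back to Kafka when bundles are finalized,
                // so a restarted pipeline resumes near where it stopped.
                .commitOffsetsInFinalize()
                .withoutMetadata());
        // p.run() omitted; see the hedging note above.
    }
}
```

Note this resumes from Kafka's own committed offsets, not from offsets stored in an external database, which is why it only narrows the gap rather than giving exactly-once consumption.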

> On 10 Dec 2019, at 03:19, 郑 洁锋 <zj...@hotmail.com> wrote:
> 
> Hi,
>         Thank you very much for your reply. I have created the Jira; the URL is:
>          https://issues.apache.org/jira/browse/BEAM-8934 <https://issues.apache.org/jira/browse/BEAM-8934>
> 
>         This is the first time I have created a Jira issue, so I apologize if anything is wrong. Please tell me and I will make changes.
>         
> zjfplayer@hotmail.com <ma...@hotmail.com>
>  
> From: Pablo Estrada <ma...@google.com>
> Date: 2019-12-10 00:49
> To: user <ma...@beam.apache.org>
> Subject: Re: Re: How to store&read offset with kafkaio
> So if I understand correctly, you specify a map<partition, offset>, and pass it to the IO to start from there, right?
> I don't think our KafkaIO supports that ATM[1], but it sounds like a desirable feature.
> 
> Would you please create a JIRA to track adding that?
> 
> [1] https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html <https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html>
> On Sun, Dec 8, 2019 at 6:17 PM 郑 洁锋 <zjfplayer@hotmail.com <ma...@hotmail.com>> wrote:
> Hi,
>        The main reason I need this feature is to support exactly-once semantics in Spark Streaming:
>         [inline image: Spark Streaming exactly-once excerpt, not preserved in this plain-text archive]
> 
> 
>        Based on our existing business, we chose the following approach (it is also shown in the official Spark Streaming documentation):
>         [inline image: offset-commit code from the Spark Streaming guide, not preserved]
>          The full documentation URL is:
>          http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html <http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html>
>     
> zjfplayer@hotmail.com <ma...@hotmail.com>
>  
> From: Chamikara Jayalath <ma...@google.com>
> Date: 2019-12-05 03:17
> To: user <ma...@beam.apache.org>
> Subject: Re: How to store&read offset with kafkaio
> I assume you meant Kafka offset - https://kafka.apache.org/documentation/#intro_topics <https://kafka.apache.org/documentation/#intro_topics>
> 
> Currently I don't think this is possible, for two reasons.
> 
> (1) Currently the Kafka source can read either from a given topic or from a set of topic partitions, but not from a given offset - https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L523 <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L523>
> (2) Currently a source has to be at the top of the pipeline graph to operate. For example, you cannot initiate a Kafka source from a topic/partition read from a database.
> 
> (2) should be possible when we have our next-generation source framework, SplittableDoFn. Maybe once we have that, we can consider adding (1) as well, if there are good justifications for it. I think the policy for the offset to start reading from is configured on the Kafka cluster, and specifying a specific offset will not work if the corresponding messages have already been purged by the cluster, so I'm not sure how useful adding support for reading from a given offset would be.
> 
> Thanks,
> Cham
> 
> 


Re: Re: How to store&read offset with kafkaio

Posted by 郑 洁锋 <zj...@hotmail.com>.
Hi,
        Thank you very much for your reply. I have created the Jira; the URL is:
         https://issues.apache.org/jira/browse/BEAM-8934

        This is the first time I have created a Jira issue, so I apologize if anything is wrong. Please tell me and I will make changes.

________________________________
zjfplayer@hotmail.com

From: Pablo Estrada<ma...@google.com>
Date: 2019-12-10 00:49
To: user<ma...@beam.apache.org>
Subject: Re: Re: How to store&read offset with kafkaio
So if I understand correctly, you specify a map<partition, offset>, and pass it to the IO to start from there, right?
I don't think our KafkaIO supports that ATM[1], but it sounds like a desirable feature.

Would you please create a JIRA to track adding that?

[1] https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html

On Sun, Dec 8, 2019 at 6:17 PM 郑 洁锋 <zj...@hotmail.com>> wrote:
Hi,
       The main reason I need this feature is to support exactly-once semantics in Spark Streaming:
        [inline image: Spark Streaming exactly-once excerpt, not preserved in this plain-text archive]


       Based on our existing business, we chose the following approach (it is also shown in the official Spark Streaming documentation):
        [inline image: offset-commit code from the Spark Streaming guide, not preserved]
         The full documentation URL is:
         http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

________________________________
zjfplayer@hotmail.com<ma...@hotmail.com>

From: Chamikara Jayalath<ma...@google.com>
Date: 2019-12-05 03:17
To: user<ma...@beam.apache.org>
Subject: Re: How to store&read offset with kafkaio
I assume you meant Kafka offset - https://kafka.apache.org/documentation/#intro_topics

Currently I don't think this is possible, for two reasons.

(1) Currently the Kafka source can read either from a given topic or from a set of topic partitions, but not from a given offset - https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L523
(2) Currently a source has to be at the top of the pipeline graph to operate. For example, you cannot initiate a Kafka source from a topic/partition read from a database.

(2) should be possible when we have our next-generation source framework, SplittableDoFn. Maybe once we have that, we can consider adding (1) as well, if there are good justifications for it. I think the policy for the offset to start reading from is configured on the Kafka cluster, and specifying a specific offset will not work if the corresponding messages have already been purged by the cluster, so I'm not sure how useful adding support for reading from a given offset would be.

Thanks,
Cham


Re: Re: How to store&read offset with kafkaio

Posted by Pablo Estrada <pa...@google.com>.
So if I understand correctly, you specify a map<partition, offset>, and
pass it to the IO to start from there, right?
I don't think our KafkaIO supports that ATM[1], but it sounds like a
desirable feature.

Would you please create a JIRA to track adding that?

[1]
https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html

On Sun, Dec 8, 2019 at 6:17 PM 郑 洁锋 <zj...@hotmail.com> wrote:

> Hi,
>        The main reason I need this feature is to support exactly-once
> semantics in Spark Streaming:
>
>
>
>        Based on our existing business, we chose the following approach
> (it is also shown in the official Spark Streaming documentation):
>
>          The full documentation URL is:
>
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> ------------------------------
> zjfplayer@hotmail.com
>
>
> *From:* Chamikara Jayalath <ch...@google.com>
> *Date:* 2019-12-05 03:17
> *To:* user <us...@beam.apache.org>
> *Subject:* Re: How to store&read offset with kafkaio
> I assume you meant Kafka offset -
> https://kafka.apache.org/documentation/#intro_topics
>
> Currently I don't think this is possible, for two reasons.
>
> (1) Currently the Kafka source can read either from a given topic or from
> a set of topic partitions, but not from a given offset -
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L523
> (2) Currently a source has to be at the top of the pipeline graph to
> operate. For example, you cannot initiate a Kafka source from a
> topic/partition read from a database.
>
> (2) should be possible when we have our next-generation source framework,
> SplittableDoFn. Maybe once we have that, we can consider adding (1) as
> well, if there are good justifications for it. I think the policy for the
> offset to start reading from is configured on the Kafka cluster, and
> specifying a specific offset will not work if the corresponding messages
> have already been purged by the cluster, so I'm not sure how useful adding
> support for reading from a given offset would be.
>
> Thanks,
> Cham
>
>

Re: Re: How to store&read offset with kafkaio

Posted by 郑 洁锋 <zj...@hotmail.com>.
Hi,
       The main reason I need this feature is to support exactly-once semantics in Spark Streaming:
        [inline image: Spark Streaming exactly-once excerpt, not preserved in this plain-text archive]


       Based on our existing business, we chose the following approach (it is also shown in the official Spark Streaming documentation):
        [inline image: offset-commit code from the Spark Streaming guide, not preserved]
         The full documentation URL is:
         http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

________________________________
zjfplayer@hotmail.com

From: Chamikara Jayalath<ma...@google.com>
Date: 2019-12-05 03:17
To: user<ma...@beam.apache.org>
Subject: Re: How to store&read offset with kafkaio
I assume you meant Kafka offset - https://kafka.apache.org/documentation/#intro_topics

Currently I don't think this is possible, for two reasons.

(1) Currently the Kafka source can read either from a given topic or from a set of topic partitions, but not from a given offset - https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L523
(2) Currently a source has to be at the top of the pipeline graph to operate. For example, you cannot initiate a Kafka source from a topic/partition read from a database.

(2) should be possible when we have our next-generation source framework, SplittableDoFn. Maybe once we have that, we can consider adding (1) as well, if there are good justifications for it. I think the policy for the offset to start reading from is configured on the Kafka cluster, and specifying a specific offset will not work if the corresponding messages have already been purged by the cluster, so I'm not sure how useful adding support for reading from a given offset would be.

Thanks,
Cham


Re: How to store&read offset with kafkaio

Posted by Chamikara Jayalath <ch...@google.com>.
I assume you meant Kafka offset -
https://kafka.apache.org/documentation/#intro_topics

Currently I don't think this is possible, for two reasons.

(1) Currently the Kafka source can read either from a given topic or from a
set of topic partitions, but not from a given offset -
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L523
(2) Currently a source has to be at the top of the pipeline graph to
operate. For example, you cannot initiate a Kafka source from a
topic/partition read from a database.

(2) should be possible when we have our next-generation source framework,
SplittableDoFn. Maybe once we have that, we can consider adding (1) as
well, if there are good justifications for it. I think the policy for the
offset to start reading from is configured on the Kafka cluster, and
specifying a specific offset will not work if the corresponding messages
have already been purged by the cluster, so I'm not sure how useful adding
support for reading from a given offset would be.

Thanks,
Cham
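[Editorial note on point (1) above: the limitation described is in Beam's KafkaIO, not in Kafka itself; the plain kafka-clients consumer API does allow seeking to an explicit offset. A rough sketch of that underlying capability follows. The broker address, topic, partition, and offset are placeholders, and a live Kafka cluster would be needed to actually run it.]

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// API-shape sketch only: resume a raw Kafka consumer from a stored offset.
public class SeekFromStoredOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092"); // placeholder
        props.put("group.id", "my-group");             // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            long storedOffset = 1200L; // would be loaded from Oracle/MySQL
            consumer.assign(Collections.singletonList(tp)); // manual assignment
            consumer.seek(tp, storedOffset);                // resume exactly here
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofSeconds(1));
        }
    }
}
```

What the thread is asking for is essentially a KafkaIO hook to perform this `seek` on startup; BEAM-8934 tracks that request.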

Re: How to store&read offset with kafkaio

Posted by Pablo Estrada <pa...@google.com>.
Hi!
What do you mean by offset? Is 'offset' a field in a database table? Or
maybe it's an offset in the database binlog?
Best
-P.

On Wed, Nov 27, 2019 at 7:32 PM 郑 洁锋 <zj...@hotmail.com> wrote:

> Hi,
>        I want to store the offset in Oracle/MySQL, and then every time I
> start the real-time streaming task, I want the task to start consuming
> from the offset stored in Oracle/MySQL.
>        After reading the documentation, I haven't seen how to manually
> store the offset or how to specify the offset at which to start consuming.
>
>        Can anyone help me?
> ------------------------------
> zjfplayer@hotmail.com
>