Posted to user@beam.apache.org by André Missaglia <an...@arquivei.com.br> on 2019/01/09 13:28:22 UTC

KafkaIO not committing offsets when using withMaxReadTime

Hello everyone,

I need to do some batch processing over the messages in a Kafka topic, so I
tried the "withMaxReadTime" KafkaIO setting:

---
val properties = new Properties()
properties.setProperty("bootstrap.servers", "...")
properties.setProperty("group.id", "mygroup")
properties.setProperty("sasl.jaas.config", "...")
properties.setProperty("security.protocol", "SASL_PLAINTEXT")
properties.setProperty("sasl.mechanism", "SCRAM-SHA-256")
properties.setProperty("enable.auto.commit", "false") // leave committing to Beam

sc.customInput("Read From Kafka",
  KafkaIO
    .read[String, String]()
    .withTopic("mytopic")
    .withKeyDeserializer(classOf[StringDeserializer])
    .withValueDeserializer(classOf[StringDeserializer])
    .updateConsumerProperties(properties)
    .withMaxReadTime(Duration.standardSeconds(20)) // bound the read: stop after ~20s...
    .withMaxNumRecords(1000000)                    // ...or after 1M records
    .commitOffsetsInFinalize()                     // commit offsets when checkpoints finalize
    .withoutMetadata()
)
.count.debug() // prints something between 10000 and 20000
---
I can see that it was able to read the messages and process them. But in
the end, no offset was committed:

TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID  HOST  CLIENT-ID
mytopic  0          0               3094751         3094751  -            -     -
The behavior is strange, though: sometimes it commits the offsets, sometimes
it doesn't. I'm not sure whether it is a bug or I'm using the wrong configs.

Has anyone used bounded KafkaIO before? Is there anything I can do?

Best Regards,

-- 
*André Badawi Missaglia*
Data Engineer
(16) 3509-5515 *|* www.arquivei.com.br

Re: KafkaIO not committing offsets when using withMaxReadTime

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Jozef,

I’m not aware of anyone working on this. In the meantime, I created a Jira for it: https://issues.apache.org/jira/browse/BEAM-6466
Feel free to contribute if you wish.

> On 17 Jan 2019, at 09:10, Jozef Vilcek <jo...@gmail.com> wrote:
> 
> Hello,
> has there been any progress on this, or is there a JIRA I can follow? I could use bounded processing over KafkaIO too.
> 
> Thanks,
> Jozef


Re: KafkaIO not committing offsets when using withMaxReadTime

Posted by Jozef Vilcek <jo...@gmail.com>.
Hello,
has there been any progress on this, or is there a JIRA I can follow? I could
use bounded processing over KafkaIO too.

Thanks,
Jozef

On Thu, Jan 10, 2019 at 4:57 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> Don’t you think that we could have some race condition there since,
> according to the initial issue description, sometimes the offset was
> committed and sometimes not?

Re: KafkaIO not committing offsets when using withMaxReadTime

Posted by Raghu Angadi <an...@gmail.com>.
On Thu, Jan 10, 2019 at 7:57 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Don’t you think that we could have some race condition there since,
> according to the initial issue description, sometimes the offset was
> committed and sometimes not?
>

Yeah, there is a timing issue. 'finalizeCheckpoint()' does not wait until the
checkpoint is committed by the IO thread. See the comment at
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L613

This is best suited to the unbounded case, since we don't want to block on
each call to finalize a checkpoint. There can be lots of these calls per
second in a streaming pipeline, and we only need to commit the latest
checkpoint. But that does not work well when the reader is used in a bounded
context.

Fix: KafkaIO could store a flag indicating that it is being read through the
bounded wrapper (see expand(), where the bounded wrapper is added). When this
flag is set, it could wake up the IO thread and wait for the offsets to be
committed.

Re: KafkaIO not committing offsets when using withMaxReadTime

Posted by Alexey Romanenko <ar...@gmail.com>.
Don’t you think that we could have some race condition there since, according to the initial issue description, sometimes the offset was committed and sometimes not?


> On 9 Jan 2019, at 19:48, Raghu Angadi <an...@gmail.com> wrote:
> 
> Oh, the generic bounded source wrapper over an unbounded source does not seem to call finalize when it is done with a split. I think it should.
> 
> Could you file a bug for the wrapper? 
> Meanwhile, this check could be added to the sanity checks in KafkaIO.Read.expand().

Re: KafkaIO not committing offsets when using withMaxReadTime

Posted by Raghu Angadi <an...@gmail.com>.
Oh, the generic bounded source wrapper over an unbounded source does not
seem to call finalize when it is done with a split. I think it should.

Could you file a bug for the wrapper?
Meanwhile, this check could be added to the sanity checks in
KafkaIO.Read.expand().
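
Something along these lines (a hypothetical check sketched in Scala; the real
KafkaIO.Read.expand() is Java, and these names are illustrative only):

---
// Minimal stand-in for the relevant bits of KafkaIO.Read's configuration.
final case class ReadConfig(
    maxNumRecords: Option[Long],
    maxReadTime: Option[org.joda.time.Duration],
    commitOffsetsInFinalize: Boolean)

def checkBoundedCommitSupport(read: ReadConfig): Unit = {
  val bounded = read.maxNumRecords.isDefined || read.maxReadTime.isDefined
  if (bounded && read.commitOffsetsInFinalize) {
    // The bounded wrapper may never call finalizeCheckpoint(), so the commit
    // can be silently skipped; failing fast beats silently losing offsets.
    throw new IllegalStateException(
      "commitOffsetsInFinalize() is unreliable with withMaxNumRecords()/withMaxReadTime()")
  }
}
---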

Re: KafkaIO not committing offsets when using withMaxReadTime

Posted by André Missaglia <an...@arquivei.com.br>.
Hi Juan,

After researching a bit, I found this issue, which has been open since 2017:
https://issues.apache.org/jira/browse/BEAM-2185

I guess KafkaIO isn't intended to provide a bounded source. Maybe I should
write my own code that fetches messages from Kafka, even if it means giving up
some of Beam's processing guarantees...
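
Something along these lines, maybe — a rough sketch with the plain Kafka
consumer API (property values are placeholders, and the 20s/1M bounds mirror
what I used with KafkaIO):

---
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

val props = new Properties()
props.setProperty("bootstrap.servers", "...")
props.setProperty("group.id", "mygroup")
props.setProperty("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("enable.auto.commit", "false") // commit manually below

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("mytopic"))

val deadline = System.currentTimeMillis() + 20000L // read for ~20s
var count = 0L
while (System.currentTimeMillis() < deadline && count < 1000000L) {
  val records = consumer.poll(Duration.ofMillis(500))
  for (record <- records.asScala) {
    // process record.value() here...
    count += 1
  }
}
// Synchronously commit the offsets of everything polled above, so nothing
// is left uncommitted when the job exits.
consumer.commitSync()
consumer.close()
---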


On Wed, Jan 9, 2019 at 2:24 PM, Juan Carlos Garcia <jc...@gmail.com>
wrote:

> Just so you can have a look at where this happens:
>
>
> https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584
>
> Cheers

-- 
*André Badawi Missaglia*
Data Engineer
(16) 3509-5515 *|* www.arquivei.com.br

Re: KafkaIO not committing offsets when using withMaxReadTime

Posted by Juan Carlos Garcia <jc...@gmail.com>.
Just so you can have a look at where this happens:

https://github.com/apache/beam/blob/dffe2c1a2bd95f78869b266d3e1ea3f8ad8c323d/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L584

Cheers


-- 

JC

Re: KafkaIO not committing offsets when using withMaxReadTime

Posted by Juan Carlos Garcia <jc...@gmail.com>.
I also experience the same. As per the documentation, *withMaxReadTime* and
*withMaxNumRecords* are mainly intended for demo purposes, so I guess it is
beyond the scope of the current KafkaIO to behave as a bounded source with
offset management, or something is just missing in the current implementation
(watermarking).


-- 

JC