You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/03/02 21:42:00 UTC

[jira] [Work logged] (BEAM-13310) KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does

     [ https://issues.apache.org/jira/browse/BEAM-13310?focusedWorklogId=735610&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-735610 ]

ASF GitHub Bot logged work on BEAM-13310:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Mar/22 21:41
            Start Date: 02/Mar/22 21:41
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on pull request #16588:
URL: https://github.com/apache/beam/pull/16588#issuecomment-1057420303


   Does this fix the SDF KafkaIO commit issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 735610)
    Time Spent: 1.5h  (was: 1h 20m)

> KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does
> --------------------------------------------------------------------
>
>                 Key: BEAM-13310
>                 URL: https://issues.apache.org/jira/browse/BEAM-13310
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Luke Cwik
>            Assignee: John Casey
>            Priority: P2
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> When run using SDF the pipeline does not commit offsets but when run using the SDF UnboundedSourceWrapper via *use_deprecated_read* experiment the pipeline does. This implies that the UnboundedSource version is able to correctly commit offsets but the pure SDF does not.
> Sample code:
> {code:java}
>         final Pipeline p = Pipeline.create(options);
>         p.apply(
>             KafkaIO.<Long, String>read()
>                 .withBootstrapServers(options.getKafkaBroker())
>                 .withTopic(options.getTopic())
>                 .withConsumerConfigUpdates(
>                     Map.of(
>                         ConsumerConfig.GROUP_ID_CONFIG, options.getConsumerGroup(),
>                         CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
>                         SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
>                         SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
>                 .withKeyDeserializer(LongDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .commitOffsetsInFinalize()
>                 .withoutMetadata());
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)