Posted to issues@beam.apache.org by "Luke Cwik (Jira)" <ji...@apache.org> on 2021/11/23 18:10:00 UTC

[jira] [Updated] (BEAM-13310) KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does

     [ https://issues.apache.org/jira/browse/BEAM-13310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Cwik updated BEAM-13310:
-----------------------------
    Status: Open  (was: Triage Needed)

> KafkaIO SDF does not commit offsets but KafkaIO UnboundedSource does
> --------------------------------------------------------------------
>
>                 Key: BEAM-13310
>                 URL: https://issues.apache.org/jira/browse/BEAM-13310
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka
>            Reporter: Luke Cwik
>            Priority: P2
>
> When run as a pure SDF, the pipeline does not commit offsets, but when run using the SDF UnboundedSourceWrapper, it does. This implies that the UnboundedSource version correctly commits offsets but the pure SDF version does not.
> Sample code:
> {code:java}
>         final Pipeline p = Pipeline.create(options);
>         p.apply(
>             KafkaIO.<Long, String>read()
>                 .withBootstrapServers(options.getKafkaBroker())
>                 .withTopic(options.getTopic())
>                 .withConsumerConfigUpdates(
>                     Map.of(
>                         ConsumerConfig.GROUP_ID_CONFIG, options.getConsumerGroup(),
>                         CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL",
>                         SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka",
>                         SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.GssLoginModule required initiate=true;"))
>                 .withKeyDeserializer(LongDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .commitOffsetsInFinalize()
>                 .withoutMetadata());
> {code}
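
To compare the two code paths described above, one option is to launch the same pipeline twice, once with its normal arguments and once with an experiment flag that selects the UnboundedSource wrapper. The following is only a sketch of a small argument helper, not part of the issue. The experiment name `use_unbounded_sdf_wrapper` is an assumption and should be checked against the Beam version in use. The class name `ExperimentArgs`, the option names `--kafkaBroker` and `--topic` (guessed from the getters in the sample), and their values are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExperimentArgs {
    // Assumed experiment name; it is not stated in the issue, so verify it
    // against the Beam release you are running.
    public static final String SDF_WRAPPER_EXPERIMENT = "use_unbounded_sdf_wrapper";

    /**
     * Returns a copy of args with "--experiments=<experiment>" appended,
     * unless that exact flag is already present.
     */
    public static String[] withExperiment(String[] args, String experiment) {
        String flag = "--experiments=" + experiment;
        List<String> out = new ArrayList<>(Arrays.asList(args));
        if (!out.contains(flag)) {
            out.add(flag);
        }
        return out.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Hypothetical base arguments for the sample pipeline.
        String[] base = {"--kafkaBroker=localhost:9092", "--topic=demo"};

        // Run 1 would use `base` unchanged (pure SDF path, per the report).
        // Run 2 would use the arguments below (wrapper path, if the
        // assumed experiment name is honored by the runner).
        String[] wrapped = withExperiment(base, SDF_WRAPPER_EXPERIMENT);
        System.out.println(String.join(" ", wrapped));
    }
}
```

Each resulting array would then be passed to `PipelineOptionsFactory.fromArgs(...)` before building the pipeline shown in the sample, and the committed offsets of the consumer group compared between the two runs.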



--
This message was sent by Atlassian Jira
(v8.20.1#820001)