Posted to user@beam.apache.org by Praveen K Viswanathan <ha...@gmail.com> on 2020/06/24 20:10:01 UTC

Unable to commit offset using KafkaIO

Hello Everyone,

I am having trouble committing offsets with KafkaIO. My underlying
streaming platform is OSS (Oracle Streaming Service), which is
Kafka-compatible. When I try to commit offsets by setting
"ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true", I get the error below.

Both my Kafka-compatible OSS and my Beam job are running on my local
machine. My topic has only one partition, with just one message. The
expectation is that when I run my Beam job for the first time, KafkaIO
should read the message and commit the offset. So if I terminate the job
and re-run it, it should not pull any messages until I publish new ones to
the input stream. Instead, it keeps pulling the same message every time.
Any pointers would be appreciated.
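
The expected resume behaviour described above can be sketched as a toy model. This is only an illustration of the general Kafka rule that a committed offset takes precedence over auto.offset.reset. It is not how KafkaIO or OSS implements it, and the class and method names (OffsetResumeSketch, commit, startOffset) are hypothetical. The "none" reset policy is left out, and "earliest" is assumed to mean offset 0.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of how a consumer group picks its starting offset for one partition. */
final class OffsetResumeSketch {

    /** Committed offsets keyed by "group/partition"; stands in for the broker's offset store. */
    private final Map<String, Long> committed = new HashMap<>();

    /** Records the next offset to read, which is what an offset commit stores. */
    public void commit(String group, String partition, long nextOffset) {
        committed.put(group + "/" + partition, nextOffset);
    }

    /**
     * Returns the offset a (re)started consumer begins reading from.
     * A committed offset wins; otherwise the auto.offset.reset policy decides.
     */
    public long startOffset(String group, String partition,
                            String autoOffsetReset, long logEndOffset) {
        Long saved = committed.get(group + "/" + partition);
        if (saved != null) {
            return saved;
        }
        // Assumption: "earliest" maps to offset 0; anything else is treated as "latest".
        return "earliest".equals(autoOffsetReset) ? 0L : logEndOffset;
    }

    public static void main(String[] args) {
        OffsetResumeSketch store = new OffsetResumeSketch();
        long logEnd = 1L; // one message at offset 0, so the log ends at offset 1

        // First run: nothing committed yet, so the reset policy decides.
        System.out.println(
                store.startOffset("enrichment-consumer-group", "raw-0", "earliest", logEnd));

        // The job reads offset 0 and commits "next offset = 1".
        store.commit("enrichment-consumer-group", "raw-0", 1L);

        // Second run: the committed offset is used, whatever the reset policy says.
        System.out.println(
                store.startOffset("enrichment-consumer-group", "raw-0", "latest", logEnd));
    }
}
```

In this model a successful commit of offset 1 means the second run starts at the end of the log, so the single message is not read again. If the commit never succeeds, every run falls back to the reset policy.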

*Error*:
12:31:34.237 INFO : Kafka version: 2.5.0
12:31:34.237 INFO : Kafka commitId: 66563e712b0b9f84
12:31:34.237 INFO : Kafka startTimeMs: 1593027094233
12:31:34.256 INFO : [Producer clientId=producer-1] Cluster ID: OSS
12:31:34.760 INFO : [Consumer clientId=consumer-Reader-0_offset_consumer_43668338_enrichment-consumer-group-3, groupId=Reader-0_offset_consumer_43668338_enrichment-consumer-group] Seeking to LATEST offset of partition raw-0
12:31:34.766 INFO : [Consumer clientId=consumer-Reader-0_offset_consumer_43668338_enrichment-consumer-group-3, groupId=Reader-0_offset_consumer_43668338_enrichment-consumer-group] Resetting offset for partition raw-0 to offset 0.
12:31:35.759 INFO : [Consumer clientId=consumer-Reader-0_offset_consumer_43668338_enrichment-consumer-group-3, groupId=Reader-0_offset_consumer_43668338_enrichment-consumer-group] Seeking to LATEST offset of partition raw-0
12:31:35.770 INFO : [Consumer clientId=consumer-Reader-0_offset_consumer_43668338_enrichment-consumer-group-3, groupId=Reader-0_offset_consumer_43668338_enrichment-consumer-group] Resetting offset for partition raw-0 to offset 0.
12:31:36.762 INFO : [Consumer clientId=consumer-Reader-0_offset_consumer_43668338_enrichment-consumer-group-3, groupId=Reader-0_offset_consumer_43668338_enrichment-consumer-group] Seeking to LATEST offset of partition raw-0
12:31:36.767 INFO : [Consumer clientId=consumer-Reader-0_offset_consumer_43668338_enrichment-consumer-group-3, groupId=Reader-0_offset_consumer_43668338_enrichment-consumer-group] Resetting offset for partition raw-0 to offset 0.
12:31:36.796 ERROR : [Consumer clientId=consumer-enrichment-consumer-group-2, groupId=enrichment-consumer-group] Offset commit failed on partition raw-0 at offset 1: Specified group generation id is not valid.
12:31:36.797 WARN : [Consumer clientId=consumer-enrichment-consumer-group-2, groupId=enrichment-consumer-group] Asynchronous auto-commit of offsets {raw-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
12:31:37.762 INFO : [Consumer clientId=consumer-Reader-0_offset_consumer_43668338_enrichment-consumer-group-3, groupId=Reader-0_offs

*ConsumerConfig values:*
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:19092]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = enrichment-consumer-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 524288
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = PLAIN
security.protocol = SASL_PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
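
Of the values in this dump, the ones most directly tied to offset commits and group membership are the group id, the auto-commit switch and interval, the reset policy, and the poll and session timeouts (the WARN message itself names max.poll.interval.ms and max.poll.records). As a minimal sketch, they can be collected with java.util.Properties using plain string keys, so no Kafka dependency is needed. The class name CommitRelatedSettings is hypothetical, and the values are copied from the dump rather than recommended.

```java
import java.util.Properties;

/** Collects the commit- and group-related consumer settings from the dump above. */
final class CommitRelatedSettings {

    /** Builds the subset with plain string keys; values are copied verbatim from the dump. */
    public static Properties build() {
        Properties props = new Properties();
        props.setProperty("group.id", "enrichment-consumer-group");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("max.poll.interval.ms", "300000");
        props.setProperty("max.poll.records", "500");
        props.setProperty("session.timeout.ms", "10000");
        props.setProperty("heartbeat.interval.ms", "3000");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting in the same "key = value" form as the dump.
        build().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```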

*Pipeline:*
p.apply("Read from raw", KafkaIO.<String, String>read()
                .withBootstrapServers(localhost)
                .withTopic(rawTopic)
                // Reset policy, consumer group and SASL settings.
                .withConsumerConfigUpdates(ImmutableMap.of(
                        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest",
                        ConsumerConfig.GROUP_ID_CONFIG, "enrichment-consumer-group",
                        "security.protocol", "SASL_PLAINTEXT",
                        "sasl.mechanism", "PLAIN",
                        "sasl.jaas.config",
                        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                                + "username=\"user\" password=\"pwd\";"
                ))
                // Ask the Kafka consumer to auto-commit offsets.
                .withConsumerConfigUpdates(ImmutableMap.of(
                        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true
                ))
-- 
Thanks,
Praveen K Viswanathan