You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Qingsheng Ren (Jira)" <ji...@apache.org> on 2022/10/11 09:29:00 UTC

[jira] [Closed] (FLINK-27962) KafkaSourceReader fails to commit consumer offsets for checkpoints

     [ https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qingsheng Ren closed FLINK-27962.
---------------------------------
    Fix Version/s: 1.16.0
       Resolution: Duplicate

> KafkaSourceReader fails to commit consumer offsets for checkpoints
> ------------------------------------------------------------------
>
>                 Key: FLINK-27962
>                 URL: https://issues.apache.org/jira/browse/FLINK-27962
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.14.4, 1.15.0
>            Reporter: Dmytro
>            Priority: Major
>             Fix For: 1.16.0
>
>         Attachments: Screen Shot 2022-09-20 at 5.18.04 PM.png
>
>
> The KafkaSourceReader works well for many hours, then fails and re-connects successfully, then continues to work some time. After the first three failures it hangs on "Offset commit failed" and never connected again. Restarting the Flink job does help and it works until the next "3 times fail".
> I am aware about [the note|https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#consumer-offset-committing] that Kafka source does NOT rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring.
> I agree if the failures are only periodic, but I would argue complete failures are unacceptable
> *Failed to commit consumer offsets for checkpoint:*
> {code:java}
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:19:52,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464521
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464522
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
> Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
> 2022-06-06 14:20:02,297 WARN  org.apache.flink.connector.kafka.source.reader.KafkaSourceReader [] - Failed to commit consumer offsets for checkpoint 464523
> org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets
> ..... fails permanently until the job restart
>  {code}
> *Consumer Config:*
> {code:java}
> allow.auto.create.topics = true
> auto.commit.interval.ms = 5000
> auto.offset.reset = none
> bootstrap.servers = [test.host.net:9093]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id = test-client-id
> client.rack =
> connections.max.idle.ms = 180000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = test-group-id
> group.instance.id = null
> heartbeat.interval.ms = 3000
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_uncommitted
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 180000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 60000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = [hidden]
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = class com.test.kafka.security.AzureAuthenticateCallbackHandler
> sasl.login.class = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.mechanism = OAUTHBEARER
> security.protocol = SASL_SSL
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 30000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.2
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)