Posted to jira@kafka.apache.org by "Randall Hauch (Jira)" <ji...@apache.org> on 2020/10/16 23:10:00 UTC

[jira] [Resolved] (KAFKA-8370) Kafka Connect should check for existence of internal topics before attempting to create them

     [ https://issues.apache.org/jira/browse/KAFKA-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-8370.
----------------------------------
    Resolution: Won't Fix

As mentioned above, we can never avoid the race condition of two Connect workers trying to create the same topic, so it's imperative that the create-topic request is handled atomically and fails with TopicExistsException when the topic already exists. KAFKA-8875 now ensures that happens, and Connect already properly handles the case when a create-topic request fails with TopicExistsException.

The conclusion: there is no need for the check before creating the topic, because that is not guaranteed to be sufficient anyway.
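
To make the reasoning above concrete, here is a minimal toy sketch in Java. It does not use the Kafka admin client or talk to a broker: ToyBroker, TopicExists, and ensureTopic are hypothetical names invented for illustration, and the in-memory map only stands in for a broker that handles create requests atomically. The point is that two workers can both issue the create, exactly one succeeds, and the other treats the "already exists" failure as success, so a separate existence check adds nothing.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Toy in-memory model only; none of these names are Kafka APIs.
public class TopicRaceSketch {

    // Hypothetical exception playing the role of TopicExistsException.
    static class TopicExists extends Exception {
        TopicExists(String topic) {
            super("Topic already exists: " + topic);
        }
    }

    // Hypothetical broker stand-in whose create operation is atomic.
    static class ToyBroker {
        private final ConcurrentMap<String, Integer> topics = new ConcurrentHashMap<>();

        boolean exists(String topic) {
            return topics.containsKey(topic);
        }

        void create(String topic, int partitions) throws TopicExists {
            // putIfAbsent is atomic: only one concurrent caller gets null back.
            if (topics.putIfAbsent(topic, partitions) != null) {
                throw new TopicExists(topic);
            }
        }
    }

    // Returns true if this caller created the topic, false if it already existed.
    static boolean ensureTopic(ToyBroker broker, String topic, int partitions) {
        try {
            broker.create(topic, partitions);
            return true;
        } catch (TopicExists e) {
            // Another worker won the race; the topic exists, which is all we need.
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToyBroker broker = new ToyBroker();
        AtomicInteger creators = new AtomicInteger();
        Runnable worker = () -> {
            if (ensureTopic(broker, "connect-offsets", 1)) {
                creators.incrementAndGet();
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
        // Both workers finish without error; only one of them did the creation.
        System.out.println("workers that created the topic: " + creators.get());
    }
}
```

A check-then-create variant (calling exists() first and create() only when it returns false) would still need the same catch block, because another worker can create the topic between the two calls.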

> Kafka Connect should check for existence of internal topics before attempting to create them
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8370
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Major
>
> The Connect worker doesn't currently check for the existence of the internal topics, and instead issues a CreateTopic request and handles a TopicExistsException. However, this can cause problems when the number of brokers is fewer than the replication factor, *even if the topic already exists* and the partitions of those topics all remain available on the remaining brokers.
> One problem of the current approach is that the broker checks the requested replication factor before checking for the existence of the topic, resulting in unexpected exceptions when the topic does exist:
> {noformat}
> connect      | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> connect      | org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'connect-offsets'
> connect      | 	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
> connect      | 	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
> connect      | 	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
> connect      | 	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
> connect      | 	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
> connect      | 	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
> connect      | 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
> connect      | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> connect      | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> connect      | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> connect      | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> connect      | 	at java.lang.Thread.run(Thread.java:748)
> connect      | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
> connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> connect      | 	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
> connect      | 	... 11 more
> connect      | Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
> connect      | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
> {noformat}
> Instead of always issuing a CreateTopic request, the worker's admin client should first check whether the topic exists, and if not *then* attempt to create the topic.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)