You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Randall Hauch (JIRA)" <ji...@apache.org> on 2019/05/15 19:47:00 UTC

[jira] [Created] (KAFKA-8370) Kafka Connect should check for existence of internal topics before attempting to create them

Randall Hauch created KAFKA-8370:
------------------------------------

             Summary: Kafka Connect should check for existence of internal topics before attempting to create them
                 Key: KAFKA-8370
                 URL: https://issues.apache.org/jira/browse/KAFKA-8370
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 0.11.0.0
            Reporter: Randall Hauch
            Assignee: Randall Hauch


The Connect worker doesn't current check for the existence of the internal topics, and instead is issuing a CreateTopic request and handling a TopicExistsException. However, this can cause problems when the number of brokers is fewer than the replication factor, *even if the topic already exists* and the partitions of those topics all remain available on the remaining brokers.

One problem of the current approach is that the broker checks the requested replication factor before checking for the existence of the topic, resulting in unexpected exceptions when the topic does exist:

{noformat}
connect      | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
connect      | org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'connect-offsets'
connect      | 	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
connect      | 	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
connect      | 	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
connect      | 	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
connect      | 	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
connect      | 	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
connect      | 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
connect      | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect      | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect      | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect      | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect      | 	at java.lang.Thread.run(Thread.java:748)
connect      | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
connect      | 	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
connect      | 	... 11 more
connect      | Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
connect      | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
{noformat}

Instead of always issuing a CreateTopic request, the worker's admin client should first check whether the topic exists, and if not *then* attempt to create the topic.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)