Posted to jira@kafka.apache.org by "Chris Egerton (Jira)" <ji...@apache.org> on 2019/10/08 17:25:00 UTC

[jira] [Commented] (KAFKA-8370) Kafka Connect should check for existence of internal topics before attempting to create them

    [ https://issues.apache.org/jira/browse/KAFKA-8370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16947064#comment-16947064 ] 

Chris Egerton commented on KAFKA-8370:
--------------------------------------

[~rhauch] is this still an issue, and if so, do we still want to implement it in the way suggested on this ticket (checking if the topic exists before attempting to create it)?

Checking whether the topic exists before creating it would prevent some (in fact, probably most) unnecessary topic creation requests, but it wouldn't prevent the race condition where two workers perform the check at the same time, both see that the topic doesn't exist, and then both attempt to create it. With fixes like https://issues.apache.org/jira/browse/KAFKA-6250 and https://issues.apache.org/jira/browse/KAFKA-7633, it's unclear whether sending unnecessary topic creation requests is still a serious issue, since Connect should continue to function properly if those requests fail.
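To illustrate why an existence check alone cannot close that race, here is a minimal, self-contained Java simulation. It is only a sketch and does not use Kafka at all: the in-memory `topics` set, `createTopic`, and `runRace` are hypothetical stand-ins for the cluster and the CreateTopic request. A `CyclicBarrier` forces the interleaving described above, in which both workers finish their check before either one sends a create request.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simulation of two Connect workers; this is NOT the Kafka Admin API.
public class CheckThenCreateRace {

    // Topic names that "exist" on the simulated cluster.
    static final Set<String> topics = ConcurrentHashMap.newKeySet();

    // Stand-in for a CreateTopic request: returns false if the topic already
    // exists, analogous to the broker answering with TopicExistsException.
    static boolean createTopic(String name) {
        return topics.add(name);
    }

    // Runs two workers that each check for the topic and then create it.
    // The barrier forces both existence checks to finish before either
    // create is sent. Returns {successfulCreates, alreadyExistsFailures}.
    static int[] runRace(String topic) {
        topics.clear();
        CyclicBarrier bothChecked = new CyclicBarrier(2);
        AtomicInteger created = new AtomicInteger();
        AtomicInteger alreadyExisted = new AtomicInteger();

        Runnable worker = () -> {
            try {
                boolean exists = topics.contains(topic); // the existence check
                bothChecked.await();                      // both workers saw "absent"
                if (!exists) {
                    if (createTopic(topic)) {
                        created.incrementAndGet();
                    } else {
                        // The other worker won the race; this must be tolerated.
                        alreadyExisted.incrementAndGet();
                    }
                }
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        };

        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {created.get(), alreadyExisted.get()};
    }

    public static void main(String[] args) {
        int[] result = runRace("connect-offsets");
        // Prints "created=1 alreadyExisted=1": one create request still fails.
        System.out.println("created=" + result[0] + " alreadyExisted=" + result[1]);
    }
}
```

The check only narrows the window: whichever worker loses still gets an "already exists" failure, so the caller has to treat that failure as benign whether or not it checks first.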

If we don't aim to implement a fix for this issue, then the ticket should be closed. Otherwise, it'd be good to clarify what the impact of this issue is after taking into account the aforementioned tickets which lessened the impact of failed topic create requests.

> Kafka Connect should check for existence of internal topics before attempting to create them
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8370
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8370
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Major
>
> The Connect worker doesn't currently check for the existence of the internal topics; instead, it issues a CreateTopic request and handles the resulting TopicExistsException. However, this can cause problems when the number of available brokers is smaller than the replication factor, *even if the topic already exists* and the partitions of those topics all remain available on the remaining brokers.
> One problem with the current approach is that the broker validates the requested replication factor before checking whether the topic exists, resulting in unexpected exceptions even when the topic does exist:
> {noformat}
> connect      | [2019-05-14 19:24:25,166] ERROR Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> connect      | org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) 'connect-offsets'
> connect      | 	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:255)
> connect      | 	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore$1.run(KafkaOffsetBackingStore.java:99)
> connect      | 	at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:127)
> connect      | 	at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:109)
> connect      | 	at org.apache.kafka.connect.runtime.Worker.start(Worker.java:164)
> connect      | 	at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:114)
> connect      | 	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:214)
> connect      | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> connect      | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> connect      | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> connect      | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> connect      | 	at java.lang.Thread.run(Thread.java:748)
> connect      | Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
> connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
> connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
> connect      | 	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
> connect      | 	at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:228)
> connect      | 	... 11 more
> connect      | Caused by: org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
> connect      | [2019-05-14 19:24:25,168] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect)
> {noformat}
> Instead of always issuing a CreateTopic request, the worker's admin client should first check whether the topic exists, and if not *then* attempt to create the topic.
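A minimal sketch of the proposed change, written as a self-contained Java simulation rather than real Kafka code: `SimulatedCluster`, `createAlways`, and `checkThenCreate` are hypothetical names, and `createTopic` models the validation order described in the ticket (replication factor first, then existence). With an existing `connect-offsets` topic, a requested replication factor of 3, and only 2 live brokers, the create-first path fails while the check-first path succeeds.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical simulation; none of these classes are part of Kafka.
public class CheckBeforeCreate {

    enum CreateResult { CREATED, TOPIC_EXISTS, INVALID_REPLICATION_FACTOR }

    // In-memory stand-in for the cluster that handles CreateTopic requests.
    static class SimulatedCluster {
        final int liveBrokers;
        final Set<String> topics = new HashSet<>();

        SimulatedCluster(int liveBrokers) {
            this.liveBrokers = liveBrokers;
        }

        boolean topicExists(String name) {
            return topics.contains(name);
        }

        // Mirrors the order described in the ticket: the replication factor
        // is validated BEFORE the broker looks for an existing topic.
        CreateResult createTopic(String name, int replicationFactor) {
            if (replicationFactor > liveBrokers) {
                return CreateResult.INVALID_REPLICATION_FACTOR;
            }
            return topics.add(name) ? CreateResult.CREATED : CreateResult.TOPIC_EXISTS;
        }
    }

    // Current behaviour: always send CreateTopic and treat "exists" as success.
    static boolean createAlways(SimulatedCluster cluster, String topic, int rf) {
        return cluster.createTopic(topic, rf) != CreateResult.INVALID_REPLICATION_FACTOR;
    }

    // Proposed behaviour: check first and create only if missing. "Exists" is
    // still tolerated because another worker may create the topic in between.
    static boolean checkThenCreate(SimulatedCluster cluster, String topic, int rf) {
        if (cluster.topicExists(topic)) {
            return true; // no CreateTopic request is sent at all
        }
        return cluster.createTopic(topic, rf) != CreateResult.INVALID_REPLICATION_FACTOR;
    }

    public static void main(String[] args) {
        // The topic already exists, but only 2 brokers are live for RF 3.
        SimulatedCluster cluster = new SimulatedCluster(2);
        cluster.topics.add("connect-offsets");

        // false: rejected by the replication-factor check before the existence check
        System.out.println("createAlways:    " + createAlways(cluster, "connect-offsets", 3));
        // true: the existing topic is found and nothing is created
        System.out.println("checkThenCreate: " + checkThenCreate(cluster, "connect-offsets", 3));
    }
}
```

In the real Java client, the check and the create would roughly correspond to `AdminClient.listTopics()` (or `describeTopics(...)`) and `AdminClient.createTopics(...)`. A worker that loses the race described in the comment above would still receive a `TopicExistsException`, which is why `checkThenCreate` keeps treating "already exists" as success.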



--
This message was sent by Atlassian Jira
(v8.3.4#803005)