Posted to jira@kafka.apache.org by "Boyang Chen (Jira)" <ji...@apache.org> on 2020/05/15 19:42:00 UTC

[jira] [Updated] (KAFKA-9999) Internal topic creation failure should be non-fatal and trigger explicit rebalance

     [ https://issues.apache.org/jira/browse/KAFKA-9999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen updated KAFKA-9999:
-------------------------------
    Summary: Internal topic creation failure should be non-fatal and trigger explicit rebalance   (was: Topic description should be triggered after each failed topic creation iteration )

> Internal topic creation failure should be non-fatal and trigger explicit rebalance 
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9999
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9999
>             Project: Kafka
>          Issue Type: Bug
>          Components: admin, streams
>    Affects Versions: 2.4.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> We spotted a case in a system test failure where the topics already exist but the admin client still attempts to recreate them:
>  
> {code:java}
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog. Topic is probably marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-uwin-cnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-uwin-cnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager) 
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Could not create topic SmokeTest-cntByCnt-changelog. Topic is probably marked for deletion (number of partitions is unknown).
> Will retry to create this topic in 100 ms (to let broker finish async delete operation first).
> Error message was: org.apache.kafka.common.errors.TopicExistsException: Topic 'SmokeTest-cntByCnt-changelog' already exists. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,120] INFO stream-thread [main] Topics [SmokeTest-KSTREAM-REDUCE-STATE-STORE-0000000020-changelog, SmokeTest-uwin-cnt-changelog, SmokeTest-cntByCnt-changelog] can not be made ready with 5 retries left (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,220] ERROR stream-thread [main] Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error. (org.apache.kafka.streams.processor.internals.InternalTopicManager)
> [2020-05-14 09:56:40,221] ERROR stream-thread [SmokeTest-05374457-074b-4d33-bca0-8686465e8157-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: Could not create topics after 5 retries. This can happen if the Kafka cluster is temporary not available. You can increase admin client config `retries` to be resilient against this error.
>         at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:171)
>         at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
>         at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588) 
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650) 
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) 
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:743){code}
> Looking closer, it seems that during the topic description phase we did not find the topics because the brokers were temporarily unavailable. However, we gave up on describing the topics after the first attempt. A more resilient approach would be to retry the topic description, so that we can filter out topics that are already created with the expected number of partitions instead of waiting on an unrealistic pending-deletion phase.
> Another angle on this problem is that, as of today, we cannot tell whether a topic is pending deletion or running properly. We could discuss a follow-up effort to expose that information as part of the topic description result.
>  
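As a rough illustration of the retry idea described above, here is a minimal sketch in plain Java. It is not the InternalTopicManager code and does not use the Kafka admin client: `TopicReadinessSketch`, `TopicDescriber`, and `topicsStillToCreate` are hypothetical names introduced only for this example, and throwing on a partition-count mismatch is an assumption rather than confirmed Streams behavior. The point it shows is that each attempt re-describes only the still-unresolved topics, so a topic that was missed during a brief broker outage can be recognized as already existing on a later attempt.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class TopicReadinessSketch {

    /** Hypothetical stand-in for a describe-topics call; this is not a Kafka API. */
    interface TopicDescriber {
        // Returns topic name -> partition count for every topic that could be described.
        // Topics that are missing, or temporarily not describable, are absent from the map.
        Map<String, Integer> describe(Set<String> topics);
    }

    /**
     * Re-describes the unresolved topics on every attempt and returns the topics
     * that were never found, i.e. the only ones worth trying to create.
     */
    static Set<String> topicsStillToCreate(Map<String, Integer> expectedPartitions,
                                           TopicDescriber describer,
                                           int maxAttempts) {
        Set<String> unresolved = new HashSet<>(expectedPartitions.keySet());
        for (int attempt = 0; attempt < maxAttempts && !unresolved.isEmpty(); attempt++) {
            // Pass a copy so the describer never sees our set being mutated.
            Map<String, Integer> described = describer.describe(new HashSet<>(unresolved));
            for (Map.Entry<String, Integer> entry : described.entrySet()) {
                Integer expected = expectedPartitions.get(entry.getKey());
                if (expected == null) {
                    continue; // not a topic we asked about
                }
                if (!expected.equals(entry.getValue())) {
                    // Assumption for this sketch: a mismatch is treated as fatal.
                    throw new IllegalStateException("Topic " + entry.getKey() + " has "
                        + entry.getValue() + " partitions, expected " + expected);
                }
                unresolved.remove(entry.getKey()); // exists with the expected partition count
            }
            // A real implementation would back off here before the next attempt.
        }
        return unresolved;
    }

    public static void main(String[] args) {
        Map<String, Integer> expected = new HashMap<>();
        expected.put("SmokeTest-cntByCnt-changelog", 1);
        // This describer finds nothing, simulating brokers that stay unavailable.
        Set<String> toCreate = topicsStillToCreate(expected, topics -> new HashMap<>(), 3);
        System.out.println("Topics still to create: " + toCreate);
    }
}
```

With a describer that starts answering on a later attempt, the already-existing topics drop out of the returned set, so no create request (and no TopicExistsException) would be issued for them.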


