Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/18 18:55:03 UTC

[GitHub] [kafka] chia7712 opened a new pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

chia7712 opened a new pull request #10152:
URL: https://github.com/apache/kafka/pull/10152


   issue: https://issues.apache.org/jira/browse/KAFKA-12339
   
   After migrating our connector environment to 2.9.0-SNAPSHOT, deploying a connector cluster started to fail with the error below.
   ```
   Uncaught exception in herder work thread, exiting:  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:324)
   org.apache.kafka.connect.errors.ConnectException: Error while getting end offsets for topic 'connect-storage-topic-connect-cluster-1'
   at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:689)
   at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:338)
   at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:195)
   at org.apache.kafka.connect.storage.KafkaStatusBackingStore.start(KafkaStatusBackingStore.java:216)
   at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:129)
   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:310)
   at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   at java.base/java.lang.Thread.run(Thread.java:834)
   Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
   at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
   at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
   at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
   at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
   at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:668)
   ... 10 more
   ```
   #9780 added a shared admin client to get end offsets. KafkaAdmin#listOffsets does not handle topic-level errors, so a topic-level UnknownTopicOrPartitionException can prevent the worker from starting when the new internal topic has NOT yet been propagated to all brokers.
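A minimal sketch of the failure mode and of a caller-side retry, in plain Java with no Kafka dependency. Everything in it is hypothetical: `StaleMetadataException` stands in for `UnknownTopicOrPartitionException`, and `getWithRetry` is not a Kafka or Connect API. It only illustrates two points from the trace above: the real cause arrives wrapped in an `ExecutionException` from `Future.get()`, and a transient "topic not yet known to this broker" error can succeed if it is retried.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class EndOffsetsRetrySketch {

    // Hypothetical stand-in for a "metadata not yet propagated" error such as
    // UnknownTopicOrPartitionException; this is not a Kafka class.
    static class StaleMetadataException extends RuntimeException {
        StaleMetadataException(String message) { super(message); }
    }

    // Runs the async call and retries while the wrapped cause is a
    // StaleMetadataException and attempts remain; any other failure is rethrown.
    static <T> T getWithRetry(Supplier<CompletableFuture<T>> call, int maxAttempts)
            throws InterruptedException, ExecutionException {
        for (int attempt = 1; ; attempt++) {
            try {
                return call.get().get();
            } catch (ExecutionException e) {
                // Future.get() wraps the real failure, like the "Caused by:" chain in the trace
                boolean retriable = e.getCause() instanceof StaleMetadataException;
                if (!retriable || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(10); // brief back-off before asking again
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulates a broker that only learns about the new topic on the third request.
        int[] calls = {0};
        Supplier<CompletableFuture<Long>> endOffset = () -> {
            CompletableFuture<Long> f = new CompletableFuture<>();
            if (++calls[0] < 3) {
                f.completeExceptionally(
                    new StaleMetadataException("This server does not host this topic-partition."));
            } else {
                f.complete(0L);
            }
            return f;
        };
        long offset = getWithRetry(endOffset, 5);
        System.out.println("end offset " + offset + " after " + calls[0] + " attempts");
    }
}
```

Retrying only on the metadata-propagation error keeps genuine failures fast. Note that, per its later title ("Add retry to admin client's listOffsets"), this PR puts the retry inside the admin client rather than in the Connect worker; the sketch above only models the idea.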
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on pull request #10152: KAFKA-12339: Add retry to admin client's listOffsets

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#issuecomment-782647280


   I've already cherry-picked this to `2.7`, `2.6` (after the release manager's approval) and `2.5`, and have asked for approval on `2.8` (since we are in code freeze).





[GitHub] [kafka] rhauch merged pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #10152:
URL: https://github.com/apache/kafka/pull/10152


   





[GitHub] [kafka] kkonstantine commented on a change in pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#discussion_r579485651



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
##########
@@ -82,6 +82,7 @@ public long deadline() {
 
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
+            if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();

Review comment:
       nit: a stylistic observation: this call does exactly the same thing as the call below, yet it's written differently.
   
   Styles differ, usually not by much, from module to module. I think it's good to keep the existing style to help readability when the change doesn't require a larger restructuring.
    
   ```suggestion
               if (shouldRefreshMetadata(tm.error())) {
                   throw tm.error().exception();
               }
   ```
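A self-contained model of the check under review, not Kafka's actual code: `ErrorCode`, `TopicMeta`, `PartitionMeta` and `RetriableMetadataException` are hypothetical simplifications of the topic and partition metadata in a `MetadataResponse`. It assumes, as the later comment about partition metadata suggests, that "the call below" checks each partition's error. It shows why the topic-level check matters: a freshly created topic that a broker does not know yet reports its error at the topic level with no partitions, so a partition-only check never sees it. Both checks use the braced style the suggestion asks for.

```java
import java.util.List;

public class MetadataErrorCheckSketch {

    // Hypothetical, simplified error codes; invalidMetadata marks errors meaning
    // "this metadata is stale, refresh it and try again".
    enum ErrorCode {
        NONE(false),
        UNKNOWN_TOPIC_OR_PARTITION(true),
        LEADER_NOT_AVAILABLE(true),
        TOPIC_AUTHORIZATION_FAILED(false);

        final boolean invalidMetadata;

        ErrorCode(boolean invalidMetadata) { this.invalidMetadata = invalidMetadata; }
    }

    static final class RetriableMetadataException extends RuntimeException {
        RetriableMetadataException(String message) { super(message); }
    }

    static final class PartitionMeta {
        final int partition;
        final ErrorCode error;

        PartitionMeta(int partition, ErrorCode error) {
            this.partition = partition;
            this.error = error;
        }
    }

    static final class TopicMeta {
        final String topic;
        final ErrorCode error;
        final List<PartitionMeta> partitions;

        TopicMeta(String topic, ErrorCode error, List<PartitionMeta> partitions) {
            this.topic = topic;
            this.error = error;
            this.partitions = partitions;
        }
    }

    static boolean shouldRefreshMetadata(ErrorCode error) {
        return error.invalidMetadata;
    }

    // Throws a retriable exception if any topic-level or partition-level error
    // indicates stale metadata; other errors are left for the caller to report.
    static void handleMetadataErrors(List<TopicMeta> topics) {
        for (TopicMeta tm : topics) {
            if (shouldRefreshMetadata(tm.error)) {
                throw new RetriableMetadataException(tm.topic + ": " + tm.error);
            }
            for (PartitionMeta pm : tm.partitions) {
                if (shouldRefreshMetadata(pm.error)) {
                    throw new RetriableMetadataException(tm.topic + "-" + pm.partition + ": " + pm.error);
                }
            }
        }
    }

    public static void main(String[] args) {
        // A new internal topic this broker has not heard of yet: topic-level error, no partitions.
        List<TopicMeta> response = List.of(new TopicMeta(
            "connect-storage-topic-connect-cluster-1", ErrorCode.UNKNOWN_TOPIC_OR_PARTITION, List.of()));
        try {
            handleMetadataErrors(response);
            System.out.println("metadata ok");
        } catch (RetriableMetadataException e) {
            System.out.println("refresh metadata and retry: " + e.getMessage());
        }
    }
}
```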







[GitHub] [kafka] rhauch commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#issuecomment-781812825


   Thanks, @chia7712. I've reviewed much of this, and it seems straightforward, though I plan to review it more thoroughly tomorrow (in ~12 hours). I've asked a few others who might be more familiar with the admin client code to also take a look.





[GitHub] [kafka] chia7712 commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#issuecomment-782527168


   > It might also be good to note in the description that the error could be retriable in the same way as the one on partition metadata, and that it's the fact that it is not retried (here or in the Connect worker) that causes the issue.
   
   Should we cherry-pick this patch to other active branches?





[GitHub] [kafka] chia7712 commented on pull request #10152: KAFKA-12339 Starting new connector cluster with new internal topics e…

Posted by GitBox <gi...@apache.org>.
chia7712 commented on pull request #10152:
URL: https://github.com/apache/kafka/pull/10152#issuecomment-782526238


   @rhauch @kkonstantine Thanks for your reviews!

