You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/29 15:37:32 UTC

[kafka] branch trunk updated: KAFKA-6916; Refresh metadata in admin client if broker connection fails (#5050)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3a8d3a7  KAFKA-6916; Refresh metadata in admin client if broker connection fails (#5050)
3a8d3a7 is described below

commit 3a8d3a792755cb2bc2a72d3a64dd84b6c8653035
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Tue May 29 16:37:17 2018 +0100

    KAFKA-6916; Refresh metadata in admin client if broker connection fails (#5050)
    
    Refresh metadata if a broker connection fails, so that new calls are sent only to nodes that are alive and requests to the controller are sent to the new controller if the controller changes due to a broker failure. Also reassign calls that could not be sent.
    
    Reviewers: Dong Lin <li...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 28 +++++++++++++++++++++-
 .../admin/internals/AdminMetadataManager.java      |  2 +-
 .../kafka/api/AdminClientIntegrationTest.scala     | 15 ++++++++++++
 3 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 6c35f00..793e1da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -991,6 +991,29 @@ public class KafkaAdminClient extends AdminClient {
             }
         }
 
+        /**
+         * Reassign calls that have not yet been sent. When metadata is refreshed,
+         * all unsent calls are reassigned to handle controller change and node changes.
+         * When a node is disconnected, all calls assigned to the node are reassigned.
+         *
+         * @param now The current time in milliseconds
+         * @param disconnectedOnly Reassign only calls to nodes that were disconnected
+         *                         in the last poll
+         */
+        private void reassignUnsentCalls(long now, boolean disconnectedOnly) {
+            ArrayList<Call> pendingCallsToSend = new ArrayList<>(); // calls taken out of callsToSend below
+            for (Iterator<Map.Entry<Node, List<Call>>> iter = callsToSend.entrySet().iterator(); iter.hasNext(); ) {
+                Map.Entry<Node, List<Call>> entry = iter.next();
+                if (!disconnectedOnly || client.connectionFailed(entry.getKey())) { // every node, or only nodes whose connection failed
+                    for (Call call : entry.getValue()) {
+                        pendingCallsToSend.add(call);
+                    }
+                    iter.remove(); // remove through the iterator so the map can be modified mid-iteration
+                }
+            }
+            chooseNodesForPendingCalls(now, pendingCallsToSend.iterator()); // NOTE(review): anything left in this temporary list afterwards is not retained here - confirm
+        }
+
         private boolean hasActiveExternalCalls(Collection<Call> calls) {
             for (Call call : calls) {
                 if (!call.isInternal()) {
@@ -1075,6 +1098,7 @@ public class KafkaAdminClient extends AdminClient {
 
                 // Update the current time and handle the latest responses.
                 now = time.milliseconds();
+                reassignUnsentCalls(now, true); // reassign calls to disconnected nodes
                 handleResponses(now, responses);
             }
             int numTimedOut = 0;
@@ -1158,7 +1182,9 @@ public class KafkaAdminClient extends AdminClient {
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     MetadataResponse response = (MetadataResponse) abstractResponse;
-                    metadataManager.update(response.cluster(), time.milliseconds());
+                    long now = time.milliseconds();
+                    metadataManager.update(response.cluster(), now);
+                    reassignUnsentCalls(now, false);
                 }
 
                 @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index e06aed2..85d3c28 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -116,7 +116,7 @@ public class AdminMetadataManager {
 
         @Override
         public void requestUpdate() {
-            // Do nothing
+            AdminMetadataManager.this.requestUpdate();
         }
     }
 
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 24daa86..231b1e7 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -219,6 +219,21 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     waitForTopics(client, List(), topics)
   }
 
+  @Test
+  def testMetadataRefresh(): Unit = { // the admin client must keep working after the controller broker is shut down
+    client = AdminClient.create(createConfig())
+    val topics = Seq("mytopic")
+    val newTopics = Seq(new NewTopic("mytopic", 3, 3)) // presumably 3 partitions, replication factor 3 - confirm against NewTopic
+    client.createTopics(newTopics.asJava).all.get()
+    waitForTopics(client, expectedPresent = topics, expectedMissing = List())
+
+    val controller = servers.find(_.config.brokerId == TestUtils.waitUntilControllerElected(zkClient)).get
+    controller.shutdown() // stop the broker whose id matches the elected controller
+    controller.awaitShutdown()
+    val topicDesc = client.describeTopics(topics.asJava).all.get() // issued only after the controller has shut down
+    assertEquals(topics.toSet, topicDesc.keySet.asScala)
+  }
+
   /**
     * describe should not auto create topics
     */

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.