You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/26 00:55:32 UTC

[kafka] branch trunk updated: MINOR: AdminClient should respect retry backoff

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7f19df2  MINOR: AdminClient should respect retry backoff
7f19df2 is described below

commit 7f19df29ac60ec9aef1dbd14704a2dd045b1cfbf
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 25 17:55:21 2018 -0700

    MINOR: AdminClient should respect retry backoff
    
    AdminClient should back off when retrying a Call. Fixed this and added a unit test.
    
    Author: Jason Gustafson <ja...@confluent.io>
    
    Reviewers: Dong Lin <li...@gmail.com>
    
    Closes #5077 from hachikuji/admin-client-retry-backoff
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 30 ++++++++--
 .../java/org/apache/kafka/clients/MockClient.java  |  4 ++
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 69 ++++++++++++++++------
 3 files changed, 80 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c935ef9..6c35f00 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -220,6 +220,8 @@ public class KafkaAdminClient extends AdminClient {
 
     private final int maxRetries;
 
+    private final long retryBackoffMs;
+
     /**
      * Get or create a list value from a map.
      *
@@ -387,9 +389,14 @@ public class KafkaAdminClient extends AdminClient {
         return new LogContext("[AdminClient clientId=" + clientId + "] ");
     }
 
-    private KafkaAdminClient(AdminClientConfig config, String clientId, Time time,
-                AdminMetadataManager metadataManager, Metrics metrics, KafkaClient client,
-                TimeoutProcessorFactory timeoutProcessorFactory, LogContext logContext) {
+    private KafkaAdminClient(AdminClientConfig config,
+                             String clientId,
+                             Time time,
+                             AdminMetadataManager metadataManager,
+                             Metrics metrics,
+                             KafkaClient client,
+                             TimeoutProcessorFactory timeoutProcessorFactory,
+                             LogContext logContext) {
         this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
         this.clientId = clientId;
         this.log = logContext.logger(KafkaAdminClient.class);
@@ -406,6 +413,7 @@ public class KafkaAdminClient extends AdminClient {
         this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ?
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
+        this.retryBackoffMs = config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG);
         config.logUnused();
         AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
         log.debug("Kafka admin client initialized");
@@ -532,6 +540,7 @@ public class KafkaAdminClient extends AdminClient {
         private int tries = 0;
         private boolean aborted = false;
         private Node curNode = null;
+        private long nextAllowedTryMs = 0;
 
         Call(boolean internal, String callName, long deadlineMs, NodeProvider nodeProvider) {
             this.internal = internal;
@@ -581,6 +590,8 @@ public class KafkaAdminClient extends AdminClient {
                 return;
             }
             tries++;
+            nextAllowedTryMs = now + retryBackoffMs;
+
             // If the call has timed out, fail.
             if (calcTimeoutMsRemainingAsInt(now, deadlineMs) < 0) {
                 if (log.isDebugEnabled()) {
@@ -811,10 +822,18 @@ public class KafkaAdminClient extends AdminClient {
          * @param now           The current time in milliseconds.
          * @param pendingIter   An iterator yielding pending calls.
          */
-        private void chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) {
+        private long chooseNodesForPendingCalls(long now, Iterator<Call> pendingIter) {
+            long pollTimeout = Long.MAX_VALUE;
             log.trace("Trying to choose nodes for {} at {}", pendingIter, now);
             while (pendingIter.hasNext()) {
                 Call call = pendingIter.next();
+
+                // If the call is being retried, await the proper backoff before finding the node
+                if (now < call.nextAllowedTryMs) {
+                    pollTimeout = Math.min(pollTimeout, call.nextAllowedTryMs - now);
+                    continue;
+                }
+
                 Node node = null;
                 try {
                     node = call.nodeProvider.provide();
@@ -833,6 +852,7 @@ public class KafkaAdminClient extends AdminClient {
                     log.trace("Unable to assign {} to a node.", call);
                 }
             }
+            return pollTimeout;
         }
 
         /**
@@ -1033,7 +1053,7 @@ public class KafkaAdminClient extends AdminClient {
                 }
 
                 // Choose nodes for our pending calls.
-                chooseNodesForPendingCalls(now, pendingCalls.iterator());
+                pollTimeout = Math.min(pollTimeout, chooseNodesForPendingCalls(now, pendingCalls.iterator()));
                 long metadataFetchDelayMs = metadataManager.metadataFetchDelayMs(now);
                 if (metadataFetchDelayMs == 0) {
                     metadataManager.transitionToUpdatePending(now);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 7da4a30..91905c2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -426,6 +426,10 @@ public class MockClient implements KafkaClient {
         return !metadataUpdates.isEmpty();
     }
 
+    public int numAwaitingResponses() {
+        return futureResponses.size();
+    }
+
     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
         metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 575f7f8..0bc786b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
 import org.apache.kafka.common.requests.CreatePartitionsResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
@@ -61,6 +62,7 @@ import org.apache.kafka.common.requests.DescribeConfigsResponse;
 import org.apache.kafka.common.requests.DescribeGroupsResponse;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.resource.Resource;
@@ -91,6 +93,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
@@ -102,7 +105,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-
 /**
  * A unit test for KafkaAdminClient.
  *
@@ -258,9 +260,9 @@ public class KafkaAdminClientTest {
     public void testCreateTopics() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
-            env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+            env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest,
+                    new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
                     Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
                     new CreateTopicsOptions().timeoutMs(10000)).all();
@@ -269,11 +271,55 @@ public class KafkaAdminClientTest {
     }
 
     @Test
+    public void testCreateTopicsRetryBackoff() throws Exception {
+        Cluster cluster = mockCluster(0);
+        MockTime time = new MockTime();
+        MockClient mockClient = new MockClient(time);
+        int retryBackoff = 100;
+
+        mockClient.prepareResponse(body -> body instanceof MetadataRequest,
+                new MetadataResponse(cluster.nodes(), cluster.clusterResource().clusterId(),
+                        1, Collections.emptyList()));
+
+        AtomicLong firstAttemptTime = new AtomicLong(0);
+        AtomicLong secondAttemptTime = new AtomicLong(0);
+
+        mockClient.prepareResponse(body -> {
+            firstAttemptTime.set(time.milliseconds());
+            return body instanceof CreateTopicsRequest;
+        }, null, true);
+
+        mockClient.prepareResponse(body -> {
+            secondAttemptTime.set(time.milliseconds());
+            return body instanceof CreateTopicsRequest;
+        }, new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, ""))));
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockClient, time, cluster,
+                newStrMap(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "" + retryBackoff))) {
+            mockClient.setNodeApiVersions(NodeApiVersions.create());
+            mockClient.setNode(env.cluster().controller());
+
+            KafkaFuture<Void> future = env.adminClient().createTopics(
+                    Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(0, asList(0, 1, 2)))),
+                    new CreateTopicsOptions().timeoutMs(10000)).all();
+
+            // Wait until the first attempt has failed, then advance the time
+            TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 1,
+                    "Failed awaiting CreateTopics first request failure");
+            time.sleep(retryBackoff);
+
+            future.get();
+        }
+
+        long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
+        assertEquals("CreateTopics retry did not await expected backoff",
+                retryBackoff, actualRetryBackoff);
+    }
+
+    @Test
     public void testCreateTopicsHandleNotControllerException() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(mockCluster(0), Collections.<String>emptySet());
-            env.kafkaClient().prepareMetadataUpdate(mockCluster(1), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().nodeById(0));
             env.kafkaClient().prepareResponseFrom(new CreateTopicsResponse(
                 Collections.singletonMap("myTopic", new ApiError(Errors.NOT_CONTROLLER, ""))),
@@ -296,7 +342,6 @@ public class KafkaAdminClientTest {
     public void testInvalidTopicNames() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             List<String> sillyTopicNames = asList("", null);
@@ -450,7 +495,6 @@ public class KafkaAdminClientTest {
     public void testDescribeAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we get back ACL1 and ACL2.
@@ -474,7 +518,6 @@ public class KafkaAdminClientTest {
     public void testCreateAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we successfully create two ACLs.
@@ -503,7 +546,6 @@ public class KafkaAdminClientTest {
     public void testDeleteAcls() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where one filter has an error.
@@ -556,7 +598,6 @@ public class KafkaAdminClientTest {
             AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
                 AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(nodes.get(0));
             assertEquals(time, env.time());
             assertEquals(env.time(), ((KafkaAdminClient) env.adminClient()).time());
@@ -598,7 +639,6 @@ public class KafkaAdminClientTest {
     public void testDescribeConfigs() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(new DescribeConfigsResponse(0,
                 Collections.singletonMap(new org.apache.kafka.common.requests.Resource(BROKER, "0"),
@@ -614,7 +654,6 @@ public class KafkaAdminClientTest {
     public void testCreatePartitions() throws Exception {
         try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             Map<String, ApiError> m = new HashMap<>();
@@ -667,7 +706,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().nodes().get(0));
 
             Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> m = new HashMap<>();
@@ -770,7 +808,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Empty metadata response should be retried
@@ -845,7 +882,6 @@ public class KafkaAdminClientTest {
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
                 AdminClientConfig.RETRIES_CONFIG, "0")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Empty metadata causes the request to fail since we have no list of brokers
@@ -877,7 +913,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@@ -941,7 +976,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
@@ -983,7 +1017,6 @@ public class KafkaAdminClientTest {
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
 
             env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.