You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/21 23:36:03 UTC

[GitHub] [kafka] abbccdda commented on a change in pull request #8846: KAFKA-9800: [KIP-580] Client Exponential Backoff Implementation

abbccdda commented on a change in pull request #8846:
URL: https://github.com/apache/kafka/pull/8846#discussion_r474843042



##########
File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
##########
@@ -84,6 +84,14 @@
 
     public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
     public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.";

Review comment:
       Should we update the comment to indicate this is the initial backoff time now?

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -74,22 +76,28 @@
     private final ClusterResourceListeners clusterResourceListeners;
     private boolean isClosed;
     private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
+    private final ExponentialBackoff refreshBackoff;
+    final static double RETRY_BACKOFF_JITTER = CommonClientConfigs.RETRY_BACKOFF_JITTER;

Review comment:
       Are these two necessary? Couldn't we just directly use the config value instead?

##########
File path: clients/src/main/java/org/apache/kafka/clients/Metadata.java
##########
@@ -457,6 +467,18 @@ private void clearRecoverableErrors() {
      * to avoid retrying immediately.
      */
     public synchronized void failedUpdate(long now) {
+        this.incrementRefreshBackoff(now);
+    }
+
+    private void incrementRefreshBackoff(long now) {
+        this.refreshBackoffMs = this.refreshBackoff.backoff(this.attempts);

Review comment:
       For the general-purpose ExponentialBackoff class, could we let the class itself track the number of attempts made over time?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -2277,26 +2295,36 @@ public void testDescribeConsumerGroupNumRetries() throws Exception {
         final Time time = new MockTime();
 
         try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
-            AdminClientConfig.RETRIES_CONFIG, "0")) {
+                AdminClientConfig.RETRIES_CONFIG, "1",
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
 
-            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-
-            DescribeGroupsResponseData data = new DescribeGroupsResponseData();
+            // Case 1

Review comment:
       What do the 4 cases represent?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -350,7 +351,11 @@
 
     private final int maxRetries;
 
-    private final long retryBackoffMs;
+    private ExponentialBackoff retryBackoff;
+
+    final static double RETRY_BACKOFF_JITTER = CommonClientConfigs.RETRY_BACKOFF_JITTER;

Review comment:
       Similar question: I don't see any obvious benefit to having 2 separate static variables.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -128,14 +136,17 @@ public synchronized String prettyString() {
         }
     }
 
-    public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy) {
+    public SubscriptionState(LogContext logContext, OffsetResetStrategy defaultResetStrategy,

Review comment:
       Could we create a constructor with default retry parameters? We could reduce code changes and make the default behavior consistent across different tests.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -2235,39 +2227,65 @@ public void testOffsetCommitRetryBackoff() throws Exception {
 
             mockClient.setNodeApiVersions(NodeApiVersions.create());
 
-            AtomicLong firstAttemptTime = new AtomicLong(0);
-            AtomicLong secondAttemptTime = new AtomicLong(0);
-
             final String groupId = "group-0";
             final TopicPartition tp1 = new TopicPartition("foo", 0);
 
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-            mockClient.prepareResponse(body -> {
-                firstAttemptTime.set(time.milliseconds());
-                return true;
-            }, prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR));
-
+            mockClient.prepareResponse(prepareOffsetCommitResponse(tp1, Errors.NOT_COORDINATOR));
 
             mockClient.prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
-            mockClient.prepareResponse(body -> {
-                secondAttemptTime.set(time.milliseconds());
-                return true;
-            }, prepareOffsetCommitResponse(tp1, Errors.NONE));
-
+            mockClient.prepareResponse(prepareOffsetCommitResponse(tp1, Errors.NONE));
 
             Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
             offsets.put(tp1, new OffsetAndMetadata(123L));
             final KafkaFuture<Void> future = env.adminClient().alterConsumerGroupOffsets(groupId, offsets).all();
 
-            TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 1, "Failed awaiting CommitOffsets first request failure");
+            TestUtils.waitForCondition(() -> mockClient.numAwaitingResponses() == 2, "Failed awaiting CommitOffsets first request failure");
             TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1, "Failed to add retry CommitOffsets call on first failure");
-            time.sleep(retryBackoff);
 
+            long sleepTime1 = (long) (retryBackoff * Math.pow(retryBackoffExpBase, 0) * (1 - retryBackoffJitter)) - 1;
+            long sleepTime2 = (long) (retryBackoff * Math.pow(retryBackoffExpBase, 0) * (retryBackoffJitter * 2)) + 2;
+
+            time.sleep(sleepTime1);
+            assertEquals(1, ((KafkaAdminClient) env.adminClient()).numPendingCalls());
+
+            time.sleep(sleepTime2);
             future.get();
+        }
+    }
 
-            long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
-            assertEquals("CommitOffsets retry did not await expected backoff!", retryBackoff, actualRetryBackoff);
+    private void prepareFindCoordinatorResponse(AdminClientUnitTestEnv env, boolean successful) {
+        if (successful) {
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+        } else {
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, env.cluster().controller()));
+        }
+    }
+
+    private void prepareDescribeGroupResponse(AdminClientUnitTestEnv env, boolean successful) {
+        DescribeGroupsResponseData data;

Review comment:
       nit: I think we could just use a different error code here, instead of building the entire data in both branches. Also, `data` should be final.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4068,6 +4046,295 @@ public void testListOffsetsMetadataNonRetriableErrors() throws Exception {
         }
     }
 
+    private void prepareMetadataResponse(AdminClientUnitTestEnv env, boolean successful) {
+        if (successful) {
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.NONE));
+        } else {
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(), Errors.LEADER_NOT_AVAILABLE));
+        }
+    }
+
+    private void prepareListoffsetsResponse(List<TopicPartition> tps, AdminClientUnitTestEnv env, boolean successful) {
+        Map<TopicPartition, PartitionData> responseData = new HashMap<>();
+        if (successful) {
+            tps.forEach(tp -> {
+                responseData.put(tp, new PartitionData(Errors.NONE, 123456789L, 345L, Optional.of(543)));
+            });
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+        } else {
+            tps.forEach(tp -> {
+                responseData.put(tp, new PartitionData(Errors.LEADER_NOT_AVAILABLE, 123456789L, 345L, Optional.of(543)));
+            });
+            env.kafkaClient().prepareResponse(new ListOffsetResponse(responseData));
+        }
+    }
+
+    private void verifyListOffsetsResult(Map<TopicPartition, OffsetSpec> partitionOffsets, AdminClientUnitTestEnv env,
+                                         boolean successful) throws ExecutionException, InterruptedException {
+        ListOffsetsResult result = env.adminClient().listOffsets(partitionOffsets);
+        if (successful) {
+            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get();
+            assertFalse(offsets.isEmpty());
+            partitionOffsets.keySet().forEach(tp -> {
+                assertEquals(345L, offsets.get(tp).offset());
+                assertEquals(543, offsets.get(tp).leaderEpoch().get().intValue());
+                assertEquals(123456789L, offsets.get(tp).timestamp());
+            });
+        } else {
+            TestUtils.assertFutureError(result.all(), TimeoutException.class);
+        }
+    }
+
+    @Deprecated
+    @Test
+    public void testListOffsetsMaxAllowedNumRetries() throws Exception {
+
+        Node node0 = new Node(0, "localhost", 8120);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        pInfos.add(new PartitionInfo("bar", 0, node0, new Node[]{node0}, new Node[]{node0}));
+        pInfos.add(new PartitionInfo("baz", 0, node0, new Node[]{node0}, new Node[]{node0}));
+
+        MockTime time = new MockTime();
+        String maxAllowedNumTries = "2";
+        String retryBackoff = "0";
+
+        final Cluster cluster =
+                new Cluster(
+                        "mockClusterId",
+                        Arrays.asList(node0),
+                        pInfos,
+                        Collections.<String>emptySet(),
+                        Collections.<String>emptySet(),
+                        node0);
+
+        final TopicPartition tp1 = new TopicPartition("foo", 0);
+        final TopicPartition tp2 = new TopicPartition("bar", 0);
+        final TopicPartition tp3 = new TopicPartition("baz", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster,
+                AdminClientConfig.RETRIES_CONFIG, maxAllowedNumTries,
+                AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoff)) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            Map<TopicPartition, OffsetSpec> partitionOffsets = new HashMap<>();
+            partitionOffsets.put(tp1, OffsetSpec.latest());
+            partitionOffsets.put(tp2, OffsetSpec.earliest());
+            partitionOffsets.put(tp3, OffsetSpec.forTimestamp(time.milliseconds()));
+            List<TopicPartition> tps = new ArrayList<>(partitionOffsets.keySet());
+
+            // Case 1

Review comment:
       It would be better to describe what each of these `cases` refers to.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -341,10 +342,11 @@ int attempts() {
         return attempts.get();
     }
 
-    void reenqueued(long now) {
+    void reenqueued(long newRetryBackoffMs, long now) {
         attempts.getAndIncrement();
         lastAttemptMs = Math.max(lastAppendTime, now);
         lastAppendTime = Math.max(lastAppendTime, now);
+        retryBackoffMs = newRetryBackoffMs;

Review comment:
       nit: just name the parameter `retryBackoffMs` and assign it with `this.retryBackoffMs = retryBackoffMs;`

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -620,13 +611,14 @@ public void testCreateTopicsRetryBackoff() throws Exception {
             TestUtils.waitForCondition(() -> ((KafkaAdminClient) env.adminClient()).numPendingCalls() == 1,
                 "Failed to add retry CreateTopics call");
 
-            time.sleep(retryBackoff);
+            long sleepTime1 = (long) (retryBackoff * Math.pow(retryBackoffExpBase, 0) * (1 - retryBackoffJitter)) - 1;
+            long sleepTime2 = (long) (retryBackoff * Math.pow(retryBackoffExpBase, 0) * (retryBackoffJitter * 2)) + 2;
 
-            future.get();
+            time.sleep(sleepTime1);
+            assertEquals(1, ((KafkaAdminClient) env.adminClient()).numPendingCalls());
 
-            long actualRetryBackoff = secondAttemptTime.get() - firstAttemptTime.get();
-            assertEquals("CreateTopics retry did not await expected backoff",
-                    retryBackoff, actualRetryBackoff);
+            time.sleep(sleepTime2);

Review comment:
       Do we lose precision of the backoff time here? Could we do (sleepTime - 1) and verify it is not done yet?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -708,16 +719,35 @@ public synchronized void resume(TopicPartition tp) {
         assignedState(tp).resume();
     }
 
-    synchronized void requestFailed(Set<TopicPartition> partitions, long nextRetryTimeMs) {
+    synchronized void requestFailed(Set<TopicPartition> partitions, long now) {

Review comment:
       I think we could turn requestFailed/requestSucceeded into templates, and pass `incrementRetryBackoff`/`resetRetryBackoff` in as function parameters.

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
##########
@@ -38,6 +38,8 @@
     private final CopyOnWriteArrayList<MockTimeListener> listeners = new CopyOnWriteArrayList<>();
 
     private final long autoTickMs;
+    // Record the sum of auto ticked milliseconds
+    private AtomicLong autoTickedMs;

Review comment:
       should be final

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
##########
@@ -105,7 +105,7 @@ public WorkerGroupMember(DistributedConfig config,
 
             this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
             this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
+            this.metadata = new Metadata(retryBackoffMs, retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),

Review comment:
       Similarly for `Metadata`, we should have a separate constructor without `refreshBackoffMaxMs` that sets it to `retryBackoffMs` by default, to avoid misconfiguration.

##########
File path: clients/src/test/java/org/apache/kafka/common/utils/MockTime.java
##########
@@ -75,8 +78,18 @@ public long nanoseconds() {
     }
 
     private void maybeSleep(long ms) {
-        if (ms != 0)
+        if (ms != 0) {
             sleep(ms);
+            autoTickedMs.getAndAdd(ms);
+        }
+    }
+
+    public long autoTickedMs() {
+        return autoTickedMs.get();
+    }
+
+    public void resetAutoTickedCounter() {

Review comment:
       nit: I think `autoTickedMs` is a recorder rather than a counter, so the method name `resetAutoTickedCounter` should reflect that.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org