You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2022/03/09 22:22:19 UTC

[kafka] branch 3.1 updated (7802b45 -> c4f84c8)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a change to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git.


    from 7802b45  MINOR: Fix flaky test cases SocketServerTest.remoteCloseWithoutBufferedReceives and SocketServerTest.remoteCloseWithIncompleteBufferedReceive (#11861)
     new d95647b  KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)
     new 020902d  KAFKA-12879: Addendum to reduce flakiness of tests (#11871)
     new c4f84c8  KAFKA-12879: Remove extra sleep (#11872)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../admin/internals/MetadataOperationContext.java  |   1 -
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  42 +----
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  27 ++-
 .../org/apache/kafka/connect/util/RetryUtil.java   | 103 ++++++++++++
 .../org/apache/kafka/connect/util/TopicAdmin.java  |  29 +++-
 .../kafka/connect/util/KafkaBasedLogTest.java      |  14 +-
 .../apache/kafka/connect/util/RetryUtilTest.java   | 186 +++++++++++++++++++++
 .../apache/kafka/connect/util/TopicAdminTest.java  |  59 ++++++-
 8 files changed, 406 insertions(+), 55 deletions(-)
 create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
 create mode 100644 connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java

[kafka] 01/03: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit d95647bcc890c4f7a7572102f84d1759ef53281e
Author: Philip Nee <ph...@gmail.com>
AuthorDate: Wed Mar 9 10:39:28 2022 -0800

    KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog (#11797)
    
    Fixes the compatibility issue regarding KAFKA-12879 by reverting the changes to the admin client from KAFKA-12339 (#10152) that retry admin client operations, and instead performing the retries within Connect's `KafkaBasedLog` during startup via a new `TopicAdmin.retryEndOffsets(...)` method. This method delegates to the existing `TopicAdmin.endOffsets(...)` method, but will retry on `RetriableException` until the retry timeout elapses.
    
    This change should be backward compatible with KAFKA-12339, so that when Connect's `KafkaBasedLog` starts up it will retry attempts to read the end offsets for the log's topic. The `KafkaBasedLog` existing thread already has its own retry logic, and this is not changed.
    
    Added more unit tests, and thoroughly tested the new `RetryUtil` used to encapsulate the parameterized retry logic around any supplied function.
---
 .../admin/internals/MetadataOperationContext.java  |   1 -
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  42 +----
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  27 ++-
 .../org/apache/kafka/connect/util/RetryUtil.java   | 101 +++++++++++
 .../org/apache/kafka/connect/util/TopicAdmin.java  |  29 +++-
 .../kafka/connect/util/KafkaBasedLogTest.java      |  14 +-
 .../apache/kafka/connect/util/RetryUtilTest.java   | 184 +++++++++++++++++++++
 .../apache/kafka/connect/util/TopicAdminTest.java  |  59 ++++++-
 8 files changed, 402 insertions(+), 55 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
index e7f2c07..c05e5cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/MetadataOperationContext.java
@@ -82,7 +82,6 @@ public final class MetadataOperationContext<T, O extends AbstractOptions<O>> {
 
     public static void handleMetadataErrors(MetadataResponse response) {
         for (TopicMetadata tm : response.topicMetadata()) {
-            if (shouldRefreshMetadata(tm.error())) throw tm.error().exception();
             for (PartitionMetadata pm : tm.partitionMetadata()) {
                 if (shouldRefreshMetadata(pm.error)) {
                     throw pm.error.exception();
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index b648b2d..f6a7352 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -489,16 +489,12 @@ public class KafkaAdminClientTest {
     }
 
     private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
-        return prepareMetadataResponse(cluster, error, error);
-    }
-
-    private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponsePartition> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 MetadataResponsePartition pm  = new MetadataResponsePartition()
-                    .setErrorCode(partitionError.code())
+                    .setErrorCode(error.code())
                     .setPartitionIndex(pInfo.partition())
                     .setLeaderId(pInfo.leader().id())
                     .setLeaderEpoch(234)
@@ -508,7 +504,7 @@ public class KafkaAdminClientTest {
                 pms.add(pm);
             }
             MetadataResponseTopic tm = new MetadataResponseTopic()
-                .setErrorCode(topicError.code())
+                .setErrorCode(error.code())
                 .setName(topic)
                 .setIsInternal(false)
                 .setPartitions(pms);
@@ -4340,40 +4336,6 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testListOffsetsRetriableErrorOnMetadata() throws Exception {
-        Node node = new Node(0, "localhost", 8120);
-        List<Node> nodes = Collections.singletonList(node);
-        final Cluster cluster = new Cluster(
-            "mockClusterId",
-            nodes,
-            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
-            Collections.emptySet(),
-            Collections.emptySet(),
-            node);
-        final TopicPartition tp0 = new TopicPartition("foo", 0);
-
-        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
-            // metadata refresh because of UNKNOWN_TOPIC_OR_PARTITION
-            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
-            // listoffsets response from broker 0
-            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
-                .setThrottleTimeMs(0)
-                .setTopics(Collections.singletonList(ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, -1L, 123L, 321)));
-            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
-
-            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.latest()));
-
-            Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
-            assertEquals(1, offsets.size());
-            assertEquals(123L, offsets.get(tp0).offset());
-            assertEquals(321, offsets.get(tp0).leaderEpoch().get().intValue());
-            assertEquals(-1L, offsets.get(tp0).timestamp());
-        }
-    }
-
-    @Test
     public void testListOffsetsRetriableErrors() throws Exception {
 
         Node node0 = new Node(0, "localhost", 8120);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index b1920d5..1dfb967 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -76,6 +76,10 @@ public class KafkaBasedLog<K, V> {
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
     private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
     private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
+    // 15min of admin retry duration to ensure successful metadata propagation.  10 seconds of backoff
+    // in between retries
+    private static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15);
+    private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10);
 
     private Time time;
     private final String topic;
@@ -194,7 +198,7 @@ public class KafkaBasedLog<K, V> {
         // when a 'group.id' is specified (if offsets happen to have been committed unexpectedly).
         consumer.seekToBeginning(partitions);
 
-        readToLogEnd();
+        readToLogEnd(true);
 
         thread = new WorkThread();
         thread.start();
@@ -319,9 +323,16 @@ public class KafkaBasedLog<K, V> {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets of the Kafka log's topic partitions, optionally retrying
+     * if the {@code listOffsets()} method of the admin client throws a {@link RetriableException}.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client {@code listOffsets()} call.
+     * @see TopicAdmin#retryEndOffsets
+     */
+    private void readToLogEnd(boolean shouldRetry) {
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment, shouldRetry);
         log.trace("Reading to end of log offsets {}", endOffsets);
 
         while (!endOffsets.isEmpty()) {
@@ -345,7 +356,7 @@ public class KafkaBasedLog<K, V> {
     }
 
     // Visible for testing
-    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment) {
+    Map<TopicPartition, Long> readEndOffsets(Set<TopicPartition> assignment, boolean shouldRetry) {
         log.trace("Reading to end of offset log");
 
         // Note that we'd prefer to not use the consumer to find the end offsets for the assigned topic partitions.
@@ -360,6 +371,12 @@ public class KafkaBasedLog<K, V> {
             // Use the admin client to immediately find the end offsets for the assigned topic partitions.
             // Unlike using the consumer
             try {
+                if (shouldRetry) {
+                    return admin.retryEndOffsets(assignment,
+                            ADMIN_CLIENT_RETRY_DURATION,
+                            ADMIN_CLIENT_RETRY_BACKOFF_MS);
+                }
+
                 return admin.endOffsets(assignment);
             } catch (UnsupportedVersionException e) {
                 // This may happen with really old brokers that don't support the auto topic creation
@@ -395,7 +412,7 @@ public class KafkaBasedLog<K, V> {
 
                     if (numCallbacks > 0) {
                         try {
-                            readToLogEnd();
+                            readToLogEnd(false);
                             log.trace("Finished read to end log for topic {}", topic);
                         } catch (TimeoutException e) {
                             log.warn("Timeout while reading log to end for topic '{}'. Retrying automatically. " +
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
new file mode 100644
index 0000000..9463f6a
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if a Kafka or Connect
+     * {@code RetriableException} or a {@link WakeupException} is thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>The task will be executed at least once. No retries will be performed
+     * if {@code timeoutDuration} is 0 or negative, or if {@code timeoutDuration} is not greater than {@code retryBackoffMs}.
+     *
+     * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or a null string is provided, the message will default to "callable"
+        final String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        final long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs < 0) {
+            log.debug("Assuming no retry backoff since retryBackoffMs={} is negative", retryBackoffMs);
+            retryBackoffMs = 0;
+        }
+        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
+            log.debug("Executing {} only once, since timeoutMs={} is not larger than retryBackoffMs={}",
+                    descriptionStr, timeoutMs, retryBackoffMs);
+            return callable.call();
+        }
+
+        final long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        do {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("Attempt {} to {} resulted in RetriableException; retrying automatically. " +
+                        "Reason: {}", attempt, descriptionStr, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            long millisRemaining = Math.max(0, end - System.currentTimeMillis());
+            if (millisRemaining < retryBackoffMs) {
+                // exit when the time remaining is less than retryBackoffMs
+                break;
+            }
+            Utils.sleep(retryBackoffMs);
+        } while (System.currentTimeMillis() < end);
+
+        throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index 7b2f152..c1afd2c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -655,7 +655,7 @@ public class TopicAdmin implements AutoCloseable {
      * @throws TimeoutException if the offset metadata could not be fetched before the amount of time allocated
      *         by {@code request.timeout.ms} expires, and this call can be retried
      * @throws LeaderNotAvailableException if the leader was not available and this call can be retried
-     * @throws RetriableException if a retriable error occurs, or the thread is interrupted while attempting 
+     * @throws RetriableException if a retriable error occurs, or the thread is interrupted while attempting
      *         to perform this operation
      * @throws ConnectException if a non retriable error occurs
      */
@@ -703,6 +703,33 @@ public class TopicAdmin implements AutoCloseable {
         return result;
     }
 
+    /**
+     * Fetch the most recent offset for each of the supplied {@link TopicPartition} objects, and performs retry when
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.
+     *
+     * @param partitions        the topic partitions
+     * @param timeoutDuration   timeout duration; may not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @return                  the map of offset for each topic partition, or an empty map if the supplied partitions
+     *                          are null or empty
+     * @throws ConnectException if {@code timeoutDuration} is exhausted
+     * @see TopicAdmin#endOffsets(Set)
+     */
+    public Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions, Duration timeoutDuration, long retryBackoffMs) {
+
+        try {
+            return RetryUtil.retryUntilTimeout(
+                    () -> endOffsets(partitions),
+                    () -> "list offsets for topic partitions",
+                    timeoutDuration,
+                    retryBackoffMs);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to list offsets for topic partitions.", e);
+        }
+    }
+
     @Override
     public void close() {
         admin.close();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index 8fae57e..887b56a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -490,13 +490,15 @@ public class KafkaBasedLogTest {
         Map<TopicPartition, Long> endOffsets = new HashMap<>();
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
         admin.endOffsets(EasyMock.eq(tps));
-        PowerMock.expectLastCall().andReturn(endOffsets).times(2);
+        PowerMock.expectLastCall().andReturn(endOffsets).times(1);
 
         PowerMock.replayAll();
 
         store.start();
-        assertEquals(endOffsets, store.readEndOffsets(tps));
+        assertEquals(endOffsets, store.readEndOffsets(tps, false));
     }
 
     @Test
@@ -507,7 +509,7 @@ public class KafkaBasedLogTest {
 
         Set<TopicPartition> tps = new HashSet<>(Arrays.asList(TP0, TP1));
         // Getting end offsets using the admin client should fail with unsupported version
-        admin.endOffsets(EasyMock.eq(tps));
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
         PowerMock.expectLastCall().andThrow(new UnsupportedVersionException("too old"));
 
         // Falls back to the consumer
@@ -519,7 +521,7 @@ public class KafkaBasedLogTest {
         PowerMock.replayAll();
 
         store.start();
-        assertEquals(endOffsets, store.readEndOffsets(tps));
+        assertEquals(endOffsets, store.readEndOffsets(tps, false));
     }
 
     @Test
@@ -533,7 +535,7 @@ public class KafkaBasedLogTest {
         endOffsets.put(TP0, 0L);
         endOffsets.put(TP1, 0L);
         // Getting end offsets upon startup should work fine
-        admin.endOffsets(EasyMock.eq(tps));
+        admin.retryEndOffsets(EasyMock.eq(tps), EasyMock.anyObject(), EasyMock.anyLong());
         PowerMock.expectLastCall().andReturn(endOffsets).times(1);
         // Getting end offsets using the admin client should fail with leader not available
         admin.endOffsets(EasyMock.eq(tps));
@@ -542,7 +544,7 @@ public class KafkaBasedLogTest {
         PowerMock.replayAll();
 
         store.start();
-        assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps));
+        assertThrows(LeaderNotAvailableException.class, () -> store.readEndOffsets(tps, false));
     }
 
     @SuppressWarnings("unchecked")
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
new file mode 100644
index 0000000..05f0212
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+    private Callable<String> mockCallable;
+    private final Supplier<String> testMsg = () -> "Test";
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() throws Exception {
+        mockCallable = Mockito.mock(Callable.class);
+    }
+
+    @Test
+    public void testSuccess() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    // timeout the test after 1000ms if unable to complete within a reasonable time frame
+    @Test(timeout = 1000)
+    public void testExhaustingRetries() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException());
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 10));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+    }
+
+    @Test
+    public void retriesEventuallySucceed() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1));
+        Mockito.verify(mockCallable, Mockito.times(4)).call();
+    }
+
+    @Test
+    public void failWithNonRetriableException() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new NullPointerException("Non retriable"));
+        NullPointerException e = assertThrows(NullPointerException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0));
+        assertEquals("Non retriable", e.getMessage());
+        Mockito.verify(mockCallable, Mockito.times(6)).call();
+    }
+
+    @Test
+    public void noRetryAndSucceed() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(0), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    @Test
+    public void noRetryAndFailed() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        TimeoutException e = assertThrows(TimeoutException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(0), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+        assertEquals("timeout exception", e.getMessage());
+    }
+
+    @Test
+    public void testNoBackoffTimeAndSucceed() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 0));
+        Mockito.verify(mockCallable, Mockito.times(4)).call();
+    }
+
+    @Test
+    public void testNoBackoffTimeAndFail() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(80), 0));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+        assertTrue(e.getMessage().contains("Reason: timeout exception"));
+    }
+
+    @Test
+    public void testBackoffMoreThanTimeoutWillOnlyExecuteOnce() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        TimeoutException e = assertThrows(TimeoutException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 100));
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    @Test
+    public void testInvalidTimeDuration() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, null, 10));
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(-1), 10));
+        Mockito.verify(mockCallable, Mockito.times(2)).call();
+    }
+
+    @Test
+    public void testInvalidRetryTimeout() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException("timeout"))
+                .thenReturn("success");
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), -1));
+        Mockito.verify(mockCallable, Mockito.times(2)).call();
+    }
+
+    @Test
+    public void testSupplier() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException("timeout exception"));
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, null, Duration.ofMillis(100), 10));
+        assertTrue(e.getMessage().startsWith("Fail to callable"));
+
+        e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, () -> null, Duration.ofMillis(100), 10));
+        assertTrue(e.getMessage().startsWith("Fail to callable"));
+
+        e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, () -> "execute lambda", Duration.ofMillis(500), 10));
+        assertTrue(e.getMessage().startsWith("Fail to execute lambda"));
+        Mockito.verify(mockCallable, Mockito.atLeast(3)).call();
+    }
+
+    @Test
+    public void testWakeupException() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new WakeupException());
+
+        ConnectException e = assertThrows(ConnectException.class,
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 10));
+        Mockito.verify(mockCallable, Mockito.atLeastOnce()).call();
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index dc25129..32cec61 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
@@ -53,6 +54,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -466,6 +468,55 @@ public class TopicAdminTest {
     }
 
     @Test
+    public void retryEndOffsetsShouldThrowConnectException() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = 1000L;
+        Cluster cluster = createCluster(1, "myTopic", 1);
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(10), cluster)) {
+            Map<TopicPartition, Long> offsetMap = new HashMap<>();
+            offsetMap.put(tp1, offset);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+            Map<String, Object> adminConfig = new HashMap<>();
+            adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0");
+            TopicAdmin admin = new TopicAdmin(adminConfig, env.adminClient());
+
+            assertThrows(ConnectException.class, () -> {
+                admin.retryEndOffsets(tps, Duration.ofMillis(100), 1);
+            });
+        }
+    }
+
+    @Test
+    public void retryEndOffsetsShouldRetryWhenTopicNotFound() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = 1000L;
+        Cluster cluster = createCluster(1, "myTopic", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(10), cluster)) {
+            Map<TopicPartition, Long> offsetMap = new HashMap<>();
+            offsetMap.put(tp1, offset);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION));
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+            env.kafkaClient().prepareResponse(listOffsetsResult(tp1, offset));
+
+            Map<String, Object> adminConfig = new HashMap<>();
+            adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0");
+            TopicAdmin admin = new TopicAdmin(adminConfig, env.adminClient());
+            Map<TopicPartition, Long> endoffsets = admin.retryEndOffsets(tps, Duration.ofMillis(100), 1);
+            assertNotNull(endoffsets);
+            assertTrue(endoffsets.containsKey(tp1));
+            assertEquals(1000L, endoffsets.get(tp1).longValue());
+        }
+    }
+
+    @Test
     public void endOffsetsShouldFailWithNonRetriableWhenAuthorizationFailureOccurs() {
         String topicName = "myTopic";
         TopicPartition tp1 = new TopicPartition(topicName, 0);
@@ -635,12 +686,16 @@ public class TopicAdminTest {
     }
 
     private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors error) {
+        return prepareMetadataResponse(cluster, error, error);
+    }
+
+    private MetadataResponse prepareMetadataResponse(Cluster cluster, Errors topicError, Errors partitionError) {
         List<MetadataResponseTopic> metadata = new ArrayList<>();
         for (String topic : cluster.topics()) {
             List<MetadataResponseData.MetadataResponsePartition> pms = new ArrayList<>();
             for (PartitionInfo pInfo : cluster.availablePartitionsForTopic(topic)) {
                 MetadataResponseData.MetadataResponsePartition pm  = new MetadataResponseData.MetadataResponsePartition()
-                        .setErrorCode(error.code())
+                        .setErrorCode(partitionError.code())
                         .setPartitionIndex(pInfo.partition())
                         .setLeaderId(pInfo.leader().id())
                         .setLeaderEpoch(234)
@@ -650,7 +705,7 @@ public class TopicAdminTest {
                 pms.add(pm);
             }
             MetadataResponseTopic tm = new MetadataResponseTopic()
-                    .setErrorCode(error.code())
+                    .setErrorCode(topicError.code())
                     .setName(topic)
                     .setIsInternal(false)
                     .setPartitions(pms);

[kafka] 02/03: KAFKA-12879: Addendum to reduce flakiness of tests (#11871)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 020902d1649d2575ed154d675c52a23fc21d3df9
Author: Philip Nee <ph...@gmail.com>
AuthorDate: Wed Mar 9 12:37:48 2022 -0800

    KAFKA-12879: Addendum to reduce flakiness of tests (#11871)
    
    This is an addendum to KAFKA-12879 (#11797) that fixes some tests that are somewhat flaky when a build machine is heavily loaded (i.e., when the timeouts are too small).
    
    - Add an if check to avoid sleep(0)
    - Increase timeout in the tests
---
 .../main/java/org/apache/kafka/connect/util/RetryUtil.java    | 11 +++++++----
 .../java/org/apache/kafka/connect/util/RetryUtilTest.java     | 10 ++++++----
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
index 9463f6a..64fd97c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -88,10 +88,13 @@ public class RetryUtil {
                 lastError = e;
             }
 
-            long millisRemaining = Math.max(0, end - System.currentTimeMillis());
-            if (millisRemaining < retryBackoffMs) {
-                // exit when the time remaining is less than retryBackoffMs
-                break;
+            if (retryBackoffMs > 0) {
+                long millisRemaining = Math.max(0, end - System.currentTimeMillis());
+                if (millisRemaining < retryBackoffMs) {
+                    // exit when the time remaining is less than retryBackoffMs
+                    break;
+                }
+                Utils.sleep(retryBackoffMs);
             }
             Utils.sleep(retryBackoffMs);
         } while (System.currentTimeMillis() < end);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
index 05f0212..58c101b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
@@ -36,6 +36,8 @@ import java.util.function.Supplier;
 @RunWith(PowerMockRunner.class)
 public class RetryUtilTest {
 
+    private static final Duration TIMEOUT = Duration.ofSeconds(10);
+
     private Callable<String> mockCallable;
     private final Supplier<String> testMsg = () -> "Test";
 
@@ -69,7 +71,7 @@ public class RetryUtilTest {
                 .thenThrow(new TimeoutException())
                 .thenReturn("success");
 
-        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 1));
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 1));
         Mockito.verify(mockCallable, Mockito.times(4)).call();
     }
 
@@ -83,7 +85,7 @@ public class RetryUtilTest {
                 .thenThrow(new TimeoutException("timeout"))
                 .thenThrow(new NullPointerException("Non retriable"));
         NullPointerException e = assertThrows(NullPointerException.class,
-                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), 0));
+                () -> RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0));
         assertEquals("Non retriable", e.getMessage());
         Mockito.verify(mockCallable, Mockito.times(6)).call();
     }
@@ -114,7 +116,7 @@ public class RetryUtilTest {
                 .thenThrow(new TimeoutException())
                 .thenReturn("success");
 
-        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(50), 0));
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, 0));
         Mockito.verify(mockCallable, Mockito.times(4)).call();
     }
 
@@ -151,7 +153,7 @@ public class RetryUtilTest {
         Mockito.when(mockCallable.call())
                 .thenThrow(new TimeoutException("timeout"))
                 .thenReturn("success");
-        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, Duration.ofMillis(100), -1));
+        assertEquals("success", RetryUtil.retryUntilTimeout(mockCallable, testMsg, TIMEOUT, -1));
         Mockito.verify(mockCallable, Mockito.times(2)).call();
     }
 

[kafka] 03/03: KAFKA-12879: Remove extra sleep (#11872)

Posted by rh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit c4f84c842961f076ec7962a3f89b50a8ee8cbf61
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Wed Mar 9 15:11:46 2022 -0600

    KAFKA-12879: Remove extra sleep (#11872)
---
 .../runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java   | 1 -
 1 file changed, 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
index 64fd97c..84b9af1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
@@ -96,7 +96,6 @@ public class RetryUtil {
                 }
                 Utils.sleep(retryBackoffMs);
             }
-            Utils.sleep(retryBackoffMs);
         } while (System.currentTimeMillis() < end);
 
         throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);