Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/23 00:09:19 UTC

[GitHub] [kafka] philipnee opened a new pull request #11797: Added retry capability to TopicAdmin.endOffsets

philipnee opened a new pull request #11797:
URL: https://github.com/apache/kafka/pull/11797


   **Summary**
   The goal of this PR is to fix the compatibility issue described in
   KAFKA-12879. As a first step toward the fix, it adds retry capability to the
   **TopicAdmin.endOffsets** method, so that KafkaAdminClient calls can be retried on the
   Connect side instead of on the admin client side. In this PR, TopicAdmin retries the
   endOffsets call on RetriableException until either the maximum number of retries
   (_retries_) is reached or the API timeout expires. On the Connect side, KafkaBasedLog
   calls the new retryEndOffsets method to perform the endOffsets call with retries.
   This change should be backward compatible with KAFKA-12339.
   A subsequent PR will revert the behavior introduced in KAFKA-12339.
   
   Notable changes are:
   **TopicAdmin**
   - Added a retryEndOffsets method, which uses the new RetryUtil to perform retries
   
   **KafkaBasedLog**
   - Calls TopicAdmin.retryEndOffsets to perform retries
   
   **RetryUtil**
   - A general utility that takes a Callable, a maximum number of retries, and a backoff time in ms, and retries the Callable
   - It retries only on RetriableException, e.g. UnknownTopicOrPartitionException or TimeoutException
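A minimal sketch of the RetryUtil behavior summarized above, assuming only the JDK. The nested `RetriableException` and `RetryExhaustedException` classes are hypothetical stand-ins for Kafka's `RetriableException` and Connect's `ConnectException`, and `RetrySketch` is not the class added in this PR. It runs the callable at most `maxRetries + 1` times, sleeps `retryBackoffMs` between attempts, and lets any non-retriable exception propagate on its first occurrence.

```java
import java.util.concurrent.Callable;

final class RetrySketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for org.apache.kafka.connect.errors.ConnectException.
    static class RetryExhaustedException extends RuntimeException {
        RetryExhaustedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Runs the callable at most maxRetries + 1 times, retrying only on RetriableException.
    // Other exceptions are not caught here, so they propagate immediately.
    static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
        final long maxAttempts = Math.max(0L, maxRetries) + 1;
        RetriableException lastError = null;
        for (long attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callable.call();
            } catch (RetriableException e) {
                lastError = e;
                // Only back off if another attempt will follow.
                if (attempt < maxAttempts && retryBackoffMs > 0) {
                    Thread.sleep(retryBackoffMs);
                }
            }
        }
        // At least one attempt ran and failed retriably, so lastError is non-null here.
        throw new RetryExhaustedException(
                "Failed after " + maxAttempts + " attempts. Reason: " + lastError.getMessage(), lastError);
    }
}
```

Leaving non-retriable exceptions uncaught is one simple way to get the "rethrow everything else" behavior without a catch-all clause.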
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815017009



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        long maxAttempts = maxRetries + 1;
+        while (attempt++ < maxAttempts) {

Review comment:
       Should this be:
   ```suggestion
           while (++attempt <= maxAttempts) {
   ```
   so we could use a 1-based attempt in the log message on lines 58-59?
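As far as I can tell, the two loop headers behave identically: in both, `attempt` is already 1-based inside the body, because the post-increment in `attempt++ < maxAttempts` takes effect before the body runs. The suggestion therefore seems to be mainly about making that obvious to the reader. A small check, using the hypothetical class name `LoopHeaders`:

```java
import java.util.ArrayList;
import java.util.List;

final class LoopHeaders {

    // Records the value of `attempt` seen inside the body with the original header.
    static List<Integer> postIncrement(long maxAttempts) {
        List<Integer> seen = new ArrayList<>();
        int attempt = 0;
        while (attempt++ < maxAttempts) {
            seen.add(attempt);
        }
        return seen;
    }

    // Records the value of `attempt` seen inside the body with the suggested header.
    static List<Integer> preIncrement(long maxAttempts) {
        List<Integer> seen = new ArrayList<>();
        int attempt = 0;
        while (++attempt <= maxAttempts) {
            seen.add(attempt);
        }
        return seen;
    }
}
```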

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        long maxAttempts = maxRetries + 1;

Review comment:
       Nit:
   ```suggestion
           final long maxAttempts = maxRetries + 1;
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        long maxAttempts = maxRetries + 1;
+        while (attempt++ < maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught, retrying automatically up to {} more times. " +
+                        "Reason: {}", maxRetries - attempt, e.getMessage());

Review comment:
       WDYT?
   ```suggestion
                   log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
                           "Reason: {}", attempt, maxAttempts - attempt, e.getMessage());
   ```
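The count in the suggested message matters: with `maxAttempts = maxRetries + 1` and a 1-based `attempt`, the number of attempts still left after a failure is `maxAttempts - attempt`, whereas `maxRetries - attempt` is one short. A quick check (the helper names are hypothetical):

```java
final class RemainingRetries {

    // Attempts still available after the 1-based `attempt` has failed, as in the suggested message.
    static long suggested(long maxRetries, long attempt) {
        final long maxAttempts = maxRetries + 1;
        return maxAttempts - attempt;
    }

    // The expression from the original log message, which undercounts by one.
    static long original(long maxRetries, long attempt) {
        return maxRetries - attempt;
    }
}
```

With `maxRetries = 3`, the first failure leaves 3 retries; the original expression reports 2, and after the last attempt it reports -1 instead of 0.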

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,11 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    private void readToLogEnd(boolean shouldRetry) {
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+        // readEndOffsets makes listOffsets call to adminClient, if shouldRetry is set to True, the adminClient
+        // will retry on RetriableExceptions

Review comment:
       Might be worth moving this to a JavaDoc for the method.







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819884210



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,107 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>The task will be executed at least once. No retries will be performed 
+     * if {@code timeoutDuration} is 0 or negative, or if {@code timeoutDuration} is less than {@code retryBackoffMs}.
+     *
+     * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if null supplier or string is provided, the message will default to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.debug("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs < 0) {
+            log.debug("Invalid retryBackoffMs, must be non-negative but got {}. 0 will be used instead",
+                    retryBackoffMs);
+            retryBackoffMs = 0;
+        }
+
+        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {

Review comment:
       Ideally we'd not wrap the exception if there are no retries, so I guess it just depends on how hard it is to make that work.
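One way to get that behavior, sketched under stated assumptions: take the no-retry path (`timeoutMs <= 0` or `retryBackoffMs >= timeoutMs`) by calling the callable directly, so its exception propagates unwrapped, and wrap the last retriable exception only once a real retry loop has run out of time. `TimeoutRetrySketch` and its nested exception types are hypothetical stand-ins for `RetryUtil`, Kafka's `RetriableException`, and Connect's `ConnectException`; this is not the PR's implementation.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

final class TimeoutRetrySketch {

    // Hypothetical stand-in for Kafka's RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for Connect's ConnectException.
    static class RetryTimeoutException extends RuntimeException {
        RetryTimeoutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static <T> T retryUntilTimeout(Callable<T> callable, Duration timeout, long retryBackoffMs) throws Exception {
        long timeoutMs = timeout == null ? 0L : timeout.toMillis();
        long backoffMs = Math.max(0L, retryBackoffMs);

        // No retry is possible: call once and let any exception propagate unwrapped.
        if (timeoutMs <= 0 || backoffMs >= timeoutMs) {
            return callable.call();
        }

        long deadline = System.currentTimeMillis() + timeoutMs;
        RetriableException lastError = null;
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                return callable.call();
            } catch (RetriableException e) {
                lastError = e;
            }
            long remainingMs = deadline - System.currentTimeMillis();
            if (remainingMs <= 0) {
                break;
            }
            // Never sleep past the deadline.
            Thread.sleep(Math.min(backoffMs, remainingMs));
        }
        throw new RetryTimeoutException("Failed after " + attempt + " attempts within " + timeoutMs
                + " ms. Reason: " + lastError.getMessage(), lastError);
    }
}
```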







[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815158013



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, e.getMessage());

Review comment:
       We probably should log the exception, in fact. Making the change.







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815104923



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets using the <code>listOffsets</code> method of the admin client.
+     * As the <code>listOffsets</code> method might throw a {@link RetriableException}, the <code>shouldRetry</code>
+     * flag enables retry, upon catching such exception, if it is set to <code>True</code>.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client <code>listOffsets</code> call.

Review comment:
       Maybe add:
   ```suggestion
        * @param shouldRetry Boolean flag to enable retry for the admin client <code>listOffsets</code> call.
        * @see TopicAdmin#retryEndOffsets
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets using the <code>listOffsets</code> method of the admin client.
+     * As the <code>listOffsets</code> method might throw a {@link RetriableException}, the <code>shouldRetry</code>
+     * flag enables retry, upon catching such exception, if it is set to <code>True</code>.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client <code>listOffsets</code> call.
+     */
+
+    private void readToLogEnd(boolean shouldRetry) {
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+        // it will subsequently invoke the listOffsets call here

Review comment:
       I don't think this comment adds much value, especially because "subsequently" is ambiguous. IMO the method call stands on its own.
   ```suggestion
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage());
+                throw e;
+            }
+            Utils.sleep(retryBackoffMs);
+        }
+
+        throw new ConnectException("Fail to retry the task after " + maxRetries + " attempts.  Reason: " + lastError, lastError);

Review comment:
       Suggestion to improve the message:
   ```suggestion
           throw new ConnectException("Fail to retry the task after " + maxAttempts + " attempts.  Reason: " + lastError.getMessage(), lastError);
   ```
   
   And if `maxRetries == 0`, should we just call the function without any special handling, since we're not retrying? For example, should we add something like this very early in the method? Doing that would make it easier to phrase this message, since the `ConnectException` would only be used when at least one retry may be attempted.
   ```
   if (maxRetries <= 0) {
       // no special error handling
       return callable.call();
   }
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.

Review comment:
       A few corrections to the JavaDoc:
   ```suggestion
        * The method executes the callable at least once, optionally retrying the callable if
        * {@link org.apache.kafka.connect.errors.RetriableException} is thrown. All other types of exceptions
        * are not caught and are rethrown. If all retries are exhausted, the last
        * exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
        *
        * <p>If <code>maxRetries</code> is set to 0, the task will be executed exactly once.
        * If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
        * most <code>n + 1</code> times.
        *
        * <p>If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
   ```
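The contract in this JavaDoc (exactly one execution for `maxRetries = 0`, at most `n + 1` executions for `maxRetries = n`) can be checked with a small counting harness. The sketch below also folds in the early return for `maxRetries <= 0` proposed in the previous comment, so that with no retries the original exception escapes unwrapped. All names are hypothetical and backoff is omitted for brevity; this is an illustration, not the PR's code.

```java
import java.util.concurrent.Callable;

final class CountingRetry {

    // Hypothetical stand-ins for the Kafka/Connect exception types.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    static class RetriesExhaustedException extends RuntimeException {
        RetriesExhaustedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static <T> T retry(Callable<T> callable, long maxRetries) throws Exception {
        // Early return suggested in the review: with no retries, skip all special handling.
        if (maxRetries <= 0) {
            return callable.call();
        }
        final long maxAttempts = maxRetries + 1;
        RetriableException lastError = null;
        for (long attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callable.call();
            } catch (RetriableException e) {
                lastError = e;
            }
        }
        throw new RetriesExhaustedException(
                "Failed after " + maxAttempts + " attempts. Reason: " + lastError.getMessage(), lastError);
    }

    // Counts how many times an always-failing callable runs for a given maxRetries.
    static int executions(long maxRetries) {
        int[] calls = {0};
        try {
            retry(() -> {
                calls[0]++;
                throw new RetriableException("always fails");
            }, maxRetries);
        } catch (Exception expected) {
            // Either the unwrapped RetriableException (maxRetries <= 0) or the wrapper.
        }
        return calls[0];
    }
}
```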

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage());
+                throw e;

Review comment:
       Do we need this catch block and log message? We're re-throwing the exception, and the stack trace will include the fact that the function threw the exception, and the log message here doesn't seem to add much detail since it will be caught and dealt with by the caller.
   ```suggestion
   ```
   If we remove this, we should adjust the JavaDoc to not mention "rethrow" since we would not catch the problem and are not rethrowing the exception.
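The point can be illustrated directly. Without a catch-all block, a non-retriable exception still reaches the caller as the very same object, with its original stack trace. In this minimal JDK-only sketch, `PropagationSketch` and its nested `Transient` type are hypothetical stand-ins, and a single call replaces the full retry loop.

```java
import java.util.concurrent.Callable;

class PropagationSketch {
    // Hypothetical stand-in for a retriable exception type.
    static class Transient extends RuntimeException { }

    // Catches only the retriable type; there is deliberately no
    // catch (Exception e) { log...; throw e; } block.
    static <T> T callCatchingOnlyTransient(Callable<T> callable) throws Exception {
        try {
            return callable.call();
        } catch (Transient e) {
            // A real retry loop would record e and try again; the sketch just gives up.
            return null;
        }
    }

    // True if the caller receives the exact exception object thrown by the callable.
    static boolean callerSeesSameInstance() {
        IllegalStateException original = new IllegalStateException("not retriable");
        try {
            callCatchingOnlyTransient(() -> { throw original; });
            return false;
        } catch (Exception e) {
            return e == original;
        }
    }

    public static void main(String[] args) {
        System.out.println(callerSeesSameInstance()); // true
    }
}
```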

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, e.getMessage());

Review comment:
       Are we intentionally not logging the exception as the extra parameter? If the exception wraps a more useful exception, we won't see any information about the wrapped exception unless we can see the stack trace in the warning log message.
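With SLF4J, you can pass `e` as a final argument that has no matching `{}` placeholder. The logger then records the exception's stack trace, including any `Caused by:` sections. The JDK-only sketch below approximates the difference, using `printStackTrace` in place of a logger: logging only `e.getMessage()` drops the wrapped cause's details. `CauseVisibilitySketch` is a hypothetical name.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

class CauseVisibilitySketch {
    // What a "{}" placeholder filled with e.getMessage() records: only the outer message.
    static String messageOnly(Throwable e) {
        return String.valueOf(e.getMessage());
    }

    // Roughly what a logger records when the Throwable itself is passed:
    // the full stack trace, including "Caused by:" sections for wrapped exceptions.
    static String fullTrace(Throwable e) {
        StringWriter out = new StringWriter();
        e.printStackTrace(new PrintWriter(out));
        return out.toString();
    }

    public static void main(String[] args) {
        Exception wrapped = new RuntimeException("outer failure",
                new IllegalStateException("root cause detail"));
        System.out.println(messageOnly(wrapped).contains("root cause detail")); // false
        System.out.println(fullTrace(wrapped).contains("root cause detail"));   // true
    }
}
```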

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.Callable;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+    @Mock
+    private Callable<String> mockCallable;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() throws Exception {
+        mockCallable = Mockito.mock(Callable.class);
+    }
+
+    @Test
+    public void TestSuccess() throws Exception {

Review comment:
       Nit: all of these test methods should start with a lowercase character.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.

Review comment:
       ```suggestion
        * @param callable       the function to execute.
        * @param maxRetries     maximum number of retries; must be 0 or more
        * @param retryBackoffMs the number of milliseconds to delay upon receiving a
        *                       {@link org.apache.kafka.connect.errors.RetriableException}
        *                       before retrying again; must be 0 or more
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage());
+                throw e;
+            }
+            Utils.sleep(retryBackoffMs);

Review comment:
       ```suggestion
               if (attempt < maxAttempts) {
                   Utils.sleep(retryBackoffMs);
               }
   ```
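To see what the suggested guard changes, count the backoff waits when every attempt fails. This sketch mirrors the loop shape quoted above but uses a hypothetical recording sleeper instead of `Utils.sleep`. Sleeping unconditionally waits `maxRetries + 1` times, including once after the final attempt. The guarded form waits only `maxRetries` times. `BackoffGatingSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

class BackoffGatingSketch {
    // Simulates the retry loop with every attempt failing, and returns how many
    // backoff waits were requested. 'guarded' selects the suggested
    // "if (attempt < maxAttempts)" check versus sleeping after every attempt.
    static int countSleeps(long maxRetries, long retryBackoffMs, boolean guarded) {
        List<Long> sleeps = new ArrayList<>();
        // Hypothetical stand-in for Utils.sleep: records the delay instead of waiting.
        LongConsumer sleeper = ms -> sleeps.add(ms);
        final long maxAttempts = maxRetries + 1;
        int attempt = 0;
        while (++attempt <= maxAttempts) {
            // The callable would run here; assume it fails with a retriable exception.
            if (!guarded || attempt < maxAttempts) {
                sleeper.accept(retryBackoffMs);
            }
        }
        return sleeps.size();
    }

    public static void main(String[] args) {
        System.out.println(countSleeps(3, 100, false)); // 4
        System.out.println(countSleeps(3, 100, true));  // 3
        System.out.println(countSleeps(0, 100, false)); // 1
        System.out.println(countSleeps(0, 100, true));  // 0
    }
}
```

With `maxRetries = 0`, the unguarded form still waits a full `retryBackoffMs` before failing, even though no retry follows.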

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -703,6 +706,20 @@ public Config describeTopicConfig(String topic) {
         return result;
     }
 
+    protected Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions) {
+        int maxRetries = Integer.parseInt(adminConfig.getOrDefault(AdminClientConfig.RETRIES_CONFIG, DEFAULT_ADMIN_CLIENT_RETRIES).toString());

Review comment:
       Maybe add a comment here:
   ```suggestion
           // These will be 0 or higher, enforced by the admin client when it is instantiated and configured
           int maxRetries = Integer.parseInt(adminConfig.getOrDefault(AdminClientConfig.RETRIES_CONFIG, DEFAULT_ADMIN_CLIENT_RETRIES).toString());
   ```
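The `getOrDefault(...).toString()` plus parse pattern accepts the setting whether it was supplied as a string (for example, from a properties file) or as a number. This is a sketch under stated assumptions. `AdminConfigParsingSketch` and its default values are hypothetical. The literal keys are assumed to match `AdminClientConfig.RETRIES_CONFIG` and `AdminClientConfig.RETRY_BACKOFF_MS_CONFIG`. Parsing alone does not reject negative values; as the comment says, that relies on the admin client's own validation.

```java
import java.util.HashMap;
import java.util.Map;

class AdminConfigParsingSketch {
    // Hypothetical defaults for illustration; the real TopicAdmin constants are not shown here.
    static final Object DEFAULT_RETRIES = 10;
    static final Object DEFAULT_BACKOFF_MS = 100L;

    // Assumed to match AdminClientConfig.RETRIES_CONFIG and RETRY_BACKOFF_MS_CONFIG.
    static final String RETRIES_KEY = "retries";
    static final String RETRY_BACKOFF_MS_KEY = "retry.backoff.ms";

    // toString() + parse accepts both "3" (String) and 3 (Integer).
    // It does not reject negative numbers; that is left to the admin client.
    static int maxRetries(Map<String, Object> adminConfig) {
        return Integer.parseInt(adminConfig.getOrDefault(RETRIES_KEY, DEFAULT_RETRIES).toString());
    }

    static long retryBackoffMs(Map<String, Object> adminConfig) {
        return Long.parseLong(adminConfig.getOrDefault(RETRY_BACKOFF_MS_KEY, DEFAULT_BACKOFF_MS).toString());
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(RETRIES_KEY, "3");          // as it might arrive from a properties file
        config.put(RETRY_BACKOFF_MS_KEY, 250); // as it might be set programmatically
        System.out.println(maxRetries(config));          // 3
        System.out.println(retryBackoffMs(config));      // 250
        System.out.println(maxRetries(new HashMap<>())); // 10 (hypothetical default)
    }
}
```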







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r814336427



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -703,6 +706,20 @@ public Config describeTopicConfig(String topic) {
         return result;
     }
 
+    protected Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions) {
+        int maxRetries = Integer.parseInt(adminConfig.getOrDefault(AdminClientConfig.RETRIES_CONFIG, DEFAULT_ADMIN_CLIENT_RETRIES).toString());
+        long retryBackoffMs = Long.parseLong(adminConfig.getOrDefault(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, DEFAULT_ADMIN_CLIENT_BACKOFF_MS).toString());
+
+        try {
+            return RetryUtil.retry(
+                    () -> endOffsets(partitions),
+                    maxRetries,

Review comment:
       The values of `maxRetries` and `retryBackoffMs` can each be `0`, and these are passed directly into the `RetryUtil.retry(...)` method. See my comment there. 

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,107 @@
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.Callable;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+    @Mock
+    private Callable<String> mockCallable;
+
+    @SuppressWarnings("unchecked")
+    @Before
+    public void setUp() throws Exception {
+        mockCallable = Mockito.mock(Callable.class);
+    }
+
+    @Test
+    public void TestSuccess() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+        //EasyMock.expect(mockCallable.call()).andReturn("success").times(1);
+        try {
+            assertEquals("success", RetryUtil.retry(mockCallable, 10, 100));
+        } catch (Exception e) {
+            fail("Not expecting an exception: ", e.getCause());
+        }
+        Mockito.verify(mockCallable, Mockito.times(1)).call();
+    }
+
+    @Test
+    public void TestFailingRetries() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException());
+        try {
+            RetryUtil.retry(mockCallable, 10, 100);
+            fail("Expect exception being thrown here");
+        } catch (ConnectException e) {
+            // expecting a connect exception
+        } catch (Exception e) {
+            fail("Only expecting ConnectException");
+        }

Review comment:
       It's much better to use `Assertions.assertThrows(...)`:
   ```suggestion
           ConnectException e = assertThrows(ConnectException.class, () -> RetryUtil.retry(mockCallable, 10, 100));
           assertTrue(e.getMessage().startsWith("Fail to retry the operation after 10 attempts."));
   ```
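JUnit 5's `Assertions.assertThrows(Class, Executable)` needs the call wrapped in a lambda so that it can run the call itself and capture the exception. Passing `RetryUtil.retry(...)` directly would not type-check. Even if it did, the call would be evaluated before `assertThrows` was invoked. The hypothetical `expectThrows` helper below is a minimal JDK-only sketch of that idea, not JUnit's implementation.

```java
import java.util.concurrent.Callable;

class ExpectThrowsSketch {
    // Hypothetical helper in the spirit of assertThrows: runs a deferred action and
    // returns the thrown exception if it has the expected type.
    static <E extends Throwable> E expectThrows(Class<E> expected, Callable<?> action) {
        try {
            action.call();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
    }

    // Stand-in for a call that fails with a non-retriable exception.
    static String failing() {
        throw new IllegalStateException("Non retriable");
    }

    public static void main(String[] args) {
        // The lambda defers the call so the helper can observe the exception;
        // expectThrows(IllegalStateException.class, failing()) would not compile.
        IllegalStateException e = expectThrows(IllegalStateException.class, () -> failing());
        System.out.println(e.getMessage().startsWith("Non retriable")); // true
    }
}
```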

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,107 @@
+    @Test
+    public void TestSuccess() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+        //EasyMock.expect(mockCallable.call()).andReturn("success").times(1);

Review comment:
       Let's remove this line

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,107 @@
+    @Test
+    public void TestFailingRetries() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException());
+        try {
+            RetryUtil.retry(mockCallable, 10, 100);
+            fail("Expect exception being thrown here");
+        } catch (ConnectException e) {
+            // expecting a connect exception
+        } catch (Exception e) {
+            fail("Only expecting ConnectException");
+        }
+        Mockito.verify(mockCallable, Mockito.times(10)).call();
+    }
+
+    @Test
+    public void RetriesEventuallySucceed() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenReturn("success");
+        try {
+            assertEquals("success", RetryUtil.retry(mockCallable, 10, 100));
+        } catch (Exception e) {
+            fail("Not expecting an exception: ", e.getCause());
+        }
+        Mockito.verify(mockCallable, Mockito.times(4)).call();
+    }
+
+    @Test
+    public void FailWithNonRetriableException() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new TimeoutException("timeout"))
+                .thenThrow(new NullPointerException("Non retriable"));
+        try {
+            for (int i = 0; i < 10; i++) {
+                RetryUtil.retry(mockCallable, 10, 100);
+            }
+            fail("Not expecting an exception: ");
+        } catch (ConnectException e) {
+            fail("Should fail with NPE");
+        } catch (NullPointerException e) {
+            // good
+        }

Review comment:
       What is the purpose of the for loop? Why not just:
   ```suggestion
           NullPointerException e = assertThrows(NullPointerException.class, () -> RetryUtil.retry(mockCallable, 10, 100));
           assertTrue(e.getMessage().startsWith("Non retriable"));
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,52 @@
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int retries = 0;
+        while (retries++ < maxRetries) {
+            try {
+                return callable.call();

Review comment:
       Even though this method will not be part of our public API, we should still add JavaDoc so that we can document what the allowed values for `maxRetries` and `retryBackoffMs` can be, and what exceptions might be thrown by this method.
   
   Second, is `maxRetries` the total number of _attempts_, or the total number of _retries_ that are allowed? With the way this method is called by `KafkaBasedLog`, it is possible that either or both of these values are `0`, which makes sense for "retries". But if `retries` is `0`, then this method will retry for a long period of time. Let's fix this logic to handle either or both of `maxRetries` and `retryBackoffMs` being `0`, and add unit tests to cover this situation.
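To make the zero-value question concrete, the sketch below counts loop-body executions, assuming every call fails with a retriable exception. It compares two conditions. The first is this revision's `retries++ < maxRetries`. The second is the later revision quoted earlier in this thread, `++attempt <= maxAttempts` with `maxAttempts = maxRetries + 1`. `LoopConditionSketch` is a hypothetical name, and its loop bodies only count.

```java
class LoopConditionSketch {
    // This revision: int retries = 0; while (retries++ < maxRetries) { ... }
    static int earlierRevisionExecutions(long maxRetries) {
        int executions = 0;
        int retries = 0;
        while (retries++ < maxRetries) {
            executions++; // the callable would be invoked here
        }
        return executions;
    }

    // Later revision: int attempt = 0; while (++attempt <= maxRetries + 1) { ... }
    static int laterRevisionExecutions(long maxRetries) {
        int executions = 0;
        int attempt = 0;
        final long maxAttempts = maxRetries + 1;
        while (++attempt <= maxAttempts) {
            executions++; // the callable would be invoked here
        }
        return executions;
    }

    public static void main(String[] args) {
        System.out.println(earlierRevisionExecutions(0)); // 0
        System.out.println(laterRevisionExecutions(0));   // 1
        System.out.println(earlierRevisionExecutions(3)); // 3
        System.out.println(laterRevisionExecutions(3));   // 4
    }
}
```

Under these assumptions, `maxRetries = 0` gives zero executions with this revision's condition, so the callable would never run. The later condition gives exactly one.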

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,107 @@
+    @Test
+    public void TestSuccess() throws Exception {
+        Mockito.when(mockCallable.call()).thenReturn("success");
+        //EasyMock.expect(mockCallable.call()).andReturn("success").times(1);
+        try {
+            assertEquals("success", RetryUtil.retry(mockCallable, 10, 100));
+        } catch (Exception e) {
+            fail("Not expecting an exception: ", e.getCause());
+        }

Review comment:
       Is this any better than just:
   ```suggestion
           assertEquals("success", RetryUtil.retry(mockCallable, 10, 100));
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,107 @@
+    @Test
+    public void TestFailingRetries() throws Exception {
+        Mockito.when(mockCallable.call()).thenThrow(new TimeoutException());
+        try {
+            RetryUtil.retry(mockCallable, 10, 100);
+            fail("Expect exception being thrown here");
+        } catch (ConnectException e) {
+            // expecting a connect exception
+        } catch (Exception e) {
+            fail("Only expecting ConnectException");
+        }
+        Mockito.verify(mockCallable, Mockito.times(10)).call();
+    }
+
+    @Test
+    public void RetriesEventuallySucceed() throws Exception {
+        Mockito.when(mockCallable.call())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenThrow(new TimeoutException())
+                .thenReturn("success");
+        try {
+            assertEquals("success", RetryUtil.retry(mockCallable, 10, 100));
+        } catch (Exception e) {
+            fail("Not expecting an exception: ", e.getCause());
+        }

Review comment:
       The try-catch is not really needed, since any exception thrown will fail the test:
   ```suggestion
           assertEquals("success", RetryUtil.retry(mockCallable, 10, 100));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch merged pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch merged pull request #11797:
URL: https://github.com/apache/kafka/pull/11797


   





[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815023731



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,11 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    private void readToLogEnd(boolean shouldRetry) {
         Set<TopicPartition> assignment = consumer.assignment();
-        Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
+        // readEndOffsets makes a listOffsets call to adminClient; if shouldRetry is set to true, the adminClient
+        // will retry on RetriableExceptions

Review comment:
       Yes! Good call.







[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r818211185



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}.  Callable will only execute once.", retryBackoffMs, timeoutMs);

Review comment:
       I like this idea.
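The Javadoc above says the callable runs exactly once when `timeoutDuration` is 0, or when `retryBackoffMs` is not smaller than the timeout. A minimal sketch of that decision follows. The class and method names are hypothetical, and it assumes, as a later revision in this thread does, that a null timeout counts as 0 ms and a negative backoff is clamped to 0 ms. It is an illustration, not the merged code.

```java
import java.time.Duration;

// Hypothetical helper illustrating the single-execution rule; not part of Kafka's RetryUtil.
final class RetryDecisionSketch {

    private RetryDecisionSketch() {
    }

    /** Returns true when the callable should be executed once, without entering a retry loop. */
    static boolean executeOnlyOnce(Duration timeout, long retryBackoffMs) {
        // Assumption borrowed from a later revision: a null timeout is treated as 0 ms ...
        long timeoutMs = timeout == null ? 0L : timeout.toMillis();
        // ... and a negative backoff is clamped to 0 ms.
        long backoffMs = Math.max(0L, retryBackoffMs);
        // No retries if there is no time budget, or if one backoff interval would already use it up.
        return timeoutMs <= 0 || backoffMs >= timeoutMs;
    }
}
```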

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -124,6 +124,9 @@ public String toString() {
     public static final int NO_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
 
+    private static final String DEFAULT_ADMIN_CLIENT_RETRIES = "10";
+    private static final String DEFAULT_ADMIN_CLIENT_BACKOFF_MS = "100";
+

Review comment:
       ah 🤦 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}.  Callable will only execute once.", retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically. " +
+                        "Reason: {}", attempt, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            // once the current time reaches the ending time, no more retries are needed, so don't sleep
+            // won't sleep if retryBackoffMs is 0
+            if (retryBackoffMs > 0 && System.currentTimeMillis() < end) {
+                Utils.sleep(retryBackoffMs);
+            }

Review comment:
       much better
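In the loop quoted above, the thread sleeps the full `retryBackoffMs` whenever the deadline has not yet passed, so the last sleep can overshoot the timeout by up to one backoff interval. One way to bound that, sketched here with a hypothetical helper name (not something this PR defines), is to cap each sleep at the time remaining before the deadline:

```java
// Hypothetical helper: how long to sleep before the next attempt, never past the deadline.
final class BackoffSketch {

    private BackoffSketch() {
    }

    static long nextSleepMs(long retryBackoffMs, long nowMs, long deadlineMs) {
        // Time left before the deadline; 0 once the deadline has passed.
        long remainingMs = Math.max(0L, deadlineMs - nowMs);
        // Sleep for the configured backoff (negative treated as 0), but no longer than the time left.
        return Math.min(Math.max(0L, retryBackoffMs), remainingMs);
    }
}
```

A caller would compute `end` once before the loop, then call `nextSleepMs(retryBackoffMs, System.currentTimeMillis(), end)` between attempts and skip the sleep when it returns 0.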







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815174043



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If all retries are exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * <p>If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable the function to execute.
+     * @param maxRetries maximum number of retries; must be 0 or more
+     * @param retryBackoffMs the number of milliseconds to delay upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; must be 0 or more
+     *

Review comment:
       Nit on the spacing so the description of parameters is column-aligned.
   ```suggestion
        * @param callable       the function to execute.
        * @param maxRetries     maximum number of retries; must be 0 or more
        * @param retryBackoffMs the number of milliseconds to delay upon receiving a
        *                       {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; 
        *                       must be 0 or more
   ```
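This revision of the Javadoc describes a count-based contract: with `maxRetries = n`, the callable runs at most `n + 1` times. A minimal sketch of that contract follows. It assumes a locally defined stand-in for Kafka's retriable exception types and throws `IllegalStateException` in place of `ConnectException`, so it compiles without Kafka on the classpath; the names are hypothetical.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the count-based contract described above; not the merged Kafka code.
final class CountedRetrySketch {

    /** Stand-in for Kafka's retriable exception types (assumption for this self-contained sketch). */
    static final class RetriableSketchException extends RuntimeException {
        RetriableSketchException(String message) {
            super(message);
        }
    }

    private CountedRetrySketch() {
    }

    static <T> T retry(Callable<T> callable, int maxRetries, long retryBackoffMs) throws Exception {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be 0 or more");
        }
        if (retryBackoffMs < 0) {
            throw new IllegalArgumentException("retryBackoffMs must be 0 or more");
        }
        RetriableSketchException lastError = null;
        // One initial attempt plus up to maxRetries retries: at most maxRetries + 1 calls.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return callable.call();
            } catch (RetriableSketchException e) {
                // Only retriable errors are retried; anything else propagates immediately.
                lastError = e;
            }
            // Back off only if another attempt will follow.
            if (retryBackoffMs > 0 && attempt < maxRetries) {
                Thread.sleep(retryBackoffMs);
            }
        }
        // Stand-in for wrapping the last error in a ConnectException.
        throw new IllegalStateException("Exhausted " + maxRetries + " retries", lastError);
    }
}
```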

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.Callable;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+    @Mock

Review comment:
       Do we need this line? I think not, since you're instantiating `mockCallable` in `setUp()`. The annotation is really only needed when the Mockito JUnit runner is used to initialize the mock fields.
   
   Also, IIRC we can get rid of the `@RunWith(PowerMockRunner.class)` line as well, since this code is not really using anything from PowerMock. It'd be great if we could avoid using PowerMock in new code.

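As the comment notes, `Mockito.mock(Callable.class)` in `setUp()` works without any runner. For scripted scenarios like `RetriesEventuallySucceed` (fail a few times, then succeed), a hand-written stub is another option that needs no mocking library at all. This is a swapped-in technique, not what the PR does, and the class name is hypothetical:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.concurrent.Callable;

// Hypothetical hand-written test double; an alternative to a mock, not what the PR uses.
final class ScriptedCallable<T> implements Callable<T> {

    private final Deque<RuntimeException> failures;
    private final T result;
    private int invocations = 0;

    /** Throws each given exception once, in order, and then keeps returning {@code result}. */
    ScriptedCallable(T result, RuntimeException... failures) {
        this.result = result;
        this.failures = new ArrayDeque<>(Arrays.asList(failures));
    }

    @Override
    public T call() {
        invocations++;
        RuntimeException next = failures.poll();
        if (next != null) {
            throw next;
        }
        return result;
    }

    /** Number of times call() has been invoked, for verify-style assertions. */
    int invocations() {
        return invocations;
    }
}
```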






[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819871715



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>The task will be executed at least once. No retries will be performed 
+     * if {@code timeoutDuration} is 0 or negative, or if {@code timeoutDuration} is less than {@code retryBackoffMs}.
+     *
+     * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or a null string is provided, the description defaults to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.debug("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs < 0) {
+            log.debug("Invalid retryBackoffMs, must be non-negative but got {}. 0 will be used instead",
+                    retryBackoffMs);
+            retryBackoffMs = 0;
+        }
+
+        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {

Review comment:
       It's a really good catch. I think if the while loop is skipped, an NPE will be thrown when it tries to call lastError.getMessage(), because lastError is still null in that case.

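The loop body can be skipped, for example, if the thread is descheduled for longer than `timeoutMs` between computing `end` and the first loop check, in which case `lastError` is still null. One way to rule that out is a do-while loop, so at least one attempt always runs before the deadline is checked; the compiler's definite-assignment rules then guarantee `lastError` is set when the final exception is built. This is a minimal sketch under stated assumptions: the class name is hypothetical, `description` is a plain `String` rather than a `Supplier`, a local stand-in replaces Kafka's retriable exceptions, and `IllegalStateException` replaces `ConnectException`.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical sketch, not the merged Kafka code.
final class DeadlineRetrySketch {

    /** Stand-in for Kafka's retriable exception types (assumption). */
    static final class RetriableSketchException extends RuntimeException {
        RetriableSketchException(String message) {
            super(message);
        }
    }

    private DeadlineRetrySketch() {
    }

    static <T> T retryUntilTimeout(Callable<T> callable, String description,
                                   Duration timeout, long retryBackoffMs) throws Exception {
        long timeoutMs = timeout == null ? 0L : timeout.toMillis();
        long backoffMs = Math.max(0L, retryBackoffMs);
        if (timeoutMs <= 0 || backoffMs >= timeoutMs) {
            return callable.call(); // single attempt; exceptions propagate unchanged
        }

        long end = System.currentTimeMillis() + timeoutMs;
        int attempt = 0;
        RetriableSketchException lastError; // deliberately not initialized to null
        do {
            attempt++;
            try {
                return callable.call();
            } catch (RetriableSketchException e) {
                lastError = e;
            }
            long remainingMs = end - System.currentTimeMillis();
            if (remainingMs > 0 && backoffMs > 0) {
                Thread.sleep(Math.min(backoffMs, remainingMs)); // never sleep past the deadline
            }
        } while (System.currentTimeMillis() < end);

        // Definitely assigned here: the body ran at least once and did not return.
        throw new IllegalStateException("Failed to execute " + description + " after " + attempt
                + " attempts. Reason: " + lastError.getMessage(), lastError);
    }
}
```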






[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815182457



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/RetryUtilTest.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.concurrent.Callable;
+
+@RunWith(PowerMockRunner.class)
+public class RetryUtilTest {
+
+    @Mock

Review comment:
       Oh, good call. The mock exists because I was doing something else. Deleting this line.







[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819086900



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; may not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        if (timeoutDuration == null) {
+            throw new IllegalArgumentException("timeoutDuration cannot be null");
+        }
+
+        // if a null supplier or a null string is provided, the description defaults to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        long timeoutMs = timeoutDuration.toMillis();
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("Call to {} will only execute once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("Attempt {} to execute {} resulted in RetriableException; retrying automatically. " +
+                        "Reason: {}", attempt, descriptionStr, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            // once the current time reaches the ending time, no more retries are needed, so don't sleep
+            // effectively no wait if retryBackoffMs is 0
+            long millisRemaining = Math.max(0, end - System.currentTimeMillis());
+            if (millisRemaining > 0) {
+                Utils.sleep(retryBackoffMs);
+            }
+        }
+
+        throw new ConnectException("Fail to execute " + descriptionStr + " after " + attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);

Review comment:
       A comment on the logging message: it can read strangely when the provided description isn't a verb phrase,
    e.g., "Fail to logging error". Probably not a big deal.
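One way to sidestep the verb/noun problem is to quote the description instead of making it the object of "Fail to". A minimal sketch, assuming a hypothetical helper `failureMessage` (not part of this PR) that only illustrates the message shape:

```java
final class RetryMessages {
    private RetryMessages() { }

    // Builds a failure message that reads correctly whether the description is a
    // verb phrase ("list offsets") or a noun phrase ("logging error"), because the
    // description is quoted rather than used as the object of a verb.
    static String failureMessage(String description, int attempts, Throwable lastError) {
        String desc = description == null ? "callable" : description;
        String reason = lastError == null ? "unknown" : String.valueOf(lastError.getMessage());
        return "Failed to complete '" + desc + "' after " + attempts
                + (attempts == 1 ? " attempt" : " attempts") + ". Reason: " + reason;
    }
}
```

With this shape, both "logging error" and "list offsets" read acceptably, e.g. `Failed to complete 'logging error' after 3 attempts. Reason: boom`.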




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819187176



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null or negative
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or a null string is provided, the description defaults to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);

Review comment:
       Makes sense.







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819182327



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,109 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null or negative
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or a null string is provided, the description defaults to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);

Review comment:
       I don't think it's necessary to warn in this case. Maybe debug, but not warn. If callers are concerned, they can check the parameters before calling the method.
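Following the suggestion that concerned callers check the parameters themselves, a caller could mirror the single-execution condition from the diff (`timeoutMs <= 0` or `retryBackoffMs >= timeoutMs`) before calling. A minimal sketch; the helper name `willRetry` is hypothetical:

```java
import java.time.Duration;

final class RetryPreconditions {
    private RetryPreconditions() { }

    // Mirrors the single-execution condition in the diff: no retries when the timeout
    // is null or non-positive, or when the backoff alone would consume the whole timeout.
    static boolean willRetry(Duration timeout, long retryBackoffMs) {
        long timeoutMs = timeout == null ? 0L : timeout.toMillis();
        return timeoutMs > 0 && retryBackoffMs < timeoutMs;
    }
}
```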

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,109 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.

Review comment:
       How about:
   ```suggestion
        * <p>The task will be executed at least once. No retries will be performed 
        * if {@code timeoutDuration} is 0 or negative, or if {@code timeoutDuration} is less than {@code retryBackoffMs}.
   ```
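To illustrate the suggested contract (at least one execution; no retries when the timeout is 0 or negative, or when it does not exceed the backoff), here is a self-contained sketch. It uses `Supplier` and a local `RetriableException` as stand-ins for `Callable` and Kafka's exception types, so it approximates `retryUntilTimeout` rather than reproducing the PR's code:

```java
import java.time.Duration;
import java.util.function.Supplier;

final class RetrySketch {
    private RetrySketch() { }

    /** Hypothetical stand-in for Kafka's RetriableException types. */
    static final class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    // Executes the task at least once. Retries only on RetriableException, and only
    // when the timeout is positive and strictly larger than the backoff.
    static <T> T retryUntilTimeout(Supplier<T> task, Duration timeout, long retryBackoffMs) {
        long timeoutMs = timeout == null ? 0L : timeout.toMillis();
        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
            return task.get(); // single execution; any exception propagates unchanged
        }
        long deadline = System.currentTimeMillis() + timeoutMs;
        RetriableException last = null;
        do {
            try {
                return task.get();
            } catch (RetriableException e) {
                last = e; // remember the failure and try again after the backoff
            }
            if (retryBackoffMs > 0) {
                try {
                    Thread.sleep(retryBackoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", e);
                }
            }
        } while (System.currentTimeMillis() < deadline);
        throw new IllegalStateException("Gave up after " + timeoutMs + " ms", last);
    }
}
```

For example, `retryUntilTimeout(task, Duration.ZERO, 0)` calls `task` exactly once, while a positive timeout larger than the backoff allows repeated attempts until the deadline.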

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,109 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null or negative
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or a null string is provided, the description defaults to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs < 0) {
+            log.warn("Invalid retryBackoffMs, must be non-negative but got {}. 0 will be used instead",
+                    retryBackoffMs);

Review comment:
       I don't think it's necessary to warn in this case. Maybe debug, but not warn. If callers are concerned, they can check the parameters before calling the method.
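A sketch of the lower-level logging suggested here, assuming `java.util.logging` as a dependency-free stand-in for SLF4J (`Level.FINE` playing the role of debug). The class and method names are hypothetical:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class BackoffNormalizer {
    // java.util.logging keeps the sketch dependency-free; FINE stands in for debug.
    static final Logger LOG = Logger.getLogger(BackoffNormalizer.class.getName());

    private BackoffNormalizer() { }

    // Clamps a negative backoff to 0 and records the adjustment at debug level
    // rather than warn, since callers can validate their own inputs if they care.
    static long normalizeBackoff(long retryBackoffMs) {
        if (retryBackoffMs < 0) {
            LOG.log(Level.FINE, "Invalid retryBackoffMs {0}; using 0 instead", retryBackoffMs);
            return 0L;
        }
        return retryBackoffMs;
    }
}
```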

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,109 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null or negative

Review comment:
       ```suggestion
        * @param timeoutDuration   timeout duration; must not be null
   ```
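Dropping "or negative" appears consistent with the diff, where a non-positive `timeoutMs` already takes the single-execution branch. A hypothetical helper that makes the null and negative handling explicit, reusing the diff's `Optional` idiom:

```java
import java.time.Duration;
import java.util.Optional;

final class TimeoutNormalizer {
    private TimeoutNormalizer() { }

    // Same null handling as the diff (Optional.ofNullable(...).map(Duration::toMillis).orElse(0L)),
    // plus a clamp so that negative durations are treated like zero, i.e. "execute once".
    static long effectiveTimeoutMs(Duration timeoutDuration) {
        long ms = Optional.ofNullable(timeoutDuration)
                .map(Duration::toMillis)
                .orElse(0L);
        return Math.max(0L, ms);
    }
}
```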

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,109 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null or negative
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more

Review comment:
       ```suggestion
        * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
        *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,109 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.

Review comment:
       ```suggestion
        * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
   ```
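The suggested wording implies that the delay step is skipped entirely for a zero or negative `retryBackoffMs`. A minimal sketch of that guard; injecting the sleeper as a `LongConsumer` is an assumption made here so the behavior can be observed without real delays:

```java
import java.util.function.LongConsumer;

final class BackoffSleeper {
    private BackoffSleeper() { }

    // Delays only for a strictly positive backoff; zero or negative values mean the
    // next attempt starts immediately. The sleeper is injected (a hypothetical design
    // choice) so callers and tests can substitute a fake clock.
    static void backoff(long retryBackoffMs, LongConsumer sleeper) {
        if (retryBackoffMs > 0) {
            sleeper.accept(retryBackoffMs);
        }
    }

    // Convenience overload that really sleeps, restoring the interrupt flag if woken early.
    static void backoff(long retryBackoffMs) {
        backoff(retryBackoffMs, ms -> {
            try {
                Thread.sleep(ms);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
```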







[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815154741



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If any other type of exception is
+     * caught, that exception is rethrown as-is.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage());
+                throw e;
+            }
+            Utils.sleep(retryBackoffMs);
+        }
+
+        throw new ConnectException("Fail to retry the task after " + maxRetries + " attempts.  Reason: " + lastError, lastError);

Review comment:
       Agreed.
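For reference, a stripped-down sketch of the bounded-retry semantics described in this version's Javadoc: with `maxRetries = n` the task runs at most `n + 1` times. In the diff above, the final message reports `maxRetries` attempts and the log reports `maxRetries - attempt` remaining retries, both one fewer than the actual counts. The names below are hypothetical and backoff is omitted for brevity:

```java
import java.util.function.Supplier;

final class BoundedRetry {
    private BoundedRetry() { }

    /** Hypothetical stand-in for Kafka's RetriableException hierarchy. */
    static final class RetriableException extends RuntimeException {
        RetriableException(String message) {
            super(message);
        }
    }

    // Runs the task up to maxRetries + 1 times (always at least once). Only
    // RetriableException triggers another attempt; other exceptions propagate at once.
    static <T> T retry(Supplier<T> task, long maxRetries) {
        long maxAttempts = Math.max(1L, maxRetries + 1);
        RetriableException last = null;
        for (long attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RetriableException e) {
                last = e;
                long retriesLeft = maxAttempts - attempt; // retries still available
                System.err.printf("Attempt %d failed (%s); %d retries left%n",
                        attempt, e.getMessage(), retriesLeft);
            }
        }
        throw new IllegalStateException("Failed after " + maxAttempts + " attempts", last);
    }
}
```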







[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815156025



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If any other type of exception is
+     * caught, that exception is rethrown as-is.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxAttempts - attempt, e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage());
+                throw e;
+            }
+            Utils.sleep(retryBackoffMs);

Review comment:
       Agreed; I actually had the exact same thought, though I figured it would be fine to wait an additional x amount of time.
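A minimal sketch of a max-retries loop that skips the backoff after the final attempt, so the caller is not delayed right before the failure is reported. `RetrySketch` and `RetriableError` are hypothetical names (the latter stands in for Kafka's `RetriableException` so the example compiles without Kafka on the classpath), and the final failure is wrapped in a plain `RuntimeException` rather than `ConnectException`; this is not the code in the PR.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for Kafka's RetriableException so the sketch needs no Kafka jars.
class RetriableError extends RuntimeException {
    RetriableError(String message) {
        super(message);
    }
}

class RetrySketch {
    // Runs the callable at most maxRetries + 1 times. Retriable failures are retried after
    // retryBackoffMs; any other exception propagates immediately. No sleep follows the last attempt.
    static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
        long maxAttempts = maxRetries + 1;
        RetriableError lastError = null;
        for (long attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callable.call();
            } catch (RetriableError e) {
                lastError = e;
            }
            // Back off only if another attempt will actually follow.
            if (attempt < maxAttempts && retryBackoffMs > 0) {
                Thread.sleep(retryBackoffMs);
            }
        }
        throw new RuntimeException("Failed after " + maxAttempts + " attempts", lastError);
    }
}
```

With `maxRetries = 2`, the callable runs at most three times and the loop sleeps at most twice, never after the last attempt.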




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819863248



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,107 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If the timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>The task will be executed at least once. No retries will be performed
+     * if {@code timeoutDuration} is 0 or negative, or if {@code timeoutDuration} is less than or equal to {@code retryBackoffMs}.
+     *
+     * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or null string is provided, the message will default to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.debug("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs < 0) {
+            log.debug("Invalid retryBackoffMs, must be non-negative but got {}. 0 will be used instead",
+                    retryBackoffMs);
+            retryBackoffMs = 0;
+        }
+
+        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {

Review comment:
       I see: if `timeoutMs` is 1 ms and it takes, say, 2 ms to reach the top of the while loop, the function skips the loop entirely and never calls the callable.
   
   I think a do..while loop makes sense here, and it would also let us eliminate the duplicated `callable.call()` above. However, when the condition on line 77 is satisfied, the method would then throw a ConnectException wrapping the error instead of the raw RetriableException. I think that is okay, though?
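Assuming the do..while restructuring described above, a rough sketch might look like the following. The deadline is checked only after each attempt, so the callable always runs at least once and the separate "no retry" early return is no longer needed; the sleep is also capped at the time remaining. As noted in the comment, a failure on a single attempt then surfaces wrapped (here in a plain `RuntimeException` standing in for `ConnectException`). `DeadlineRetry` and `RetriableError` are hypothetical names for illustration, and `timeout` is assumed to be non-null.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical stand-in for Kafka's RetriableException so the sketch needs no Kafka jars.
class RetriableError extends RuntimeException {
    RetriableError(String message) {
        super(message);
    }
}

class DeadlineRetry {
    // Calls the callable at least once, retrying retriable failures until the timeout elapses.
    static <T> T retryUntilTimeout(Callable<T> callable, String description,
                                   Duration timeout, long retryBackoffMs) throws Exception {
        long backoffMs = Math.max(0, retryBackoffMs);          // treat a negative backoff as 0
        long end = System.currentTimeMillis() + Math.max(0, timeout.toMillis());
        int attempt = 0;
        RetriableError lastError;
        do {
            attempt++;
            try {
                return callable.call();
            } catch (RetriableError e) {
                lastError = e;
            }
            long millisRemaining = end - System.currentTimeMillis();
            if (millisRemaining <= 0) {
                break;                                          // deadline passed: stop without sleeping
            }
            Thread.sleep(Math.min(backoffMs, millisRemaining)); // never sleep past the deadline
        } while (System.currentTimeMillis() < end);
        throw new RuntimeException("Failed to " + description + " after " + attempt + " attempts", lastError);
    }
}
```

With a zero timeout the callable still runs exactly once, and the failure is reported as "Failed to list offsets after 1 attempts" (for a description of "list offsets").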







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819024411



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,105 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If the timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than or equal to {@code retryBackoffMs}, the
+     * callable will be executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        if (timeoutDuration == null) {
+            throw new IllegalArgumentException("timeoutDuration cannot be null");
+        }
+
+        // if a null supplier or null string is provided, the message will default to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        long timeoutMs = timeoutDuration.toMillis();
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("Call to {} will only execute once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("Attempt {} to execute {} resulted in RetriableException; retrying automatically. " +

Review comment:
       Nit: including "execute" here is completely unnecessary.
   ```suggestion
                   log.warn("Attempt {} to {} resulted in RetriableException; retrying automatically. " +
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -703,6 +703,19 @@ public Config describeTopicConfig(String topic) {
         return result;
     }
 
+    protected Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions, Duration timeoutDuration, long retryBackoffMs) {

Review comment:
       Why is this protected when `endOffsets(...)` is public? This class is in the Connect runtime module and is not part of Connect's public API.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,105 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If the timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than or equal to {@code retryBackoffMs}, the
+     * callable will be executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        if (timeoutDuration == null) {
+            throw new IllegalArgumentException("timeoutDuration cannot be null");
+        }
+
+        // if a null supplier or null string is provided, the message will default to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        long timeoutMs = timeoutDuration.toMillis();
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("Call to {} will only execute once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("Attempt {} to execute {} resulted in RetriableException; retrying automatically. " +
+                        "Reason: {}", attempt, descriptionStr, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            // if the deadline has already passed, no more retries are necessary
+            // a retryBackoffMs of 0 results in no wait between attempts
+            long millisRemaining = Math.max(0, end - System.currentTimeMillis());
+            if (millisRemaining > 0) {
+                Utils.sleep(retryBackoffMs);

Review comment:
       What happens if `millisRemaining` is, say, 2 and `retryBackoffMs` is 1000? If `millisRemaining` is positive, shouldn't we sleep for the smaller of `millisRemaining` and `retryBackoffMs`? In other words:
   ```suggestion
                   Utils.sleep(Math.min(retryBackoffMs, millisRemaining));
   ```
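The capping rule in the suggestion can be isolated as a small pure function, which also makes it easy to unit test. This is a hypothetical helper (`Backoff.nextSleepMs` does not exist in the PR); it additionally clamps a negative backoff to 0, an assumption mirroring the `retryBackoffMs < 0` handling in the later revision of `retryUntilTimeout`.

```java
class Backoff {
    // Returns how long to sleep before the next attempt: the configured backoff, capped at the
    // time left before the deadline, and never negative.
    static long nextSleepMs(long retryBackoffMs, long deadlineMs, long nowMs) {
        long millisRemaining = Math.max(0, deadlineMs - nowMs);
        return Math.max(0, Math.min(retryBackoffMs, millisRemaining));
    }
}
```

With 2 ms left and a 1000 ms backoff, `Backoff.nextSleepMs(1000, 102, 100)` returns 2, matching the scenario in the comment; once the deadline has passed it returns 0.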

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,105 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If the timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than or equal to {@code retryBackoffMs}, the
+     * callable will be executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        if (timeoutDuration == null) {
+            throw new IllegalArgumentException("timeoutDuration cannot be null");
+        }
+
+        // if a null supplier or null string is provided, the message will default to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        long timeoutMs = timeoutDuration.toMillis();
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("Call to {} will only execute once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("Attempt {} to execute {} resulted in RetriableException; retrying automatically. " +
+                        "Reason: {}", attempt, descriptionStr, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            // if the deadline has already passed, no more retries are necessary
+            // a retryBackoffMs of 0 results in no wait between attempts
+            long millisRemaining = Math.max(0, end - System.currentTimeMillis());
+            if (millisRemaining > 0) {
+                Utils.sleep(retryBackoffMs);
+            }
+        }
+
+        throw new ConnectException("Fail to execute " + descriptionStr + " after " + attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);

Review comment:
       Nit: including "execute" here is completely unnecessary.
   ```suggestion
           throw new ConnectException("Fail to " + descriptionStr + " after " + attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,105 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If the timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than or equal to {@code retryBackoffMs}, the
+     * callable will be executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        if (timeoutDuration == null) {
+            throw new IllegalArgumentException("timeoutDuration cannot be null");
+        }
+
+        // if a null supplier or null string is provided, the message will default to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        long timeoutMs = timeoutDuration.toMillis();
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("Call to {} will only execute once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);

Review comment:
       Nit:
   ```suggestion
               log.warn("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
                       descriptionStr, retryBackoffMs, timeoutMs);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -703,6 +703,19 @@ public Config describeTopicConfig(String topic) {
         return result;
     }
 
+    protected Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions, Duration timeoutDuration, long retryBackoffMs) {
+
+        try {
+            return RetryUtil.retryUntilTimeout(
+                    () -> endOffsets(partitions),
+                    () -> "list offsets",
+                    timeoutDuration,
+                    retryBackoffMs);
+        } catch (Exception e) {
+            throw new ConnectException("Failed to read offsets for topic partitions.", e);
+        }

Review comment:
       We should use consistent phrasing, and "list offsets for topic partitions" actually aligns most closely with the admin client call that `endOffsets(...)` uses. So how about:
   ```suggestion
           try {
               return RetryUtil.retryUntilTimeout(
                       () -> endOffsets(partitions),
                       () -> "list offsets for topic partitions",
                       timeoutDuration,
                       retryBackoffMs);
           } catch (Exception e) {
               throw new ConnectException("Failed to list offsets for topic partitions.", e);
           }
   ```
   Also, using `list offsets for topic partitions` as the retry function's description reads well in the log and exception messages within the `retryUntilTimeout(...)` method.
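To see why the phrasing matters, here is a small sketch of how a description string flows into the messages. The message formats are modeled on (but not identical to) the reviewer's suggested log and exception text, and `RetryMessages` is a hypothetical helper rather than part of the PR. The null guard on `lastError` is an added assumption that keeps message construction from throwing if no attempt ever recorded an error.

```java
import java.util.Optional;
import java.util.function.Supplier;

class RetryMessages {
    // Same defaulting as the quoted code: a null supplier or a null result falls back to "callable".
    static String describe(Supplier<String> description) {
        return Optional.ofNullable(description)
                .map(Supplier::get)
                .orElse("callable");
    }

    // Per-attempt warning, phrased so the description reads as a verb phrase.
    static String attemptLog(int attempt, String descriptionStr) {
        return "Attempt " + attempt + " to " + descriptionStr
                + " resulted in RetriableException; retrying automatically.";
    }

    // Final failure message; guards against a null lastError instead of throwing an NPE.
    static String failureMessage(String descriptionStr, int attempts, Throwable lastError) {
        String reason = lastError == null ? "unknown" : lastError.getMessage();
        return "Failed to " + descriptionStr + " after " + attempts + " attempts. Reason: " + reason;
    }
}
```

With the description "list offsets for topic partitions", the failure message reads "Failed to list offsets for topic partitions after 3 attempts. Reason: timed out", which is the kind of sentence the suggestion aims for.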







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819851789



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,107 @@
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If the timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>The task will be executed at least once. No retries will be performed
+     * if {@code timeoutDuration} is 0 or negative, or if {@code timeoutDuration} is less than or equal to {@code retryBackoffMs}.
+     *
+     * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or null string is provided, the description defaults to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.debug("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs < 0) {
+            log.debug("Invalid retryBackoffMs, must be non-negative but got {}. 0 will be used instead",
+                    retryBackoffMs);
+            retryBackoffMs = 0;
+        }
+
+        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {

Review comment:
       I think there's an edge case where `timeoutMs` is positive but small enough that the condition on line 77 is not met, yet the while-loop condition on line 85 is already false because the end time has passed. In this edge case, we might not call the callable function at all (not even once).
   
   One option is to change the while loop into a do-while loop so that we always make at least one pass. Another option is to compute the remaining time before line 77 and not update it before the while loop. Either would work, but one of the options may require fewer duplicated lines. 
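A minimal sketch of the do-while option, written to be self-contained: `RetryUtilSketch` and its nested `RetriableException`/`ConnectException` are hypothetical stand-ins for the Kafka classes, and the early exit when the remaining time is no longer than the backoff is an illustrative choice, not necessarily what the PR adopts. Because the deadline is computed once and the loop condition is checked only after the body, the callable runs at least once even if the deadline has already passed.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

class RetryUtilSketch {
    // Hypothetical stand-ins for Kafka's RetriableException and Connect's ConnectException,
    // defined here only so the sketch compiles without Kafka on the classpath.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    static class ConnectException extends RuntimeException {
        ConnectException(String message, Throwable cause) { super(message, cause); }
    }

    static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description,
                                   Duration timeoutDuration, long retryBackoffMs) throws Exception {
        // A null supplier, null string, or null duration falls back to a default.
        String descriptionStr = description == null ? null : description.get();
        if (descriptionStr == null) {
            descriptionStr = "callable";
        }
        long timeoutMs = timeoutDuration == null ? 0L : timeoutDuration.toMillis();
        long backoffMs = Math.max(0L, retryBackoffMs);

        // The deadline is computed once, before the first attempt.
        long end = System.currentTimeMillis() + timeoutMs;
        int attempt = 0;
        Exception lastError = null;
        do {
            attempt++;
            try {
                return callable.call(); // non-retriable exceptions propagate unchanged
            } catch (RetriableException e) {
                lastError = e;
            }
            // Stop if there is not enough time left for a backoff plus another attempt.
            // This also covers timeoutMs <= 0 and retryBackoffMs >= timeoutMs (single attempt).
            if (end - System.currentTimeMillis() <= backoffMs) {
                break;
            }
            Thread.sleep(backoffMs);
        } while (System.currentTimeMillis() < end);
        throw new ConnectException("Failed to " + descriptionStr + " after " + attempt + " attempt(s)", lastError);
    }
}
```

With this shape, `timeoutMs <= 0` and `retryBackoffMs >= timeoutMs` no longer need a separate early-return branch: both fall out of the remaining-time check after the first failed attempt.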




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r815158301



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable, and performs retries if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If other types of exceptions is
+     * caught, then the same exception will be rethrown.  If all retries are exhausted, then the last
+     * exception is wrapped into a {@link org.apache.kafka.connect.errors.ConnectException} and rethrown.
+     *
+     * The callable task will be executed at least once.  If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable The task to execute.
+     * @param maxRetries Maximum number of retries.
+     * @param retryBackoffMs Delay time to retry the callable task upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException}.
+     *
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+    public static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
+        Throwable lastError = null;
+        int attempt = 0;
+        final long maxAttempts = maxRetries + 1;
+        while (++attempt <= maxAttempts) {
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically up to {} more times. " +
+                        "Reason: {}", attempt, maxRetries - attempt, e.getMessage());
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            } catch (Exception e) {
+                log.warn("Non-retriable exception caught. Re-throwing. Reason: {}, {}", e.getClass(), e.getMessage());
+                throw e;
+            }
+            Utils.sleep(retryBackoffMs);

Review comment:
       Yeah, I think it's worth the bit of logic to fail more quickly.
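Assuming this thread is about not sleeping after the final failed attempt of the count-based `retry(...)` quoted above, a sketch of that fail-fast logic follows. `CountedRetrySketch`, its nested `RetriableException`, and the `sleeps` counter are hypothetical, and `IllegalStateException` stands in for Connect's `ConnectException` so the block runs without Kafka.

```java
import java.util.concurrent.Callable;

class CountedRetrySketch {
    // Hypothetical stand-in for Kafka's RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    // Counts backoff sleeps purely so the fail-fast behavior can be observed in a test.
    static int sleeps = 0;

    static <T> T retry(Callable<T> callable, long maxRetries, long retryBackoffMs) throws Exception {
        final long maxAttempts = Math.max(0L, maxRetries) + 1;
        RetriableException lastError = null;
        for (long attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callable.call();
            } catch (RetriableException e) {
                lastError = e;
            }
            // Only back off when another attempt will actually follow; after the final
            // failed attempt we throw immediately instead of sleeping first.
            if (attempt < maxAttempts) {
                sleeps++;
                Thread.sleep(Math.max(0L, retryBackoffMs));
            }
        }
        // The PR wraps this in a ConnectException; IllegalStateException keeps the sketch standalone.
        throw new IllegalStateException("Exhausted all " + maxRetries + " retries", lastError);
    }
}
```

With `maxRetries = n` there are `n + 1` attempts but only `n` sleeps, so the caller sees the failure one backoff interval sooner.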







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r816108391



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If all retries are exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code maxRetries} is set to 0, the task will be
+     * executed exactly once.  If {@code maxRetries} is set to ,{@code n} the callable will be executed at

Review comment:
       Nit:
   ```suggestion
        * executed exactly once.  If {@code maxRetries} is set to {@code n}, the callable will be executed at
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets using the {@code listOffsets()} method of the admin client.
+     * As the {@code listOffsets()} method might throw a {@link RetriableException}, the {@code shouldRetry}
+     * flag enables retry, upon catching such exception, if it is set to {@code True}.
+     *
+     * @param shouldRetry Boolean flag to enable retry for the admin client {@code listOffsets()} call.
+     * @see TopicAdmin#retryEndOffsets
+     */
+

Review comment:
       Nit:
   ```suggestion
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If all retries are exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If <code>maxRetries</code> is set to 0, the task will be
+     * executed exactly once.  If <code>maxRetries</code> is set to <code>n</code>, the callable will be executed at
+     * most <code>n + 1</code> times.
+     *
+     * <p>If <code>retryBackoffMs</code> is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable the function to execute.
+     * @param maxRetries maximum number of retries; must be 0 or more
+     * @param retryBackoffMs the number of milliseconds to delay upon receiving a
+     * {@link org.apache.kafka.connect.errors.RetriableException} before retrying again; must be 0 or more
+     *

Review comment:
       @philipnee can you please correct this spacing to reflect the project standards? Thanks!

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -124,6 +124,9 @@ public String toString() {
     public static final int NO_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
 
+    private static final String DEFAULT_ADMIN_CLIENT_RETRIES = "10";
+    private static final String DEFAULT_ADMIN_CLIENT_BACKOFF_MS = "100";
+

Review comment:
       Okay, so I was verifying what these defaults really were on the `AdminClientConfig`, and I noticed that [AdminClientConfig.RETRIES_CONFIG](https://github.com/apache/kafka/blob/2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L173-L178) defaults to `Integer.MAX_VALUE`. I did a bit of research and found that [KIP-533](https://cwiki.apache.org/confluence/display/KAFKA/KIP-533%3A+Add+default+api+timeout+to+AdminClient) changed the defaults, so I don't know that we can/should count on `AdminClientConfig.RETRIES`.
   
   I consulted with @kkonstantine, and after a bit of discussion we agreed that it would be better not to make these configurable, and instead to define the maximum duration to try to read the offsets as a constant (e.g., 15 minutes), plus a constant for the retry backoff millis (e.g., 10 seconds). The max duration is easier to reason about.
   
   The 15 minutes is a balance between retrying long enough to work with most incidents of metadata propagation taking too long, and short enough to fail if the worker cannot start up.
   
   Interestingly, I don't think the duration needs to be defined in `TopicAdmin`; it is better defined in `KafkaBasedLog.start()`. That way, the `KafkaBasedLog` passes the duration into the `TopicAdmin.endOffset(Set<TopicPartition> partitions, Duration timeout)` method, which is the method that calls `RetryUtil.retry(Callable<T> callable, Duration retryTimeout, long retryBackoffMs)`.
   
   This isn't much different from what you have, but I do think it's cleaner. I don't think we have seen many retriable exceptions during `KafkaBasedLog.start()`, which means this is something of an edge case.
   
   WDYT?
   
   
   
   Second, it might be good to add a method like this:
   ```
   public Map<TopicPartition, Long> retryEndOffsets(Set<TopicPartition> partitions, Duration timeout) { ... }
   ```
   that retries for the given timeout while using `DEFAULT_ADMIN_CLIENT_BACKOFF_MS` for backoffs. Then, if you change `KafkaBasedLog.readEndOffsets(...)` and `KafkaBasedLog.readToLogEnd(...)` to take a `Duration timeout` rather than a boolean, `KafkaBasedLog.start()` could just call `readToLogEnd(Duration.ofMinutes(15))` while `KafkaBasedLog.WorkThread.run()` could call `readToLogEnd(null)`.
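A sketch of the timeout-instead-of-boolean shape proposed here, under loud assumptions: `ReadToLogEndSketch`, `START_READ_TIMEOUT`, `DEFAULT_RETRY_BACKOFF_MS`, and the `Function`-based stand-in for `TopicAdmin.endOffsets(...)` are hypothetical names, partitions are plain strings instead of `TopicPartition`, and the 15-minute and 10-second values are only the examples from this comment. A `null` timeout means a single call (the `WorkThread.run()` case); a non-null timeout retries until the deadline (the `start()` case).

```java
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

class ReadToLogEndSketch {
    // Example values from the review; hypothetical constant names.
    static final Duration START_READ_TIMEOUT = Duration.ofMinutes(15);
    static final long DEFAULT_RETRY_BACKOFF_MS = Duration.ofSeconds(10).toMillis();

    // Hypothetical stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    // Stand-in for TopicAdmin.endOffsets(...); partitions are plain strings here.
    private final Function<Set<String>, Map<String, Long>> endOffsets;
    private final long retryBackoffMs;

    ReadToLogEndSketch(Function<Set<String>, Map<String, Long>> endOffsets, long retryBackoffMs) {
        this.endOffsets = endOffsets;
        this.retryBackoffMs = retryBackoffMs;
    }

    // null timeout: call once, no retries (work thread).
    // non-null timeout: retry RetriableExceptions until the deadline (startup).
    Map<String, Long> readEndOffsets(Set<String> partitions, Duration timeout) throws InterruptedException {
        if (timeout == null) {
            return endOffsets.apply(partitions);
        }
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (true) {
            try {
                return endOffsets.apply(partitions);
            } catch (RetriableException e) {
                // Give up if sleeping once more would run past the deadline.
                if (System.currentTimeMillis() + retryBackoffMs >= deadline) {
                    throw e;
                }
                Thread.sleep(retryBackoffMs);
            }
        }
    }
}
```

In this shape only startup pays for retries: `start()` would pass something like `START_READ_TIMEOUT`, while the work thread passes `null` and surfaces the first failure.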
   
   You'd need to refactor your `retry(...)` method a bit to take the `Duration`. You could also 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -319,9 +319,18 @@ private void poll(long timeoutMs) {
         }
     }
 
-    private void readToLogEnd() {
+    /**
+     * This method finds the end offsets using the {@code listOffsets()} method of the admin client.
+     * As the {@code listOffsets()} method might throw a {@link RetriableException}, the {@code shouldRetry}
+     * flag enables retry, upon catching such exception, if it is set to {@code True}.

Review comment:
       Suggestions:
   ```suggestion
        * This method finds the end offsets of the Kafka log's topic partitions, optionally retrying
        * if the {@code listOffsets()} method of the admin client throws a {@link RetriableException}.
   ```







[GitHub] [kafka] philipnee commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
philipnee commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r819898973



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>{@code description} supplies the message that indicates the purpose of the callable since the message will
+     * be used for logging.  For example, "list offsets". If the supplier is null or the supplied string is
+     * null, {@code callable} will be used as the default string.
+     *
+     * <p>The task will be executed at least once. No retries will be performed 
+     * if {@code timeoutDuration} is 0 or negative, or if {@code timeoutDuration} is less than {@code retryBackoffMs}.
+     *
+     * <p>A {@code retryBackoffMs} that is negative or zero will result in no delays between retries.
+     *
+     * @param callable          the function to execute
+     * @param description       supplier that provides custom message for logging purpose
+     * @param timeoutDuration   timeout duration; must not be null
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again
+     * @throws ConnectException If the task exhausted all the retries
+     */
+    public static <T> T retryUntilTimeout(Callable<T> callable, Supplier<String> description, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        // if a null supplier or null string is provided, the description defaults to "callable"
+        String descriptionStr = Optional.ofNullable(description)
+                .map(Supplier::get)
+                .orElse("callable");
+
+        // handling null duration
+        long timeoutMs = Optional.ofNullable(timeoutDuration)
+                .map(Duration::toMillis)
+                .orElse(0L);
+
+        if (retryBackoffMs >= timeoutMs) {
+            log.debug("Executing {} only once, since retryBackoffMs={} is larger than total timeoutMs={}",
+                    descriptionStr, retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs < 0) {
+            log.debug("Invalid retryBackoffMs, must be non-negative but got {}. 0 will be used instead",
+                    retryBackoffMs);
+            retryBackoffMs = 0;
+        }
+
+        if (timeoutMs <= 0 || retryBackoffMs >= timeoutMs) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {

Review comment:
       I see. Then I should keep line 77 to handle the special cases.







[GitHub] [kafka] rhauch commented on a change in pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #11797:
URL: https://github.com/apache/kafka/pull/11797#discussion_r818198568



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}.  Callable will only execute once.", retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically. " +
+                        "Reason: {}", attempt, e.getMessage(), e);

Review comment:
       Likewise, this log message could be changed to:
   ```suggestion
                   log.warn("Attempt {} to {} resulted in RetriableException; retrying automatically. " +
                           "Reason: {}", attempt, description.get(), e.getMessage(), e);
   ```
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##########
@@ -124,6 +124,9 @@ public String toString() {
     public static final int NO_PARTITIONS = -1;
     public static final short NO_REPLICATION_FACTOR = -1;
 
+    private static final String DEFAULT_ADMIN_CLIENT_RETRIES = "10";
+    private static final String DEFAULT_ADMIN_CLIENT_BACKOFF_MS = "100";
+

Review comment:
       These are unused now, right?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, optionally retrying the callable if
+     * {@link org.apache.kafka.connect.errors.RetriableException} is being thrown.  If timeout is exhausted,
+     * then the last exception is wrapped with a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is set to 0, the task will be
+     * executed exactly once.  If {@code timeoutDuration} is less than {@code retryBackoffMs}, the callable will be
+     * executed only once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}.  Callable will only execute once.", retryBackoffMs, timeoutMs);

Review comment:
       All of the log messages in this method could use a description of the callable. WDYT about adding a `Supplier<String> description` parameter and using it in the log messages? For example, the `KafkaBasedLog` could supply:
   ```
   () -> "list offsets for topic '" + topicName + "'"
   ```
   then this log message might be:
   
   ```suggestion
               log.warn("Call to {} will only execute once, since retryBackoffMs={} is larger than total timeoutMs={}",
                   description.get(), retryBackoffMs, timeoutMs);
   ```
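A small sketch of how the proposed `Supplier<String> description` could be resolved and used, assuming the null fallback to `"callable"` that a later revision of this PR adds. `DescribedCallSketch`, `describe`, `warnIfSingleAttempt`, and the `LOG` list (a stand-in for an SLF4J logger) are hypothetical. The supplier is only invoked on the path that actually emits the message, so building a string such as the topic-specific description costs nothing when retries are possible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class DescribedCallSketch {
    // Stand-in for an SLF4J logger so the emitted messages can be inspected.
    static final List<String> LOG = new ArrayList<>();

    // A null supplier or a null supplied string falls back to "callable".
    static String describe(Supplier<String> description) {
        String s = description == null ? null : description.get();
        return s == null ? "callable" : s;
    }

    // Returns true (and logs) when the backoff leaves no room for a retry.
    static boolean warnIfSingleAttempt(Supplier<String> description, long retryBackoffMs, long timeoutMs) {
        if (retryBackoffMs < timeoutMs) {
            return false; // retries are possible: the description is never built on this path
        }
        LOG.add(String.format(
                "Call to %s will only execute once, since retryBackoffMs=%d is larger than total timeoutMs=%d",
                describe(description), retryBackoffMs, timeoutMs));
        return true;
    }
}
```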

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
##########
@@ -465,6 +467,55 @@ public void verifyingGettingTopicCleanupPolicies() {
         }
     }
 
+    @Test
+    public void retryEndOffsetsShouldThrowConnectException() {
+        String topicName = "myTopic";
+        TopicPartition tp1 = new TopicPartition(topicName, 0);
+        Set<TopicPartition> tps = Collections.singleton(tp1);
+        Long offset = 1000L;
+        Cluster cluster = createCluster(1, "myTopic", 1);
+
+        try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(new MockTime(10), cluster)) {
+            Map<TopicPartition, Long> offsetMap = new HashMap<>();
+            offsetMap.put(tp1, offset);
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE));
+            Map<String, Object> adminConfig = new HashMap<>();
+            adminConfig.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0");
+            TopicAdmin admin = new TopicAdmin(adminConfig, env.adminClient());
+
+            assertThrows(ConnectException.class, () -> {
+                admin.retryEndOffsets(tps, Duration.ofMillis(100), 1);
+            });
+        }
+    }
+
+    @Test
+    public void endOffsetsShouldRetryWhenTopicNotFound() {

Review comment:
       Nit:
   ```suggestion
       public void retryEndOffsetsShouldRetryWhenTopicNotFound() {
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.Callable;
+
+public class RetryUtil {
+    private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
+
+    /**
+     * The method executes the callable at least once, retrying it whenever a
+     * {@link org.apache.kafka.common.errors.RetriableException} or
+     * {@link org.apache.kafka.connect.errors.RetriableException} is thrown.  If the timeout is exhausted,
+     * the last exception is wrapped in a {@link org.apache.kafka.connect.errors.ConnectException} and thrown.
+     *
+     * <p>If {@code timeoutDuration} is 0 or less, or is not greater than {@code retryBackoffMs}, the callable
+     * is executed exactly once.
+     *
+     * <p>If {@code retryBackoffMs} is set to 0, no wait will happen in between the retries.
+     *
+     * @param callable          the function to execute.
+     * @param timeoutDuration   timeout duration
+     * @param retryBackoffMs    the number of milliseconds to delay upon receiving a
+     *                          {@link org.apache.kafka.connect.errors.RetriableException} before retrying again;
+     *                          must be 0 or more
+     * @throws ConnectException If the task exhausted all the retries.
+     */
+
+    public static <T> T retryUntilTimeout(Callable<T> callable, Duration timeoutDuration, long retryBackoffMs) throws Exception {
+        long timeoutMs = timeoutDuration.toMillis();
+        if (retryBackoffMs >= timeoutMs) {
+            log.warn("retryBackoffMs, {}, needs to be less than timeoutMs, {}.  Callable will only execute once.", retryBackoffMs, timeoutMs);
+        }
+
+        if (retryBackoffMs >= timeoutMs ||
+                timeoutMs <= 0) {
+            // no retry
+            return callable.call();
+        }
+
+        long end = System.currentTimeMillis() + timeoutMs;
+        int attempt = 0;
+        Throwable lastError = null;
+        while (System.currentTimeMillis() <= end) {
+            attempt++;
+            try {
+                return callable.call();
+            } catch (RetriableException | org.apache.kafka.connect.errors.RetriableException e) {
+                log.warn("RetriableException caught on attempt {}, retrying automatically. " +
+                        "Reason: {}", attempt, e.getMessage(), e);
+                lastError = e;
+            } catch (WakeupException e) {
+                lastError = e;
+            }
+
+            // only back off if the deadline has not passed yet;
+            // no sleep happens when retryBackoffMs is 0
+            if (retryBackoffMs > 0 && System.currentTimeMillis() < end) {
+                Utils.sleep(retryBackoffMs);
+            }

Review comment:
       WDYT about this:
   ```suggestion
               long millisRemaining = Math.max(0, end - System.currentTimeMillis());
               if (millisRemaining > 0) {
                   Utils.sleep(millisRemaining);
               }
   ```
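A sketch of the deadline-aware backoff under discussion, assuming the goal is to keep the configured backoff between attempts while never sleeping past the deadline; as written, the suggestion sleeps for the whole remaining time, which leaves room for roughly one more attempt. `DeadlineRetry` and `TransientException` are hypothetical names: `TransientException` stands in for Kafka's `RetriableException` and `IllegalStateException` stands in for `ConnectException`, so the example compiles without Kafka on the classpath.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

// Hypothetical stand-in for org.apache.kafka.common.errors.RetriableException,
// so the sketch compiles without Kafka on the classpath.
class TransientException extends RuntimeException {
    TransientException(String message) {
        super(message);
    }
}

final class DeadlineRetry {
    private DeadlineRetry() {
    }

    // Calls the callable until it succeeds or the deadline passes. Between attempts it sleeps
    // for retryBackoffMs (must be 0 or more), capped at the time remaining, so it never sleeps
    // past the deadline. IllegalStateException stands in for ConnectException.
    static <T> T retryUntilTimeout(Callable<T> callable, Duration timeout, long retryBackoffMs)
            throws Exception {
        long end = System.currentTimeMillis() + timeout.toMillis();
        int attempt = 0;
        TransientException lastError = null;
        while (true) {
            attempt++;
            try {
                return callable.call();
            } catch (TransientException e) {
                lastError = e;
            }
            long millisRemaining = Math.max(0, end - System.currentTimeMillis());
            if (millisRemaining == 0) {
                break; // deadline reached: stop retrying
            }
            // Back off, but never beyond the deadline.
            Thread.sleep(Math.min(retryBackoffMs, millisRemaining));
        }
        throw new IllegalStateException("Gave up after " + attempt + " attempts", lastError);
    }
}
```

Capping the sleep with `Math.min(retryBackoffMs, millisRemaining)` keeps the total wall-clock time close to the timeout even when the backoff is large relative to the time left, while still honoring the full backoff on earlier attempts.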

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@
+        }
+
+        throw new ConnectException("Fail to retry the task after " + attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);

Review comment:
       And this exception message could also use the description:
   ```suggestion
           throw new ConnectException("Fail to " + description.get() + " after " + attempt + " attempts.  Reason: " + lastError.getMessage(), lastError);
   ```
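A minimal sketch of building the failure message from a description `Supplier<String>`, assuming the helper gains a `description` parameter as the suggestion implies. To keep it short, a fixed attempt count replaces the timeout loop; `DescribedRetry`, `RetriableFailure`, and the `"callable"` fallback for a null supplier are hypothetical, and `IllegalStateException` stands in for `ConnectException`.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

// Hypothetical stand-in for Kafka's RetriableException.
class RetriableFailure extends RuntimeException {
    RetriableFailure(String message) {
        super(message);
    }
}

final class DescribedRetry {
    private DescribedRetry() {
    }

    // A fixed attempt count replaces the timeout loop to keep the sketch short.
    // IllegalStateException stands in for ConnectException.
    static <T> T retry(Callable<T> callable, Supplier<String> description, int maxAttempts)
            throws Exception {
        RetriableFailure lastError = null;
        int attempt = 0;
        while (attempt < maxAttempts) {
            attempt++;
            try {
                return callable.call();
            } catch (RetriableFailure e) {
                lastError = e;
            }
        }
        // The description is only computed here, on the failure path.
        String what = description != null ? description.get() : "callable";
        String reason = lastError != null ? lastError.getMessage() : "no attempts were made";
        throw new IllegalStateException(
                "Fail to " + what + " after " + attempt + " attempts.  Reason: " + reason, lastError);
    }
}
```

Because the supplier is only invoked when the retries fail, a caller can pass a description that formats something expensive (for example, the set of partitions being queried) without paying for that formatting when the first attempt succeeds.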

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
##########
@@ -76,6 +76,10 @@
     private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
     private static final long CREATE_TOPIC_TIMEOUT_NS = TimeUnit.SECONDS.toNanos(30);
     private static final long MAX_SLEEP_MS = TimeUnit.SECONDS.toMillis(1);
+    // Retry admin calls for up to 15 minutes to give metadata time to propagate, backing off
+    // 10 seconds between retries
+    private static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15);
+    private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = 10 * 1000;

Review comment:
       Nit: our convention is to use either `TimeUnit` or `Duration` to compute milliseconds; it's more readable and reduces the risk of conversion errors. `Duration` reads a bit better, but the lines above already use `TimeUnit`:
   ```suggestion
       private static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10);
   ```
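For illustration, the two conventions produce the same value. The arithmetic also shows that the 15-minute window in this hunk leaves room for about 90 backoff intervals of 10 seconds, ignoring the time the admin calls themselves take. `RetryConstants`, `BACKOFF_MS_VIA_DURATION`, and `maxBackoffIntervals` are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

final class RetryConstants {
    private RetryConstants() {
    }

    // Retry window as declared in the KafkaBasedLog hunk.
    static final Duration ADMIN_CLIENT_RETRY_DURATION = Duration.ofMinutes(15);
    // The suggested TimeUnit form: the unit is spelled out instead of a hand-computed literal.
    static final long ADMIN_CLIENT_RETRY_BACKOFF_MS = TimeUnit.SECONDS.toMillis(10);
    // The equivalent Duration form (hypothetical name, shown only for comparison).
    static final long BACKOFF_MS_VIA_DURATION = Duration.ofSeconds(10).toMillis();

    // Upper bound on the number of backoff intervals that fit in the window,
    // ignoring the time spent in the admin calls themselves.
    static long maxBackoffIntervals() {
        return ADMIN_CLIENT_RETRY_DURATION.toMillis() / ADMIN_CLIENT_RETRY_BACKOFF_MS;
    }
}
```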

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/RetryUtil.java
##########
@@ -0,0 +1,87 @@

Review comment:
       Nit: since we require a non-null `Duration`, we should state that here:
   ```suggestion
        * @param timeoutDuration   timeout duration; may not be null
   ```
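One way to enforce the documented contract is to fail fast with `Objects.requireNonNull`, which produces a descriptive `NullPointerException` instead of one thrown later from `timeoutDuration.toMillis()`. `TimeoutArgs` is a hypothetical helper, and rejecting null is an assumption here; treating a null duration as a zero timeout would be an alternative design.

```java
import java.time.Duration;
import java.util.Objects;

final class TimeoutArgs {
    private TimeoutArgs() {
    }

    /**
     * Converts the timeout to milliseconds.
     *
     * @param timeoutDuration timeout duration; may not be null
     * @return the timeout in milliseconds
     * @throws NullPointerException if {@code timeoutDuration} is null
     */
    static long timeoutMs(Duration timeoutDuration) {
        // Fail fast with a descriptive message instead of an NPE from toMillis().
        return Objects.requireNonNull(timeoutDuration, "timeoutDuration may not be null").toMillis();
    }
}
```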



