Posted to jira@kafka.apache.org by "mukkachaitanya (via GitHub)" <gi...@apache.org> on 2023/02/20 11:43:16 UTC

[GitHub] [kafka] mukkachaitanya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

mukkachaitanya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1111817237


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1870,7 +1880,16 @@ private Callable<Void> getConnectorStoppingCallable(final String connectorName)
         };
     }
 
-    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
+    /**
+     * Request task configs from the connector and write them to the config storage in case the configs are detected to
+     * have changed. This method retries infinitely in case of any errors.
+     *
+     * @param initialRequestTime the time in milliseconds when the original request was made (i.e. before any retries)
+     * @param connName the name of the connector
+     * @param exponentialBackoff {@link ExponentialBackoff} used to calculate the retry backoff duration
+     * @param attempts the number of retry attempts that have been made
+     */
+    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName, ExponentialBackoff exponentialBackoff, int attempts) {

Review Comment:
   As far as I can see, we always use an `ExponentialBackoff` here. Should we simply move the logic that sets up the `ExponentialBackoff` into this function? I was thinking of something like:
   ```java
       private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
           // Build the backoff policy here so that callers do not have to pass it in
           ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
                   RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS,
                   2, RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS,
                   0);
           // Start at zero attempts; the helper increments the count on each retry
           reconfigureConnectorTasksWithExponentialBackoff(initialRequestTime, connName, exponentialBackoff, 0);
       }
   ```
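
   To make the retry schedule concrete, here is a minimal standalone sketch of the delay calculation such a backoff would produce. It does not use Kafka's `ExponentialBackoff` class; `BackoffSketch`, `delayMs`, and the constant values (250 ms initial, 60 s max) are assumptions for illustration only, and it assumes the trailing `0` in the constructor call above means "no jitter".

   ```java
   final class BackoffSketch {
       // Hypothetical values for illustration; the real constants live in DistributedHerder.
       static final long INITIAL_MS = 250L;
       static final long MAX_MS = 60_000L;
       static final int MULTIPLIER = 2;

       /** Delay to wait after {@code attempts} retries have already been made (no jitter). */
       static long delayMs(int attempts) {
           // Cap the exponent so the multiplication cannot overflow a long.
           int exp = Math.min(Math.max(attempts, 0), 30);
           long uncapped = INITIAL_MS * (long) Math.pow(MULTIPLIER, exp);
           // Never wait longer than the configured maximum.
           return Math.min(MAX_MS, uncapped);
       }

       public static void main(String[] args) {
           for (int attempts = 0; attempts <= 9; attempts++) {
               System.out.println("attempts=" + attempts + " delayMs=" + delayMs(attempts));
           }
       }
   }
   ```

   With these assumed values the delay doubles from 250 ms and stays at the 60 s cap from `attempts = 8` onward.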



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1870,7 +1880,16 @@ private Callable<Void> getConnectorStoppingCallable(final String connectorName)
         };
     }
 
-    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
+    /**
+     * Request task configs from the connector and write them to the config storage in case the configs are detected to
+     * have changed. This method retries infinitely in case of any errors.

Review Comment:
   I am curious whether there is a way to avoid infinite retries. If we really do retry forever, especially during the `startConnector` phase, the connector simply never gets any tasks. Is it possible to bubble up the errors as part of the connector (not task) status?
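
   A rough sketch of what a bounded variant could look like, purely to illustrate the idea. Everything here is hypothetical: `BoundedRetrySketch`, `ConnectorStatus`, `State`, `MAX_ATTEMPTS`, and `reconfigureWithLimit` are made-up names, not existing Connect APIs, and a real implementation would schedule each retry after a backoff delay instead of looping immediately.

   ```java
   import java.util.concurrent.Callable;

   final class BoundedRetrySketch {
       enum State { RUNNING, FAILED }

       /** Hypothetical stand-in for a connector-level status record. */
       static final class ConnectorStatus {
           State state = State.RUNNING;
           String trace = null; // last error seen, if any
       }

       // Assumed limit; the right value (or whether to have one) is the open question.
       static final int MAX_ATTEMPTS = 5;

       static ConnectorStatus reconfigureWithLimit(Callable<Void> reconfigure) {
           ConnectorStatus status = new ConnectorStatus();
           for (int attempts = 0; attempts < MAX_ATTEMPTS; attempts++) {
               try {
                   reconfigure.call();
                   status.trace = null; // success clears any earlier error
                   return status;
               } catch (Exception e) {
                   // Remember the most recent failure so it can be reported.
                   status.trace = e.toString();
               }
           }
           // Retries exhausted: surface the failure on the connector itself.
           status.state = State.FAILED;
           return status;
       }
   }
   ```

   The point of the sketch is only that, once the attempts are exhausted, the last error ends up somewhere a user can see it instead of the connector silently running with no tasks.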


