Posted to jira@kafka.apache.org by "yashmayya (via GitHub)" <gi...@apache.org> on 2023/02/18 11:58:20 UTC

[GitHub] [kafka] yashmayya opened a new pull request, #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

yashmayya opened a new pull request, #13276:
URL: https://github.com/apache/kafka/pull/13276

   - Kafka Connect in distributed mode currently retries infinitely with a fixed retry backoff (`250 ms`) in case of errors arising during connector task reconfiguration.
   - Tasks can be "reconfigured" during connector startup (to get the initial task configs from the connector), when a connector is resumed, or when a connector explicitly requests it via its context.
   - Task reconfiguration essentially entails requesting a connector instance for its task configs and writing them to the Connect cluster's config storage (if a change in task configs is detected).
   - A fixed retry backoff of 250 ms leads to very aggressive retries. Consider a Debezium connector which attempts to initiate a database connection in its [taskConfigs method](https://github.com/debezium/debezium/blob/bf347da71ad9b0819998a3bc9754b3cc96cc1563/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java#L63). If the connection fails due to something like an invalid login, the Connect worker will essentially spam connection attempts frequently and indefinitely (until the connector config or the database-side configs are fixed).
   - An exponential backoff retry mechanism seems better suited for the `DistributedHerder::reconfigureConnectorTasksWithRetry` method. The initial retry backoff is retained as `250 ms`, with a chosen maximum backoff of `60000 ms`.
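   To make the numbers above concrete, the sketch below computes the retry delays for an initial backoff of 250 ms that doubles after each failed attempt and is capped at 60000 ms, with no jitter. It is a standalone illustration under those assumptions: `BackoffSchedule` and `backoffMs` are hypothetical names, and this is not Kafka's `ExponentialBackoff` utility (which additionally accepts a jitter parameter).

   ```java
   /** Hypothetical sketch of a capped exponential backoff without jitter. */
   final class BackoffSchedule {
       static final long INITIAL_MS = 250L;   // initial backoff from the PR description
       static final int MULTIPLIER = 2;       // delay doubles after each failed attempt
       static final long MAX_MS = 60_000L;    // maximum backoff from the PR description

       /** Returns the delay before retry number {@code attempts} (0-based). */
       static long backoffMs(long initialMs, int multiplier, long maxMs, int attempts) {
           long delay = initialMs;
           // Stop multiplying once the cap is reached so the value cannot overflow.
           for (int i = 0; i < attempts && delay < maxMs; i++) {
               delay *= multiplier;
           }
           return Math.min(delay, maxMs);
       }

       public static void main(String[] args) {
           for (int attempt = 0; attempt <= 9; attempt++) {
               System.out.println("attempt " + attempt + ": "
                       + backoffMs(INITIAL_MS, MULTIPLIER, MAX_MS, attempt) + " ms");
           }
       }
   }
   ```

   Under these assumptions the delays run 250, 500, 1000, ... 32000 ms, and every attempt from index 8 onward waits the 60000 ms maximum instead of 250 ms.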
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mukkachaitanya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

Posted by "mukkachaitanya (via GitHub)" <gi...@apache.org>.
mukkachaitanya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1111817237


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1870,7 +1880,16 @@ private Callable<Void> getConnectorStoppingCallable(final String connectorName)
         };
     }
 
-    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
+    /**
+     * Request task configs from the connector and write them to the config storage in case the configs are detected to
+     * have changed. This method retries infinitely in case of any errors.
+     *
+     * @param initialRequestTime the time in milliseconds when the original request was made (i.e. before any retries)
+     * @param connName the name of the connector
+     * @param exponentialBackoff {@link ExponentialBackoff} used to calculate the retry backoff duration
+     * @param attempts the number of retry attempts that have been made
+     */
+    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName, ExponentialBackoff exponentialBackoff, int attempts) {

Review Comment:
   I see that we're currently always going to use an `ExponentialBackoff`. Should we simply move the logic that sets up the `ExponentialBackoff` into this function? I was thinking something like:
   ```java
       private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
           // Build the backoff once; the helper retries with attempts + 1 after each failure.
           ExponentialBackoff exponentialBackoff = new ExponentialBackoff(
                   RECONFIGURE_CONNECTOR_TASKS_BACKOFF_INITIAL_MS,
                   2, RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MAX_MS,
                   0);
           reconfigureConnectorTasksWithExponentialBackoff(initialRequestTime, connName, exponentialBackoff, 0);
       }
   ```
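   As a self-contained illustration of the split being proposed (an entry point that builds the backoff once and starts at attempt 0, plus a helper that reschedules itself with `attempts + 1` after each failure), consider the sketch below. `RetrySplitSketch`, its `Scheduler` interface, and the `BooleanSupplier` standing in for the reconfiguration call are all hypothetical; the scheduler plays the role of the herder's request queue, and the backoff is computed inline rather than with Kafka's `ExponentialBackoff`.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.BooleanSupplier;

   /** Hypothetical sketch: build the backoff once, then retry with attempts + 1. */
   final class RetrySplitSketch {
       /** Stand-in for a request queue that runs an action after a delay. */
       interface Scheduler {
           void schedule(long delayMs, Runnable action);
       }

       private static final long INITIAL_MS = 250L;
       private static final long MAX_MS = 60_000L;

       private final Scheduler scheduler;

       RetrySplitSketch(Scheduler scheduler) {
           this.scheduler = scheduler;
       }

       /** Entry point keeps a simple signature; callers never pass an attempt count. */
       void reconfigureWithRetry(BooleanSupplier reconfigure) {
           reconfigureWithBackoff(reconfigure, 0);
       }

       /** Runs one attempt; on failure, schedules the next attempt with a longer delay. */
       private void reconfigureWithBackoff(BooleanSupplier reconfigure, int attempts) {
           if (reconfigure.getAsBoolean()) {
               return; // success: nothing more to schedule
           }
           scheduler.schedule(backoffMs(attempts), () -> reconfigureWithBackoff(reconfigure, attempts + 1));
       }

       /** Capped doubling without jitter: 250, 500, 1000, ... up to 60000 ms. */
       static long backoffMs(int attempts) {
           long delay = INITIAL_MS;
           for (int i = 0; i < attempts && delay < MAX_MS; i++) {
               delay *= 2;
           }
           return Math.min(delay, MAX_MS);
       }

       public static void main(String[] args) {
           List<Long> delays = new ArrayList<>();
           // Fake scheduler: record each delay and run the retry immediately.
           RetrySplitSketch sketch = new RetrySplitSketch((delayMs, action) -> {
               delays.add(delayMs);
               action.run();
           });
           int[] calls = {0};
           // Fail on the first three attempts, succeed on the fourth.
           sketch.reconfigureWithRetry(() -> ++calls[0] > 3);
           System.out.println(delays);
       }
   }
   ```

   Keeping the attempt counter out of the entry point's signature means callers cannot accidentally start a retry chain at a non-zero attempt.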



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1870,7 +1880,16 @@ private Callable<Void> getConnectorStoppingCallable(final String connectorName)
         };
     }
 
-    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
+    /**
+     * Request task configs from the connector and write them to the config storage in case the configs are detected to
+     * have changed. This method retries infinitely in case of any errors.

Review Comment:
   I'm curious whether there is a way to avoid infinite retries. If we are actually retrying infinitely, especially in the `startConnector` phase, then the connector is simply left without any tasks. Is it possible to somehow bubble up errors as part of the connector (not task) status?





[GitHub] [kafka] C0urante commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1119036590


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();

Review Comment:
   Wakeups basically don't matter in these tests; if it's easier, feel free to append `.anyTimes()` here.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();

Review Comment:
   This test is great. I think it'd be worth it to perform a third and fourth tick. The third can be used to simulate successfully generating task configs after the two failed attempts, and the fourth can be used to ensure that we don't retry any further.
   
   It's also worth noting that we're only testing the case where `Connector::taskConfigs` (or really, `Worker::connectorTaskConfigs`) fails, but the logic that's being added here applies if intra-cluster communication fails as well (which may happen if the leader of the cluster is temporarily unavailable, for example). It'd be nice if we could have test coverage for that too, but I won't block this PR on that.
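   The tick sequence discussed here can be modeled with a toy request queue and a fake clock: each tick runs the pending request if it is due, and the poll timeout at the end of a tick is the time until the next pending request, or `Long.MAX_VALUE` when nothing is pending. In this model two failures produce timeouts of 250 ms and 500 ms, and after a successful third attempt the timeout returns to `Long.MAX_VALUE`, so no further retry is scheduled. `TickModel` and all of its members are hypothetical; this is not `DistributedHerder`'s actual request handling.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Iterator;
   import java.util.List;
   import java.util.function.BooleanSupplier;

   /** Hypothetical toy model of tick-driven retries; not DistributedHerder itself. */
   final class TickModel {
       private static final long INITIAL_MS = 250L;
       private static final long MAX_MS = 60_000L;

       private final BooleanSupplier reconfigure;   // returns true when task configs were generated
       final List<Long> pollTimeouts = new ArrayList<>();
       private long nowMs = 0L;                     // fake clock
       private long dueAtMs = -1L;                  // -1 means no pending request
       private int attempts = 0;

       TickModel(BooleanSupplier reconfigure) {
           this.reconfigure = reconfigure;
       }

       void requestReconfiguration() {
           dueAtMs = nowMs; // due on the next tick
           attempts = 0;    // a new request starts a fresh backoff sequence
       }

       void sleep(long ms) {
           nowMs += ms;
       }

       /** Runs the pending request if due, then records how long the tick would poll. */
       void tick() {
           if (dueAtMs >= 0 && dueAtMs <= nowMs) {
               if (reconfigure.getAsBoolean()) {
                   dueAtMs = -1L; // success: nothing left to retry
               } else {
                   // Capped doubling: 250, 500, 1000, ... 60000 ms.
                   long backoffMs = Math.min(INITIAL_MS << Math.min(attempts, 8), MAX_MS);
                   dueAtMs = nowMs + backoffMs;
                   attempts++;
               }
           }
           pollTimeouts.add(dueAtMs < 0 ? Long.MAX_VALUE : dueAtMs - nowMs);
       }

       public static void main(String[] args) {
           Iterator<Boolean> outcomes = Arrays.asList(false, false, true).iterator();
           TickModel model = new TickModel(() -> outcomes.next());
           model.tick();                   // initial tick: nothing pending
           model.requestReconfiguration();
           model.tick();                   // first attempt fails
           model.sleep(250);
           model.tick();                   // retry after 250 ms fails
           model.sleep(500);
           model.tick();                   // retry after 500 ms succeeds
           System.out.println(model.pollTimeouts);
       }
   }
   ```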



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();
+
+        // task reconfiguration request with initial retry backoff
+        member.poll(EasyMock.eq(250L));
+        PowerMock.expectLastCall();
+
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        // task reconfiguration request with double the initial retry backoff
+        member.poll(EasyMock.eq(500L));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // initial tick
+        herder.tick();
+        herder.requestTaskReconfiguration(CONN1);
+        // process the task reconfiguration request in this tick
+        herder.tick();
+        // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed
+        time.sleep(250);
+        herder.tick();

Review Comment:
   Can't forget this part:
   
   ```suggestion
           herder.tick();
   
           PowerMock.verifyAll();
   ```





[GitHub] [kafka] yashmayya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1119640881


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();

Review Comment:
   Thanks, I've updated the test to add another herder tick that runs a successful task reconfiguration request. (I skipped adding a fourth tick because the absence of further retries can already be verified via the poll timeout at the end of the third tick.)
   
   Regarding a test case for the task reconfiguration REST request to the leader: I did consider that initially, but while trying to add one I ran into some complications (timing-related issues) arising from the use of the `forwardRequestExecutor`, at which point I felt it was more trouble than it was worth. However, your comment made me revisit it, and I've made some changes to drop in a simple mock executor service that runs requests synchronously (on the caller's thread). Let me know what you think.
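   For readers unfamiliar with the approach, a same-thread `ExecutorService` can be built on `java.util.concurrent.AbstractExecutorService`, whose `submit` methods wrap the task and delegate to `execute`; running the task directly inside `execute` means every returned future is already complete when `submit` returns. This is a generic sketch, not the mock added in this PR, and `SameThreadExecutorService` is a hypothetical name.

   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.concurrent.AbstractExecutorService;
   import java.util.concurrent.Future;
   import java.util.concurrent.RejectedExecutionException;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical test helper: every task runs synchronously on the caller's thread. */
   final class SameThreadExecutorService extends AbstractExecutorService {
       private volatile boolean shutdown = false;

       @Override
       public void execute(Runnable command) {
           if (shutdown) {
               throw new RejectedExecutionException("executor is shut down");
           }
           command.run(); // no hand-off to another thread
       }

       @Override
       public void shutdown() {
           shutdown = true;
       }

       @Override
       public List<Runnable> shutdownNow() {
           shutdown = true;
           return Collections.emptyList(); // nothing is ever queued
       }

       @Override
       public boolean isShutdown() {
           return shutdown;
       }

       // Tasks finish inside execute(), so shutdown implies termination.
       @Override
       public boolean isTerminated() {
           return shutdown;
       }

       @Override
       public boolean awaitTermination(long timeout, TimeUnit unit) {
           return shutdown;
       }

       public static void main(String[] args) throws Exception {
           SameThreadExecutorService executor = new SameThreadExecutorService();
           Future<String> result = executor.submit(() -> Thread.currentThread().getName());
           // The future is already done: the task ran inside submit().
           System.out.println(result.isDone() + " " + result.get());
           executor.shutdown();
       }
   }
   ```

   The trade-off is that any test relying on this executor no longer exercises the concurrency between the herder thread and the forwarding thread.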





[GitHub] [kafka] C0urante merged pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13276:
URL: https://github.com/apache/kafka/pull/13276




[GitHub] [kafka] yashmayya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1119636919


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();
+
+        // task reconfiguration request with initial retry backoff
+        member.poll(EasyMock.eq(250L));
+        PowerMock.expectLastCall();
+
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        // task reconfiguration request with double the initial retry backoff
+        member.poll(EasyMock.eq(500L));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // initial tick
+        herder.tick();
+        herder.requestTaskReconfiguration(CONN1);
+        // process the task reconfiguration request in this tick
+        herder.tick();
+        // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed
+        time.sleep(250);
+        herder.tick();

Review Comment:
   Whoops, thanks 🤦





[GitHub] [kafka] yashmayya commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1112987230


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1870,7 +1880,16 @@ private Callable<Void> getConnectorStoppingCallable(final String connectorName)
         };
     }
 
-    private void reconfigureConnectorTasksWithRetry(long initialRequestTime, final String connName) {
+    /**
+     * Request task configs from the connector and write them to the config storage in case the configs are detected to
+     * have changed. This method retries infinitely in case of any errors.

Review Comment:
   Hm, that's an interesting idea. I don't see the harm in limiting the number of retries to some reasonable value and then marking the connector as failed (we could include the last exception's trace in the connector's status).
   
   > Is it possible to somehow bubble up errors as part of connector (not task) status?
   
   The `AbstractHerder` (`DistributedHerder`'s parent class) implements the `ConnectorStatus.Listener` interface and so we should be able to update the connector's status to failed via https://github.com/apache/kafka/blob/81b3b2fb3399ab2784eb8158564fc1c9a1299a8d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L191-L195
   
   For instance, [here](https://github.com/apache/kafka/blob/81b3b2fb3399ab2784eb8158564fc1c9a1299a8d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1856) we use the `onFailure` hook to mark a connector as failed if an exception is thrown during startup.
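   A rough sketch of the bounded alternative discussed here: retry a limited number of times with the same capped doubling backoff, and once attempts are exhausted, hand the last error to a failure callback, which is where the connector's status could be set to failed with the exception's trace. `BoundedRetrySketch`, `FailureListener`, `Sleeper`, and the attempt limit are hypothetical; the listener only mirrors the idea of the `onFailure` hook and is not `ConnectorStatus.Listener`, and a real herder would schedule retries on its request queue rather than block between them.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.Callable;

   /** Hypothetical sketch: bounded retries that report a terminal failure. */
   final class BoundedRetrySketch {
       /** Stand-in for a status listener; not Kafka's ConnectorStatus.Listener. */
       interface FailureListener {
           void onFailure(String connName, Throwable cause);
       }

       /** Stand-in for waiting between attempts; a herder would schedule, not block. */
       interface Sleeper {
           void sleep(long ms);
       }

       /** Returns true on success; otherwise reports the last error and returns false. */
       static boolean runWithBoundedRetries(String connName, Callable<?> action, int maxAttempts,
                                            Sleeper sleeper, FailureListener listener) {
           if (maxAttempts < 1) {
               throw new IllegalArgumentException("maxAttempts must be at least 1");
           }
           Exception lastError = null;
           long delayMs = 250L;
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               try {
                   action.call();
                   return true;
               } catch (Exception e) {
                   lastError = e;
                   if (attempt < maxAttempts) {
                       sleeper.sleep(delayMs);
                       delayMs = Math.min(delayMs * 2, 60_000L); // capped doubling
                   }
               }
           }
           // Retries exhausted: this is where the connector could be marked as failed.
           listener.onFailure(connName, lastError);
           return false;
       }

       public static void main(String[] args) {
           List<Long> sleeps = new ArrayList<>();
           boolean ok = runWithBoundedRetries("my-connector",
                   () -> { throw new IllegalStateException("invalid login"); },
                   3, ms -> sleeps.add(ms),
                   (conn, cause) -> System.out.println(conn + " failed: " + cause.getMessage()));
           System.out.println(ok + " " + sleeps);
       }
   }
   ```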





[GitHub] [kafka] yashmayya commented on pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

Posted by "yashmayya (via GitHub)" <gi...@apache.org>.
yashmayya commented on PR #13276:
URL: https://github.com/apache/kafka/pull/13276#issuecomment-1438404922

   Hi @C0urante, could you please take a look?
   
   PS - I'd be happy to move the minor Javadoc improvements to a separate PR if you'd like (we never got to a conclusion [here](https://github.com/apache/kafka/pull/12984#discussion_r1093416934) 🙂 )




[GitHub] [kafka] C0urante commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1120259483


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();

Review Comment:
   Ah, fair point about the fourth tick!
   
   I don't love using a synchronous executor here since it diverges significantly from the non-testing behavior of the herder. But, I can't think of a better way to test this without going overboard in complexity, and it does give us decent coverage.
   
   So, good enough 👍 


