You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/02/27 17:15:24 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13276: KAFKA-14732: Use an exponential backoff retry mechanism while reconfiguring connector tasks

C0urante commented on code in PR #13276:
URL: https://github.com/apache/kafka/pull/13276#discussion_r1119036590


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();

Review Comment:
   Wakeups basically don't matter in these tests; if it's easier, feel free to append `.anyTimes()` here.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();

Review Comment:
   This test is great. I think it'd be worth it to perform a third and fourth tick. The third can be used to simulate successfully generating task configs after the two failed attempts, and the fourth can be used to ensure that we don't retry any further.
   
   It's also worth noting that we're only testing the case where `Connector::taskConfigs` (or really, `Worker::connectorTaskConfigs`) fails, but the logic that's being added here applies if intra-cluster communication fails as well (which may happen if the leader of the cluster is temporarily unavailable, for example). It'd be nice if we could have test coverage for that too, but I won't block this PR on that.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3694,6 +3694,54 @@ public void shouldThrowWhenStartAndStopExecutorThrowsRejectedExecutionExceptionA
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testTaskReconfigurationRetries() {
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList(), true);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+
+        // end of initial tick
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        member.wakeup();
+        PowerMock.expectLastCall();
+
+        // second tick
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true).anyTimes();
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
+
+        SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, sinkConnectorConfig))
+                .andThrow(new ConnectException("Failed to generate task configs")).anyTimes();
+
+        // task reconfiguration request with initial retry backoff
+        member.poll(EasyMock.eq(250L));
+        PowerMock.expectLastCall();
+
+        member.ensureActive();
+        PowerMock.expectLastCall();
+
+        // task reconfiguration request with double the initial retry backoff
+        member.poll(EasyMock.eq(500L));
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        // initial tick
+        herder.tick();
+        herder.requestTaskReconfiguration(CONN1);
+        // process the task reconfiguration request in this tick
+        herder.tick();
+        // advance the time by 250ms so that the task reconfiguration request with initial retry backoff is processed
+        time.sleep(250);
+        herder.tick();

Review Comment:
   Can't forget this part:
   
   ```suggestion
           herder.tick();
   
           PowerMock.verifyAll();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org