Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/23 15:24:52 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #12876: KAFKA-12476: Prevent herder tick thread from sleeping excessively after slow operations

C0urante commented on code in PR #12876:
URL: https://github.com/apache/kafka/pull/12876#discussion_r1030468497


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -436,15 +436,15 @@ public void tick() {
         //       Another example: if multiple configurations are submitted for the same connector,
         //       the only one that actually has to be written to the config topic is the
         //       most-recently one.
-        long nextRequestTimeoutMs = Long.MAX_VALUE;
+        long scheduledTick = Long.MAX_VALUE;
         while (true) {
             final DistributedHerderRequest next = peekWithoutException();
             if (next == null) {
                 break;
             } else if (now >= next.at) {
                 requests.pollFirst();
             } else {
-                nextRequestTimeoutMs = next.at - now;
+                scheduledTick = Math.min(scheduledTick, next.at);

Review Comment:
   We will only reach this line if `scheduledTick` is still `Long.MAX_VALUE` (the loop breaks immediately after this branch), so the `Math.min` is redundant:
   
   ```suggestion
                   scheduledTick = next.at;
   ```
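   
   For context, the full loop with the suggestion applied (a sketch based on the diff above, assuming the loop breaks right after this branch as in the existing code):
   
   ```java
   // Request-draining loop in DistributedHerder::tick() (sketch)
   long scheduledTick = Long.MAX_VALUE;
   while (true) {
       final DistributedHerderRequest next = peekWithoutException();
       if (next == null) {
           break;
       } else if (now >= next.at) {
           requests.pollFirst();     // request is due; it runs during this tick
       } else {
           scheduledTick = next.at;  // first not-yet-due request; scheduledTick is still MAX_VALUE here
           break;                    // exits the loop, so the assignment runs at most once
       }
   }
   ```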



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3735,6 +3750,19 @@ private ClusterConfigState exactlyOnceSnapshot(
                 taskConfigs, taskCountRecords, taskConfigGenerations, pendingFencing, Collections.emptySet());
     }
 
+    private void expectQueueHerderOperation() {
+        member.wakeup();
+        PowerMock.expectLastCall();
+    }

Review Comment:
   IMO this isn't really an improvement in readability. If we want to change two-liners that involve `PowerMock::expectLastCall` to one-liners, we can do that with a utility method like this:
   
   ```java
   private void expectCall(Runnable call) {
       call.run();
       PowerMock.expectLastCall();
   }
   ```
   
   which can be used in place of `expectQueueHerderOperation` like this:
   
   ```java
   expectCall(member::wakeup);
   ```
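   
   The same helper also reads cleanly for expectations that take arguments, via a lambda (reusing `worker.stopAndAwaitConnector` from elsewhere in this test):
   
   ```java
   expectCall(() -> worker.stopAndAwaitConnector(CONN1));
   ```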



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -1538,6 +1506,11 @@ public void testDoRestartConnectorAndTasksOnlyConnector() {
         Capture<Callback<TargetState>>  stateCallback = newCapture();
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
                 EasyMock.eq(herder), EasyMock.anyObject(TargetState.class), capture(stateCallback));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            stateCallback.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        });
+        expectQueueHerderOperation();

Review Comment:
   This took a few minutes to grok. A comment might help:
   
   ```suggestion
           // Once the Connector is restarted, we should request task configurations from it, which requires waking up the herder tick thread
           expectQueueHerderOperation();
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2179,6 +2135,7 @@ public void testConnectorPausedRunningTaskOnly() throws Exception {
         herder.tick(); // join
         configUpdateListener.onConnectorTargetStateChange(CONN1); // state changes to paused
         herder.tick(); // apply state change
+        herder.tick();

Review Comment:
   Why add this extra tick and expectations for `member::poll` and `member::ensureActive`?
   
   FWIW, I think there's a bug in this test where the state change callback is invoked with `STARTED` when it should be `PAUSED`.
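   
   If so, a sketch of the fix (assuming this test captures the state change callback the same way the restart test above does):
   
   ```java
   PowerMock.expectLastCall().andAnswer(() -> {
       // The connector has just been paused, so the callback should
       // complete with PAUSED rather than STARTED
       stateCallback.getValue().onCompletion(null, TargetState.PAUSED);
       return true;
   });
   ```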



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3594,6 +3543,72 @@ public void testHerderStopServicesClosesUponShutdown() {
         assertEquals(0, shutdownCalled.getCount());
     }
 
+    @Test
+    public void testPollDurationOnSlowConnectorOperations() {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        // If an operation during tick() takes some amount of time, that time should count against the rebalance delay
+        int rebalanceDelayMs = 20000;
+        long operationDelayMs = 10000;
+        long maxPollWaitMs = rebalanceDelayMs - operationDelayMs;

Review Comment:
   It helps clarify intent in these cases if the constants declared at the beginning are marked `final`:
   ```suggestion
           final int rebalanceDelayMs = 20000;
           final long operationDelayMs = 10000;
           final long maxPollWaitMs = rebalanceDelayMs - operationDelayMs;
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2717,8 +2669,7 @@ public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
             return null;
         });
         // Third rebalance: poll for a limited time as worker has become leader and must wake up for key expiration
-        Capture<Long> pollTimeout = EasyMock.newCapture();
-        member.poll(EasyMock.captureLong(pollTimeout));
+        member.poll(leq(rotationTtlDelay));

Review Comment:
   This is much cleaner than the `Capture`-based approach 👍
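   
   For anyone reading along: `leq` is EasyMock's built-in comparison matcher (statically imported from `org.easymock.EasyMock`), so the expectation matches any poll timeout less than or equal to `rotationTtlDelay`:
   
   ```java
   import static org.easymock.EasyMock.leq;
   
   // Accepts member.poll(t) for any t <= rotationTtlDelay
   member.poll(leq(rotationTtlDelay));
   ```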



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3594,6 +3543,72 @@ public void testHerderStopServicesClosesUponShutdown() {
         assertEquals(0, shutdownCalled.getCount());
     }
 
+    @Test
+    public void testPollDurationOnSlowConnectorOperations() {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        // If an operation during tick() takes some amount of time, that time should count against the rebalance delay
+        int rebalanceDelayMs = 20000;
+        long operationDelayMs = 10000;
+        long maxPollWaitMs = rebalanceDelayMs - operationDelayMs;
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+
+        // Assign the connector to this worker, and have it start
+        expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.emptyList(), rebalanceDelayMs);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        Capture<Callback<TargetState>> onFirstStart = newCapture();
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onFirstStart));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onFirstStart.getValue().onCompletion(null, TargetState.STARTED);
+            time.sleep(operationDelayMs);
+            return true;
+        });
+        expectQueueHerderOperation();
+        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
+        // We should poll for less than the delay - time to start the connector, meaning that a long connector start
+        // does not delay the poll timeout
+        member.poll(leq(maxPollWaitMs));
+        PowerMock.expectLastCall();
+
+        // Rebalance again due to config update
+        expectQueueHerderOperation();
+        expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.emptyList(), rebalanceDelayMs);
+        EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+
+        worker.stopAndAwaitConnector(CONN1);
+        PowerMock.expectLastCall();
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+        Capture<Callback<TargetState>> onSecondStart = newCapture();
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onSecondStart));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onSecondStart.getValue().onCompletion(null, TargetState.STARTED);
+            time.sleep(operationDelayMs);

Review Comment:
   Same thought RE switching the ordering here



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3735,6 +3750,19 @@ private ClusterConfigState exactlyOnceSnapshot(
                 taskConfigs, taskCountRecords, taskConfigGenerations, pendingFencing, Collections.emptySet());
     }
 
+    private void expectQueueHerderOperation() {
+        member.wakeup();
+        PowerMock.expectLastCall();
+    }
+
+    private void expectExecuteTaskReconfiguration(boolean running, ConnectorConfig connectorConfig, IAnswer<List<Map<String, String>>> answer) {
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(running);
+        if (running) {
+            EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+            EasyMock.expect(worker.connectorTaskConfigs(CONN1, connectorConfig)).andAnswer(answer);
+        }
+    }

Review Comment:
   OTOH, this is great 👍



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2607,20 +2559,17 @@ public void testPutConnectorConfig() throws Exception {
             onFirstStart.getValue().onCompletion(null, TargetState.STARTED);
             return true;
         });
-        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
-        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+        expectQueueHerderOperation();
+        expectExecuteTaskReconfiguration(true, conn1SinkConfig, () -> TASK_CONFIGS);
 
         // list connectors, get connector info, get connector config, get task configs

Review Comment:
   This comment explains why we expect `member::wakeup` to be invoked any number of times; we should remove it if we're going to set an exact expectation for the number of calls.
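   
   Alternatively, if the comment is worth keeping, the wakeup expectation could be left inexact instead, e.g.:
   
   ```java
   member.wakeup();
   PowerMock.expectLastCall().anyTimes();
   ```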



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -2690,13 +2641,14 @@ public void testPutConnectorConfig() throws Exception {
 
     @Test
     public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
+        long rotationTtlDelay = DistributedConfig.INTER_WORKER_KEY_TTL_MS_MS_DEFAULT;
         EasyMock.expect(member.memberId()).andStubReturn("member");
         EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V2);
 
         expectRebalance(1, Collections.emptyList(), Collections.emptyList());
         expectConfigRefreshAndSnapshot(SNAPSHOT);
         // First rebalance: poll indefinitely as no key has been read yet, so expiration doesn't come into play
-        member.poll(Long.MAX_VALUE);
+        member.poll(geq(rotationTtlDelay));

Review Comment:
   I see why we had to change this (we now poll with a duration of `scheduledTick - now`, and even if `scheduledTick` is `Long.MAX_VALUE`, `now` is always non-zero).
   
   It's nice to be able to verify that we don't plan on waking ourselves up from a poll by asserting that we're polling for `Long.MAX_VALUE`. WDYT about making `scheduledTick` a nullable `Long` instead of a primitive `long`, and then setting `nextRequestTimeoutMs` to `Long.MAX_VALUE` if it's null near the bottom of `DistributedHerder::tick`?
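   
   Roughly (a sketch of the nullable-`Long` idea; surrounding `tick()` logic elided):
   
   ```java
   Long scheduledTick = null;
   while (true) {
       final DistributedHerderRequest next = peekWithoutException();
       if (next == null) {
           break;
       } else if (now >= next.at) {
           requests.pollFirst();
       } else {
           scheduledTick = next.at;
           break;
       }
   }
   // ... near the bottom of tick(), fall back to an indefinite poll
   // when no request is scheduled:
   long nextRequestTimeoutMs = scheduledTick != null ? scheduledTick - now : Long.MAX_VALUE;
   member.poll(nextRequestTimeoutMs);
   ```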



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java:
##########
@@ -3594,6 +3543,72 @@ public void testHerderStopServicesClosesUponShutdown() {
         assertEquals(0, shutdownCalled.getCount());
     }
 
+    @Test
+    public void testPollDurationOnSlowConnectorOperations() {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        // If an operation during tick() takes some amount of time, that time should count against the rebalance delay
+        int rebalanceDelayMs = 20000;
+        long operationDelayMs = 10000;
+        long maxPollWaitMs = rebalanceDelayMs - operationDelayMs;
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
+
+        // Assign the connector to this worker, and have it start
+        expectRebalance(Collections.emptyList(), Collections.emptyList(), ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.emptyList(), rebalanceDelayMs);
+        expectConfigRefreshAndSnapshot(SNAPSHOT);
+        Capture<Callback<TargetState>> onFirstStart = newCapture();
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onFirstStart));
+        PowerMock.expectLastCall().andAnswer(() -> {
+            onFirstStart.getValue().onCompletion(null, TargetState.STARTED);
+            time.sleep(operationDelayMs);

Review Comment:
   If we're simulating a long-running connector start, probably best to invoke the callback after our fake delay?
   ```suggestion
               time.sleep(operationDelayMs);
               onFirstStart.getValue().onCompletion(null, TargetState.STARTED);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org