You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gh...@apache.org on 2024/01/12 23:46:25 UTC

(kafka) branch trunk updated: KAFKA-16051: Fixed deadlock in StandaloneHerder (#15080)

This is an automated email from the ASF dual-hosted git repository.

gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 988aea3f58f KAFKA-16051: Fixed deadlock in StandaloneHerder (#15080)
988aea3f58f is described below

commit 988aea3f58f2a198dea7018c4adfd7cfd09855a3
Author: Octavian Ciubotaru <oc...@ciubotaru.net>
AuthorDate: Sat Jan 13 01:46:17 2024 +0200

    KAFKA-16051: Fixed deadlock in StandaloneHerder (#15080)
    
    Reviewers: Greg Harris <gr...@aiven.io>, Sagar Rao <sa...@gmail.com>
---
 .../runtime/standalone/StandaloneHerder.java       |  2 +-
 .../runtime/standalone/StandaloneHerderTest.java   | 57 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 40e19da19c7..e1386bf6ac4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -483,7 +483,7 @@ public class StandaloneHerder extends AbstractHerder {
         }
     }
 
-    private void updateConnectorTasks(String connName) {
+    private synchronized void updateConnectorTasks(String connName) {
         if (!worker.isRunning(connName)) {
             log.info("Skipping update of tasks for connector {} since it is not running", connName);
             return;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 213f029a0b2..993f5368ff7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -1001,6 +1001,63 @@ public class StandaloneHerderTest {
         assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test() // KAFKA-16051 regression test: a config update followed at once by requestTaskReconfiguration must complete
+    public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+        connector = mock(BogusSourceConnector.class);
+        expectAdd(SourceSink.SOURCE);
+
+        // Build the initial source connector config, stub its validation, and submit it to the herder
+        Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+        Connector connectorMock = mock(SourceConnector.class);
+        expectConfigValidation(connectorMock, true, config);
+
+        herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+        // Wait up to one second for the create callback, then check the connector info it reports
+        Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+        assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+        // Stub the worker's connector names and register the stop expectations needed by the upcoming update
+        when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+        expectStop();
+
+        // Build a changed connector config; the stubbed startConnector completes its captured callback with STARTED inside the call
+        Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+        newConfig.put("dummy-connector-property", "yes");
+        final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
+        doAnswer(invocation -> {
+            onStart.getValue().onCompletion(null, TargetState.STARTED);
+            return true;
+        }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(newConfig), any(HerderConnectorContext.class),
+                eq(herder), eq(TargetState.STARTED), onStart.capture());
+
+        // Stub task start-up, and make consecutive connectorTaskConfigs calls return two different task configs
+        when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+        Map<String, String> updatedTaskConfig1 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig1.put("dummy-task-property", "1");
+        Map<String, String> updatedTaskConfig2 = taskConfig(SourceSink.SOURCE);
+        updatedTaskConfig2.put("dummy-task-property", "2");
+        when(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), any()))
+                .thenReturn(
+                        Collections.singletonList(updatedTaskConfig1),
+                        Collections.singletonList(updatedTaskConfig2));
+
+        // Submit the changed connector config (third argument true this time) with its validation stubbed
+        FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>();
+        expectConfigValidation(connectorMock, false, newConfig);
+        herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, reconfigureCallback);
+
+        // Request task reconfiguration right away, before waiting on reconfigureCallback
+        herder.requestTaskReconfiguration(CONNECTOR_NAME);
+
+        // Wait up to one second for the reconfiguration callback, then check the updated connector info
+        Herder.Created<ConnectorInfo> updatedConnectorInfo = reconfigureCallback.get(1000L, TimeUnit.MILLISECONDS);
+        ConnectorInfo expectedConnectorInfo = new ConnectorInfo(CONNECTOR_NAME, newConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE);
+        assertEquals(expectedConnectorInfo, updatedConnectorInfo.result());
+        // The DESTROYED status for task 0 must have been written to the status store exactly twice
+        verify(statusBackingStore, times(2)).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
+    }
+
     private void expectAdd(SourceSink sourceSink) {
         Map<String, String> connectorProps = connectorConfig(sourceSink);
         ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?