You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gh...@apache.org on 2024/01/12 23:46:25 UTC
(kafka) branch trunk updated: KAFKA-16051: Fixed deadlock in StandaloneHerder (#15080)
This is an automated email from the ASF dual-hosted git repository.
gharris pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 988aea3f58f KAFKA-16051: Fixed deadlock in StandaloneHerder (#15080)
988aea3f58f is described below
commit 988aea3f58f2a198dea7018c4adfd7cfd09855a3
Author: Octavian Ciubotaru <oc...@ciubotaru.net>
AuthorDate: Sat Jan 13 01:46:17 2024 +0200
KAFKA-16051: Fixed deadlock in StandaloneHerder (#15080)
Reviewers: Greg Harris <gr...@aiven.io>, Sagar Rao <sa...@gmail.com>
---
.../runtime/standalone/StandaloneHerder.java | 2 +-
.../runtime/standalone/StandaloneHerderTest.java | 57 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 40e19da19c7..e1386bf6ac4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -483,7 +483,7 @@ public class StandaloneHerder extends AbstractHerder {
}
}
- private void updateConnectorTasks(String connName) {
+ private synchronized void updateConnectorTasks(String connName) {
if (!worker.isRunning(connName)) {
log.info("Skipping update of tasks for connector {} since it is not running", connName);
return;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 213f029a0b2..993f5368ff7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -1001,6 +1001,63 @@ public class StandaloneHerderTest {
assertEquals(msg, resetOffsetsCallback.get(1000, TimeUnit.MILLISECONDS));
}
+ @Test()
+ public void testRequestTaskReconfigurationDoesNotDeadlock() throws Exception {
+ connector = mock(BogusSourceConnector.class);
+ expectAdd(SourceSink.SOURCE);
+
+ // Start the connector
+ Map<String, String> config = connectorConfig(SourceSink.SOURCE);
+ Connector connectorMock = mock(SourceConnector.class);
+ expectConfigValidation(connectorMock, true, config);
+
+ herder.putConnectorConfig(CONNECTOR_NAME, config, false, createCallback);
+
+ // Wait on connector to start
+ Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(1000L, TimeUnit.MILLISECONDS);
+ assertEquals(createdInfo(SourceSink.SOURCE), connectorInfo.result());
+
+ // Prepare for task config update
+ when(worker.connectorNames()).thenReturn(Collections.singleton(CONNECTOR_NAME));
+ expectStop();
+
+ // Prepare for connector and task config update
+ Map<String, String> newConfig = connectorConfig(SourceSink.SOURCE);
+ newConfig.put("dummy-connector-property", "yes");
+ final ArgumentCaptor<Callback<TargetState>> onStart = ArgumentCaptor.forClass(Callback.class);
+ doAnswer(invocation -> {
+ onStart.getValue().onCompletion(null, TargetState.STARTED);
+ return true;
+ }).when(worker).startConnector(eq(CONNECTOR_NAME), eq(newConfig), any(HerderConnectorContext.class),
+ eq(herder), eq(TargetState.STARTED), onStart.capture());
+
+ // Common invocations
+ when(worker.startSourceTask(eq(new ConnectorTaskId(CONNECTOR_NAME, 0)), any(), any(), any(), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
+ Map<String, String> updatedTaskConfig1 = taskConfig(SourceSink.SOURCE);
+ updatedTaskConfig1.put("dummy-task-property", "1");
+ Map<String, String> updatedTaskConfig2 = taskConfig(SourceSink.SOURCE);
+ updatedTaskConfig2.put("dummy-task-property", "2");
+ when(worker.connectorTaskConfigs(eq(CONNECTOR_NAME), any()))
+ .thenReturn(
+ Collections.singletonList(updatedTaskConfig1),
+ Collections.singletonList(updatedTaskConfig2));
+
+ // Set new config on the connector and tasks
+ FutureCallback<Herder.Created<ConnectorInfo>> reconfigureCallback = new FutureCallback<>();
+ expectConfigValidation(connectorMock, false, newConfig);
+ herder.putConnectorConfig(CONNECTOR_NAME, newConfig, true, reconfigureCallback);
+
+ // Reconfigure the tasks
+ herder.requestTaskReconfiguration(CONNECTOR_NAME);
+
+ // Wait on connector update
+ Herder.Created<ConnectorInfo> updatedConnectorInfo = reconfigureCallback.get(1000L, TimeUnit.MILLISECONDS);
+ ConnectorInfo expectedConnectorInfo = new ConnectorInfo(CONNECTOR_NAME, newConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)), ConnectorType.SOURCE);
+ assertEquals(expectedConnectorInfo, updatedConnectorInfo.result());
+
+ verify(statusBackingStore, times(2)).put(new TaskStatus(new ConnectorTaskId(CONNECTOR_NAME, 0), TaskStatus.State.DESTROYED, WORKER_ID, 0));
+ }
+
private void expectAdd(SourceSink sourceSink) {
Map<String, String> connectorProps = connectorConfig(sourceSink);
ConnectorConfig connConfig = sourceSink == SourceSink.SOURCE ?