You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/23 23:07:23 UTC
[kafka] branch 2.4 updated: KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 0ec8cf5 KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)
0ec8cf5 is described below
commit 0ec8cf591faa9d4784982ed452711466e4243a9e
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Sat May 23 15:35:43 2020 -0700
KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)
The changes made in KIP-454 involved adding a `connectorConfig` method to the ConnectClusterState interface that REST extensions could use to query the worker for the configuration of a given connector. The implementation for this method returns the Java `Map` that's stored in the worker's view of the config topic (when running in distributed mode). No copying is performed, which causes mutations of that `Map` to persist across invocations of `connectorConfig` and, even worse, propagate into the worker's own view of the config topic.
In this commit the map is copied before it's returned to REST extensions.
An existing unit test is modified to ensure that REST extensions receive a copy of the connector config, not the original.
Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>
---
.../kafka/connect/runtime/health/ConnectClusterStateImpl.java | 2 +-
.../apache/kafka/connect/storage/KafkaConfigBackingStore.java | 4 ++--
.../connect/runtime/health/ConnectClusterStateImplTest.java | 9 ++++++++-
3 files changed, 11 insertions(+), 4 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index 38362b3..6b7285d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -86,7 +86,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
FutureCallback<Map<String, String>> connectorConfigCallback = new FutureCallback<>();
herder.connectorConfig(connName, connectorConfigCallback);
try {
- return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
+ return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new ConnectException(
String.format("Failed to retrieve configuration for connector '%s'", connName),
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 39a8f35..658d6c3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -280,8 +280,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
@Override
public ClusterConfigState snapshot() {
synchronized (lock) {
- // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be
- // immutable configs
+ // Only a shallow copy is performed here; in order to avoid accidentally corrupting the worker's view
+ // of the config topic, any nested structures should be copied before making modifications
return new ClusterConfigState(
offset,
sessionKey,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
index d8984f0..d8a7e49 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThrows;
@RunWith(PowerMockRunner.class)
@@ -87,7 +88,13 @@ public class ConnectClusterStateImplTest {
}
});
EasyMock.replay(herder);
- assertEquals(expectedConfig, connectClusterState.connectorConfig(connName));
+ Map<String, String> actualConfig = connectClusterState.connectorConfig(connName);
+ assertEquals(expectedConfig, actualConfig);
+ assertNotSame(
+ "Config should be copied in order to avoid mutation by REST extensions",
+ expectedConfig,
+ actualConfig
+ );
}
@Test