You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/23 22:36:36 UTC

[kafka] branch trunk updated: KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 40ee580  KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)
40ee580 is described below

commit 40ee580ed23c876dec45eca4770e834d9aed76da
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Sat May 23 15:35:43 2020 -0700

    KAFKA-9888: Copy connector configs before passing to REST extensions (#8511)
    
    The changes made in KIP-454 involved adding a `connectorConfig` method to the ConnectClusterState interface that REST extensions could use to query the worker for the configuration of a given connector. The implementation for this method returns the Java `Map` that's stored in the worker's view of the config topic (when running in distributed mode). No copying is performed, which causes mutations of that `Map` to persist across invocations of `connectorConfig` and, even worse, propagate to the worker's own view of the config topic.
    
    In this commit the map is copied before it's returned to REST extensions.
    
    An existing unit test is modified to ensure that REST extensions receive a copy of the connector config, not the original.
    
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/runtime/health/ConnectClusterStateImpl.java    | 2 +-
 .../apache/kafka/connect/storage/KafkaConfigBackingStore.java    | 4 ++--
 .../connect/runtime/health/ConnectClusterStateImplTest.java      | 9 ++++++++-
 3 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index 38362b3..6b7285d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -86,7 +86,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
         FutureCallback<Map<String, String>> connectorConfigCallback = new FutureCallback<>();
         herder.connectorConfig(connName, connectorConfigCallback);
         try {
-            return connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS);
+            return new HashMap<>(connectorConfigCallback.get(herderRequestTimeoutMs, TimeUnit.MILLISECONDS));
         } catch (InterruptedException | ExecutionException | TimeoutException e) {
             throw new ConnectException(
                 String.format("Failed to retrieve configuration for connector '%s'", connName),
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 3cf2288..b9e9831 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -281,8 +281,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
     @Override
     public ClusterConfigState snapshot() {
         synchronized (lock) {
-            // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be
-            // immutable configs
+            // Only a shallow copy is performed here; in order to avoid accidentally corrupting the worker's view
+            // of the config topic, any nested structures should be copied before making modifications
             return new ClusterConfigState(
                     offset,
                     sessionKey,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
index d8984f0..d8a7e49 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImplTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertThrows;
 
 @RunWith(PowerMockRunner.class)
@@ -87,7 +88,13 @@ public class ConnectClusterStateImplTest {
             }
         });
         EasyMock.replay(herder);
-        assertEquals(expectedConfig, connectClusterState.connectorConfig(connName));
+        Map<String, String> actualConfig = connectClusterState.connectorConfig(connName);
+        assertEquals(expectedConfig, actualConfig);
+        assertNotSame(
+            "Config should be copied in order to avoid mutation by REST extensions",
+            expectedConfig,
+            actualConfig
+        );
     }
 
     @Test