You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 17:44:20 UTC

[kafka] branch 2.3 updated: KAFKA-9633: Ensure ConfigProviders are closed (#8204)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f3bfa6f  KAFKA-9633: Ensure ConfigProviders are closed (#8204)
f3bfa6f is described below

commit f3bfa6f5e151b82ebe65a2e293cfe9c4e02e7dcd
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Thu Apr 30 18:33:10 2020 +0100

    KAFKA-9633: Ensure ConfigProviders are closed (#8204)
    
    ConfigProvider extends Closeable, but instances were not closed in the following contexts:
    * AbstractConfig
    * WorkerConfigTransformer
    * Worker
    
    This commit ensures that ConfigProviders are closed in the above contexts.
    
    It also adds MockFileConfigProvider.assertClosed().
    Gradle executes test classes concurrently, so MockFileConfigProvider
    can't simply use a static field to hold its closure state.
    Instead, use a protocol whereby the MockFileConfigProvider is configured
    with a unique key identifying the test, which is also used when calling
    assertClosed().
    
    Reviewers: Konstantine Karantasis <ko...@confluent.io>
---
 .../apache/kafka/common/config/AbstractConfig.java |  1 +
 .../kafka/common/config/AbstractConfigTest.java    | 18 ++++++++++-
 .../config/provider/MockFileConfigProvider.java    | 35 ++++++++++++++++++++++
 .../org/apache/kafka/connect/runtime/Worker.java   |  6 ++--
 .../connect/runtime/WorkerConfigTransformer.java   | 10 ++++++-
 .../apache/kafka/connect/runtime/WorkerTest.java   | 29 ++++++++++++++++++
 6 files changed, 95 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index a48856f..f024732 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -481,6 +481,7 @@ public class AbstractConfig {
                 resolvedOriginals.putAll(result.data());
             }
         }
+        providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
 
         return new ResolvingMap<>(resolvedOriginals, originals);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index b5fff6f..834278b 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -340,6 +341,8 @@ public class AbstractConfigTest {
         // Test Case: Valid Test Case for ConfigProviders as part of config.properties
         props.put("config.providers", "file");
         props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.file.param.testId", id);
         props.put("prefix.ssl.truststore.location.number", 5);
         props.put("sasl.kerberos.service.name", "service name");
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -349,6 +352,7 @@ public class AbstractConfigTest {
         assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
         assertEquals(config.originals().get("prefix.ssl.truststore.location.number"), 5);
         assertEquals(config.originals().get("sasl.kerberos.service.name"), "service name");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
@@ -357,12 +361,15 @@ public class AbstractConfigTest {
         Properties providers = new Properties();
         providers.put("config.providers", "file");
         providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        providers.put("config.providers.file.param.testId", id);
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
         assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
         assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
@@ -370,13 +377,16 @@ public class AbstractConfigTest {
         // Test Case: Valid Test Case for ConfigProviders as a separate variable
         Properties providers = new Properties();
         providers.put("config.providers", "file");
-        providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+        providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        providers.put("config.providers.file.param.testId", id);
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
         Map<?, ?> immutableMap = Collections.unmodifiableMap(props);
         Map<String, ?> provMap = convertPropertiesToMap(providers);
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap);
         assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
@@ -385,6 +395,8 @@ public class AbstractConfigTest {
         Properties providers = new Properties();
         providers.put("config.providers", "file,vault");
         providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        providers.put("config.providers.file.param.testId", id);
         providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
         Properties props = new Properties();
         props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -396,6 +408,7 @@ public class AbstractConfigTest {
         assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
         assertEquals(config.originals().get("sasl.truststore.key"), "testTruststoreKey");
         assertEquals(config.originals().get("sasl.truststore.password"), "randomtruststorePassword");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
@@ -429,9 +442,12 @@ public class AbstractConfigTest {
         Properties props = new Properties();
         props.put("config.providers", "test");
         props.put("config.providers.test.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.test.param.testId", id);
         props.put("random", "${test:/foo/bar/testpath:random}");
         TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
         assertEquals(config.originals().get("random"), "${test:/foo/bar/testpath:random}");
+        MockFileConfigProvider.assertClosed(id);
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
index e779cbe..3409096 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
@@ -19,11 +19,46 @@ package org.apache.kafka.common.config.provider;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 public class MockFileConfigProvider extends FileConfigProvider {
 
+    private static final Map<String, MockFileConfigProvider> INSTANCES = Collections.synchronizedMap(new HashMap<>());
+    private String id;
+    private boolean closed = false;
+
+    public void configure(Map<String, ?> configs) {
+        Object id = configs.get("testId");
+        if (id == null) {
+            throw new RuntimeException(getClass().getName() + " missing 'testId' config");
+        }
+        if (this.id != null) {
+            throw new RuntimeException(getClass().getName() + " instance was configured twice");
+        }
+        this.id = id.toString();
+        INSTANCES.put(id.toString(), this);
+    }
+
     @Override
     protected Reader reader(String path) throws IOException {
         return new StringReader("key=testKey\npassword=randomPassword");
     }
+
+    @Override
+    public synchronized void close() {
+        closed = true;
+    }
+
+    public static void assertClosed(String id) {
+        MockFileConfigProvider instance = INSTANCES.remove(id);
+        assertNotNull(instance);
+        synchronized (instance) {
+            assertTrue(instance.closed);
+        }
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 27c13de..5cfc963 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -103,8 +103,8 @@ public class Worker {
     private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
     private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
-    private WorkerConfigTransformer workerConfigTransformer;
-    private ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
+    private final WorkerConfigTransformer workerConfigTransformer;
+    private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
 
     public Worker(
         String workerId,
@@ -216,6 +216,8 @@ public class Worker {
         log.info("Worker stopped");
 
         workerMetricsGroup.close();
+
+        workerConfigTransformer.close();
     }
 
     /**
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1a799bb..318626b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.ConfigProvider;
 import org.apache.kafka.common.config.ConfigTransformer;
 import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
 import org.apache.kafka.connect.util.Callback;
 import org.slf4j.Logger;
@@ -34,15 +35,17 @@ import java.util.concurrent.ConcurrentMap;
  * A wrapper class to perform configuration transformations and schedule reloads for any
  * retrieved TTL values.
  */
-public class WorkerConfigTransformer {
+public class WorkerConfigTransformer implements AutoCloseable {
     private static final Logger log = LoggerFactory.getLogger(WorkerConfigTransformer.class);
 
     private final Worker worker;
     private final ConfigTransformer configTransformer;
     private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
+    private final Map<String, ConfigProvider> configProviders;
 
     public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
         this.worker = worker;
+        this.configProviders = configProviders;
         this.configTransformer = new ConfigTransformer(configProviders);
     }
 
@@ -98,4 +101,9 @@ public class WorkerConfigTransformer {
         HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
         connectorRequests.put(path, request);
     }
+
+    @Override
+    public void close() {
+        configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 40b4df2..784508e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
@@ -75,6 +76,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
@@ -127,6 +129,7 @@ public class WorkerTest extends ThreadedTest {
     @Mock private HeaderConverter taskHeaderConverter;
     @Mock private ExecutorService executorService;
     @MockNice private ConnectorConfig connectorConfig;
+    private String mockFileProviderTestId;
 
     @Before
     public void setup() {
@@ -139,6 +142,10 @@ public class WorkerTest extends ThreadedTest {
         workerProps.put("internal.value.converter.schemas.enable", "false");
         workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        workerProps.put("config.providers", "file");
+        workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        mockFileProviderTestId = UUID.randomUUID().toString();
+        workerProps.put("config.providers.file.param.testId", mockFileProviderTestId);
         config = new StandaloneConfig(workerProps);
 
         defaultProducerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -182,9 +189,11 @@ public class WorkerTest extends ThreadedTest {
 
         EasyMock.expect(connector.version()).andReturn("1.0");
 
+        expectFileConfigProvider();
         EasyMock.expect(plugins.compareAndSwapLoaders(connector))
                 .andReturn(delegatingLoader)
                 .times(2);
+
         connector.initialize(anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
@@ -230,12 +239,24 @@ public class WorkerTest extends ThreadedTest {
         assertStatistics(worker, 0, 0);
 
         PowerMock.verifyAll();
+        MockFileConfigProvider.assertClosed(mockFileProviderTestId);
+    }
+
+    private void expectFileConfigProvider() {
+        EasyMock.expect(plugins.newConfigProvider(EasyMock.anyObject(),
+                    EasyMock.eq("config.providers.file"), EasyMock.anyObject()))
+                .andAnswer(() -> {
+                    MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
+                    mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId));
+                    return mockFileConfigProvider;
+                });
     }
 
     @Test
     public void testStartConnectorFailure() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -280,6 +301,7 @@ public class WorkerTest extends ThreadedTest {
     public void testAddConnectorByAlias() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
@@ -344,6 +366,7 @@ public class WorkerTest extends ThreadedTest {
     public void testAddConnectorByShortAlias() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
@@ -405,6 +428,7 @@ public class WorkerTest extends ThreadedTest {
     public void testStopInvalidConnector() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         PowerMock.replayAll();
 
@@ -420,6 +444,7 @@ public class WorkerTest extends ThreadedTest {
     public void testReconfigureConnectorTasks() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
         EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
@@ -508,6 +533,7 @@ public class WorkerTest extends ThreadedTest {
     public void testAddRemoveTask() throws Exception {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
@@ -602,6 +628,7 @@ public class WorkerTest extends ThreadedTest {
     public void testStartTaskFailure() {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath");
@@ -647,6 +674,7 @@ public class WorkerTest extends ThreadedTest {
     public void testCleanupTasksOnStop() throws Exception {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
@@ -737,6 +765,7 @@ public class WorkerTest extends ThreadedTest {
     public void testConverterOverrides() throws Exception {
         expectConverters();
         expectStartStorage();
+        expectFileConfigProvider();
 
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);