You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 17:24:47 UTC
[kafka] 03/03: KAFKA-9633: Ensure ConfigProviders are closed (#8204)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit ad7030bd658b6037842f398c2787a4dedd75e321
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Thu Apr 30 18:33:10 2020 +0100
KAFKA-9633: Ensure ConfigProviders are closed (#8204)
ConfigProvider extends Closeable, but instances were not closed in the following contexts:
* AbstractConfig
* WorkerConfigTransformer
* Worker
This commit ensures that ConfigProviders are closed in the above contexts.
It also adds MockFileConfigProvider.assertClosed()
Gradle executes test classes concurrently, so MockFileConfigProvider
can't simply use a static field to hold its closure state.
Instead use a protocol whereby the MockFileConfigProvider is configured
with some unique key identifying the test, which is also used when calling
assertClosed().
Reviewers: Konstantine Karantasis <ko...@confluent.io>
---
.../apache/kafka/common/config/AbstractConfig.java | 1 +
.../kafka/common/config/AbstractConfigTest.java | 18 ++++++++++-
.../config/provider/MockFileConfigProvider.java | 35 ++++++++++++++++++++++
.../org/apache/kafka/connect/runtime/Worker.java | 16 +++++-----
.../connect/runtime/WorkerConfigTransformer.java | 10 ++++++-
.../apache/kafka/connect/runtime/WorkerTest.java | 33 ++++++++++++++++++--
6 files changed, 102 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 3992a41..e91a9a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -483,6 +483,7 @@ public class AbstractConfig {
resolvedOriginals.putAll(result.data());
}
}
+ providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
return new ResolvingMap<>(resolvedOriginals, originals);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index b5fff6f..834278b 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.UUID;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -340,6 +341,8 @@ public class AbstractConfigTest {
// Test Case: Valid Test Case for ConfigProviders as part of config.properties
props.put("config.providers", "file");
props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ props.put("config.providers.file.param.testId", id);
props.put("prefix.ssl.truststore.location.number", 5);
props.put("sasl.kerberos.service.name", "service name");
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -349,6 +352,7 @@ public class AbstractConfigTest {
assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
assertEquals(config.originals().get("prefix.ssl.truststore.location.number"), 5);
assertEquals(config.originals().get("sasl.kerberos.service.name"), "service name");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
@@ -357,12 +361,15 @@ public class AbstractConfigTest {
Properties providers = new Properties();
providers.put("config.providers", "file");
providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ providers.put("config.providers.file.param.testId", id);
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
@@ -370,13 +377,16 @@ public class AbstractConfigTest {
// Test Case: Valid Test Case for ConfigProviders as a separate variable
Properties providers = new Properties();
providers.put("config.providers", "file");
- providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ providers.put("config.providers.file.param.testId", id);
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
Map<?, ?> immutableMap = Collections.unmodifiableMap(props);
Map<String, ?> provMap = convertPropertiesToMap(providers);
TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap);
assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
@@ -385,6 +395,8 @@ public class AbstractConfigTest {
Properties providers = new Properties();
providers.put("config.providers", "file,vault");
providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ providers.put("config.providers.file.param.testId", id);
providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -396,6 +408,7 @@ public class AbstractConfigTest {
assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
assertEquals(config.originals().get("sasl.truststore.key"), "testTruststoreKey");
assertEquals(config.originals().get("sasl.truststore.password"), "randomtruststorePassword");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
@@ -429,9 +442,12 @@ public class AbstractConfigTest {
Properties props = new Properties();
props.put("config.providers", "test");
props.put("config.providers.test.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ props.put("config.providers.test.param.testId", id);
props.put("random", "${test:/foo/bar/testpath:random}");
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
assertEquals(config.originals().get("random"), "${test:/foo/bar/testpath:random}");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
index e779cbe..3409096 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
@@ -19,11 +19,46 @@ package org.apache.kafka.common.config.provider;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class MockFileConfigProvider extends FileConfigProvider {
+ private static final Map<String, MockFileConfigProvider> INSTANCES = Collections.synchronizedMap(new HashMap<>());
+ private String id;
+ private boolean closed = false;
+
+ public void configure(Map<String, ?> configs) {
+ Object id = configs.get("testId");
+ if (id == null) {
+ throw new RuntimeException(getClass().getName() + " missing 'testId' config");
+ }
+ if (this.id != null) {
+ throw new RuntimeException(getClass().getName() + " instance was configured twice");
+ }
+ this.id = id.toString();
+ INSTANCES.put(id.toString(), this);
+ }
+
@Override
protected Reader reader(String path) throws IOException {
return new StringReader("key=testKey\npassword=randomPassword");
}
+
+ @Override
+ public synchronized void close() {
+ closed = true;
+ }
+
+ public static void assertClosed(String id) {
+ MockFileConfigProvider instance = INSTANCES.remove(id);
+ assertNotNull(instance);
+ synchronized (instance) {
+ assertTrue(instance.closed);
+ }
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3c4ece7..3cf4d01 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -104,8 +104,8 @@ public class Worker {
private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
- private WorkerConfigTransformer workerConfigTransformer;
- private ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
+ private final WorkerConfigTransformer workerConfigTransformer;
+ private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
public Worker(
String workerId,
@@ -220,6 +220,8 @@ public class Worker {
workerMetricsGroup.close();
connectorStatusMetricsGroup.close();
+
+ workerConfigTransformer.close();
}
/**
@@ -854,11 +856,11 @@ public class Worker {
}
static class ConnectorStatusMetricsGroup {
- private ConnectMetrics connectMetrics;
- private ConnectMetricsRegistry registry;
- private ConcurrentMap<String, MetricGroup> connectorStatusMetrics = new ConcurrentHashMap<>();
- private Herder herder;
- private ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;
+ private final ConnectMetrics connectMetrics;
+ private final ConnectMetricsRegistry registry;
+ private final ConcurrentMap<String, MetricGroup> connectorStatusMetrics = new ConcurrentHashMap<>();
+ private final Herder herder;
+ private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks;
protected ConnectorStatusMetricsGroup(
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1a799bb..318626b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
@@ -34,15 +35,17 @@ import java.util.concurrent.ConcurrentMap;
* A wrapper class to perform configuration transformations and schedule reloads for any
* retrieved TTL values.
*/
-public class WorkerConfigTransformer {
+public class WorkerConfigTransformer implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(WorkerConfigTransformer.class);
private final Worker worker;
private final ConfigTransformer configTransformer;
private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
+ private final Map<String, ConfigProvider> configProviders;
public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
this.worker = worker;
+ this.configProviders = configProviders;
this.configTransformer = new ConfigTransformer(configProviders);
}
@@ -98,4 +101,9 @@ public class WorkerConfigTransformer {
HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
connectorRequests.put(path, request);
}
+
+ @Override
+ public void close() {
+ configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 7021503..e7ffd60 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
@@ -76,6 +77,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -132,6 +134,7 @@ public class WorkerTest extends ThreadedTest {
@Mock private HeaderConverter taskHeaderConverter;
@Mock private ExecutorService executorService;
@MockNice private ConnectorConfig connectorConfig;
+ private String mockFileProviderTestId;
@Before
public void setup() {
@@ -144,6 +147,10 @@ public class WorkerTest extends ThreadedTest {
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+ workerProps.put("config.providers", "file");
+ workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ mockFileProviderTestId = UUID.randomUUID().toString();
+ workerProps.put("config.providers.file.param.testId", mockFileProviderTestId);
config = new StandaloneConfig(workerProps);
defaultProducerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -187,9 +194,11 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(connector.version()).andReturn("1.0");
+ expectFileConfigProvider();
EasyMock.expect(plugins.compareAndSwapLoaders(connector))
.andReturn(delegatingLoader)
.times(2);
+
connector.initialize(anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
connector.start(props);
@@ -235,12 +244,24 @@ public class WorkerTest extends ThreadedTest {
assertStatistics(worker, 0, 0);
PowerMock.verifyAll();
+ MockFileConfigProvider.assertClosed(mockFileProviderTestId);
+ }
+
+ private void expectFileConfigProvider() {
+ EasyMock.expect(plugins.newConfigProvider(EasyMock.anyObject(),
+ EasyMock.eq("config.providers.file"), EasyMock.anyObject()))
+ .andAnswer(() -> {
+ MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
+ mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId));
+ return mockFileConfigProvider;
+ });
}
@Test
public void testStartConnectorFailure() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -285,6 +306,7 @@ public class WorkerTest extends ThreadedTest {
public void testAddConnectorByAlias() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
@@ -349,6 +371,7 @@ public class WorkerTest extends ThreadedTest {
public void testAddConnectorByShortAlias() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
@@ -410,6 +433,7 @@ public class WorkerTest extends ThreadedTest {
public void testStopInvalidConnector() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
PowerMock.replayAll();
@@ -425,6 +449,7 @@ public class WorkerTest extends ThreadedTest {
public void testReconfigureConnectorTasks() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
@@ -513,6 +538,7 @@ public class WorkerTest extends ThreadedTest {
public void testAddRemoveTask() throws Exception {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -607,6 +633,7 @@ public class WorkerTest extends ThreadedTest {
public void testTaskStatusMetricsStatuses() throws Exception {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -738,10 +765,9 @@ public class WorkerTest extends ThreadedTest {
tasks.put(new ConnectorTaskId("c1", 1), workerTask);
tasks.put(new ConnectorTaskId("c2", 0), workerTask);
-
expectConverters();
-
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader);
EasyMock.expect(Plugins.compareAndSwapLoaders(pluginLoader)).andReturn(delegatingLoader);
@@ -772,6 +798,7 @@ public class WorkerTest extends ThreadedTest {
public void testStartTaskFailure() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
Map<String, String> origProps = new HashMap<>();
origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath");
@@ -817,6 +844,7 @@ public class WorkerTest extends ThreadedTest {
public void testCleanupTasksOnStop() throws Exception {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -907,6 +935,7 @@ public class WorkerTest extends ThreadedTest {
public void testConverterOverrides() throws Exception {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);