You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/06 17:44:20 UTC
[kafka] branch 2.3 updated: KAFKA-9633: Ensure ConfigProviders are
closed (#8204)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new f3bfa6f KAFKA-9633: Ensure ConfigProviders are closed (#8204)
f3bfa6f is described below
commit f3bfa6f5e151b82ebe65a2e293cfe9c4e02e7dcd
Author: Tom Bentley <to...@users.noreply.github.com>
AuthorDate: Thu Apr 30 18:33:10 2020 +0100
KAFKA-9633: Ensure ConfigProviders are closed (#8204)
ConfigProvider extends Closeable, but ConfigProvider instances were not closed in the following contexts:
* AbstractConfig
* WorkerConfigTransformer
* Worker
This commit ensures that ConfigProviders are closed in the above contexts.
It also adds MockFileConfigProvider.assertClosed().
Gradle executes test classes concurrently, so MockFileConfigProvider
can't simply use a static field to hold its closure state.
Instead, use a protocol whereby the MockFileConfigProvider is configured
with a unique key identifying the test, which is also used when calling
assertClosed().
Reviewers: Konstantine Karantasis <ko...@confluent.io>
---
.../apache/kafka/common/config/AbstractConfig.java | 1 +
.../kafka/common/config/AbstractConfigTest.java | 18 ++++++++++-
.../config/provider/MockFileConfigProvider.java | 35 ++++++++++++++++++++++
.../org/apache/kafka/connect/runtime/Worker.java | 6 ++--
.../connect/runtime/WorkerConfigTransformer.java | 10 ++++++-
.../apache/kafka/connect/runtime/WorkerTest.java | 29 ++++++++++++++++++
6 files changed, 95 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index a48856f..f024732 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -481,6 +481,7 @@ public class AbstractConfig {
resolvedOriginals.putAll(result.data());
}
}
+ providers.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
return new ResolvingMap<>(resolvedOriginals, originals);
}
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index b5fff6f..834278b 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.UUID;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -340,6 +341,8 @@ public class AbstractConfigTest {
// Test Case: Valid Test Case for ConfigProviders as part of config.properties
props.put("config.providers", "file");
props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ props.put("config.providers.file.param.testId", id);
props.put("prefix.ssl.truststore.location.number", 5);
props.put("sasl.kerberos.service.name", "service name");
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -349,6 +352,7 @@ public class AbstractConfigTest {
assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
assertEquals(config.originals().get("prefix.ssl.truststore.location.number"), 5);
assertEquals(config.originals().get("sasl.kerberos.service.name"), "service name");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
@@ -357,12 +361,15 @@ public class AbstractConfigTest {
Properties providers = new Properties();
providers.put("config.providers", "file");
providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ providers.put("config.providers.file.param.testId", id);
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
props.put("sasl.kerberos.password", "${file:/usr/kerberos:password}");
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, convertPropertiesToMap(providers));
assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
@@ -370,13 +377,16 @@ public class AbstractConfigTest {
// Test Case: Valid Test Case for ConfigProviders as a separate variable
Properties providers = new Properties();
providers.put("config.providers", "file");
- providers.put("config.providers.file.class", "org.apache.kafka.common.config.provider.MockFileConfigProvider");
+ providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ providers.put("config.providers.file.param.testId", id);
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
Map<?, ?> immutableMap = Collections.unmodifiableMap(props);
Map<String, ?> provMap = convertPropertiesToMap(providers);
TestIndirectConfigResolution config = new TestIndirectConfigResolution(immutableMap, provMap);
assertEquals(config.originals().get("sasl.kerberos.key"), "testKey");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
@@ -385,6 +395,8 @@ public class AbstractConfigTest {
Properties providers = new Properties();
providers.put("config.providers", "file,vault");
providers.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ providers.put("config.providers.file.param.testId", id);
providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());
Properties props = new Properties();
props.put("sasl.kerberos.key", "${file:/usr/kerberos:key}");
@@ -396,6 +408,7 @@ public class AbstractConfigTest {
assertEquals(config.originals().get("sasl.kerberos.password"), "randomPassword");
assertEquals(config.originals().get("sasl.truststore.key"), "testTruststoreKey");
assertEquals(config.originals().get("sasl.truststore.password"), "randomtruststorePassword");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
@@ -429,9 +442,12 @@ public class AbstractConfigTest {
Properties props = new Properties();
props.put("config.providers", "test");
props.put("config.providers.test.class", MockFileConfigProvider.class.getName());
+ String id = UUID.randomUUID().toString();
+ props.put("config.providers.test.param.testId", id);
props.put("random", "${test:/foo/bar/testpath:random}");
TestIndirectConfigResolution config = new TestIndirectConfigResolution(props);
assertEquals(config.originals().get("random"), "${test:/foo/bar/testpath:random}");
+ MockFileConfigProvider.assertClosed(id);
}
@Test
diff --git a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
index e779cbe..3409096 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/provider/MockFileConfigProvider.java
@@ -19,11 +19,46 @@ package org.apache.kafka.common.config.provider;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class MockFileConfigProvider extends FileConfigProvider {
+ private static final Map<String, MockFileConfigProvider> INSTANCES = Collections.synchronizedMap(new HashMap<>());
+ private String id;
+ private boolean closed = false;
+
+ public void configure(Map<String, ?> configs) {
+ Object id = configs.get("testId");
+ if (id == null) {
+ throw new RuntimeException(getClass().getName() + " missing 'testId' config");
+ }
+ if (this.id != null) {
+ throw new RuntimeException(getClass().getName() + " instance was configured twice");
+ }
+ this.id = id.toString();
+ INSTANCES.put(id.toString(), this);
+ }
+
@Override
protected Reader reader(String path) throws IOException {
return new StringReader("key=testKey\npassword=randomPassword");
}
+
+ @Override
+ public synchronized void close() {
+ closed = true;
+ }
+
+ public static void assertClosed(String id) {
+ MockFileConfigProvider instance = INSTANCES.remove(id);
+ assertNotNull(instance);
+ synchronized (instance) {
+ assertTrue(instance.closed);
+ }
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 27c13de..5cfc963 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -103,8 +103,8 @@ public class Worker {
private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
- private WorkerConfigTransformer workerConfigTransformer;
- private ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
+ private final WorkerConfigTransformer workerConfigTransformer;
+ private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
public Worker(
String workerId,
@@ -216,6 +216,8 @@ public class Worker {
log.info("Worker stopped");
workerMetricsGroup.close();
+
+ workerConfigTransformer.close();
}
/**
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
index 1a799bb..318626b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.ConfigProvider;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigTransformerResult;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Herder.ConfigReloadAction;
import org.apache.kafka.connect.util.Callback;
import org.slf4j.Logger;
@@ -34,15 +35,17 @@ import java.util.concurrent.ConcurrentMap;
* A wrapper class to perform configuration transformations and schedule reloads for any
* retrieved TTL values.
*/
-public class WorkerConfigTransformer {
+public class WorkerConfigTransformer implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(WorkerConfigTransformer.class);
private final Worker worker;
private final ConfigTransformer configTransformer;
private final ConcurrentMap<String, Map<String, HerderRequest>> requests = new ConcurrentHashMap<>();
+ private final Map<String, ConfigProvider> configProviders;
public WorkerConfigTransformer(Worker worker, Map<String, ConfigProvider> configProviders) {
this.worker = worker;
+ this.configProviders = configProviders;
this.configTransformer = new ConfigTransformer(configProviders);
}
@@ -98,4 +101,9 @@ public class WorkerConfigTransformer {
HerderRequest request = worker.herder().restartConnector(ttl, connectorName, cb);
connectorRequests.put(path, request);
}
+
+ @Override
+ public void close() {
+ configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider"));
+ }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 40b4df2..784508e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
@@ -75,6 +76,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
@@ -127,6 +129,7 @@ public class WorkerTest extends ThreadedTest {
@Mock private HeaderConverter taskHeaderConverter;
@Mock private ExecutorService executorService;
@MockNice private ConnectorConfig connectorConfig;
+ private String mockFileProviderTestId;
@Before
public void setup() {
@@ -139,6 +142,10 @@ public class WorkerTest extends ThreadedTest {
workerProps.put("internal.value.converter.schemas.enable", "false");
workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
workerProps.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+ workerProps.put("config.providers", "file");
+ workerProps.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+ mockFileProviderTestId = UUID.randomUUID().toString();
+ workerProps.put("config.providers.file.param.testId", mockFileProviderTestId);
config = new StandaloneConfig(workerProps);
defaultProducerConfigs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -182,9 +189,11 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(connector.version()).andReturn("1.0");
+ expectFileConfigProvider();
EasyMock.expect(plugins.compareAndSwapLoaders(connector))
.andReturn(delegatingLoader)
.times(2);
+
connector.initialize(anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
connector.start(props);
@@ -230,12 +239,24 @@ public class WorkerTest extends ThreadedTest {
assertStatistics(worker, 0, 0);
PowerMock.verifyAll();
+ MockFileConfigProvider.assertClosed(mockFileProviderTestId);
+ }
+
+ private void expectFileConfigProvider() {
+ EasyMock.expect(plugins.newConfigProvider(EasyMock.anyObject(),
+ EasyMock.eq("config.providers.file"), EasyMock.anyObject()))
+ .andAnswer(() -> {
+ MockFileConfigProvider mockFileConfigProvider = new MockFileConfigProvider();
+ mockFileConfigProvider.configure(Collections.singletonMap("testId", mockFileProviderTestId));
+ return mockFileConfigProvider;
+ });
}
@Test
public void testStartConnectorFailure() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
Map<String, String> props = new HashMap<>();
props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
@@ -280,6 +301,7 @@ public class WorkerTest extends ThreadedTest {
public void testAddConnectorByAlias() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
@@ -344,6 +366,7 @@ public class WorkerTest extends ThreadedTest {
public void testAddConnectorByShortAlias() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
@@ -405,6 +428,7 @@ public class WorkerTest extends ThreadedTest {
public void testStopInvalidConnector() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
PowerMock.replayAll();
@@ -420,6 +444,7 @@ public class WorkerTest extends ThreadedTest {
public void testReconfigureConnectorTasks() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
@@ -508,6 +533,7 @@ public class WorkerTest extends ThreadedTest {
public void testAddRemoveTask() throws Exception {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -602,6 +628,7 @@ public class WorkerTest extends ThreadedTest {
public void testStartTaskFailure() {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
Map<String, String> origProps = new HashMap<>();
origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath");
@@ -647,6 +674,7 @@ public class WorkerTest extends ThreadedTest {
public void testCleanupTasksOnStop() throws Exception {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
@@ -737,6 +765,7 @@ public class WorkerTest extends ThreadedTest {
public void testConverterOverrides() throws Exception {
expectConverters();
expectStartStorage();
+ expectFileConfigProvider();
EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);