You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/10 22:58:06 UTC
[kafka] branch 2.5 updated: KAFKA-9969: Exclude
ConnectorClientConfigRequest from class loading isolation (#8630)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 0cde74e KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation (#8630)
0cde74e is described below
commit 0cde74e97773737ddbd2e03408b6bcc1560be6a6
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Wed Jun 10 15:04:36 2020 -0700
KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation (#8630)
This fix excludes `ConnectorClientConfigRequest` and its inner class from class loading isolation in a similar way that KAFKA-8415 excluded `ConnectorClientConfigOverridePolicy`.
Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
.../connect/runtime/isolation/PluginUtils.java | 2 +-
.../connect/runtime/isolation/PluginUtilsTest.java | 354 +++++++++++++++------
2 files changed, 258 insertions(+), 98 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index 36feac5..acb26fb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -137,7 +137,7 @@ public class PluginUtils {
+ "|storage\\.StringConverter"
+ "|storage\\.SimpleHeaderConverter"
+ "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension"
- + "|connector\\.policy\\.(?!ConnectorClientConfigOverridePolicy$).*"
+ + "|connector\\.policy\\.(?!ConnectorClientConfig(?:OverridePolicy|Request(?:\\$ClientType)?)$).*"
+ ")"
+ "|common\\.config\\.provider\\.(?!ConfigProvider$).*"
+ ")$");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index c406ead..1976698 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -70,7 +70,7 @@ public class PluginUtilsTest {
}
@Test
- public void testConnectFrameworkClasses() {
+ public void testKafkaDependencyClasses() {
assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.common."));
assertFalse(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.common.config.AbstractConfig")
@@ -81,30 +81,6 @@ public class PluginUtilsTest {
assertFalse(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.common.serialization.Deserializer")
);
- assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect."));
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.connector.Connector")
- );
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.source.SourceConnector")
- );
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.sink.SinkConnector")
- );
- assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.connector.Task"));
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.source.SourceTask")
- );
- assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.sink.SinkTask"));
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.transforms.Transformation")
- );
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.storage.Converter")
- );
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.storage.OffsetBackingStore")
- );
assertFalse(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.clients.producer.ProducerConfig")
);
@@ -114,68 +90,271 @@ public class PluginUtilsTest {
assertFalse(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.clients.admin.KafkaAdminClient")
);
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.rest.ConnectRestExtension")
- );
}
@Test
- public void testAllowedConnectFrameworkClasses() {
- assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms."));
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.transforms.ExtractField")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.transforms.ExtractField$Key")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.json.JsonConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.json.JsonConverter$21")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file."));
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.file.FileStreamSourceTask")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.file.FileStreamSinkConnector")
- );
+ public void testConnectApiClasses() {
+ List<String> apiClasses = Arrays.asList(
+ // Enumerate all packages and classes
+ "org.apache.kafka.connect.",
+ "org.apache.kafka.connect.components.",
+ "org.apache.kafka.connect.components.Versioned",
+ //"org.apache.kafka.connect.connector.policy.", isolated by default
+ "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy",
+ "org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest",
+ "org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest$ClientType",
+ "org.apache.kafka.connect.connector.",
+ "org.apache.kafka.connect.connector.Connector",
+ "org.apache.kafka.connect.connector.ConnectorContext",
+ "org.apache.kafka.connect.connector.ConnectRecord",
+ "org.apache.kafka.connect.connector.Task",
+ "org.apache.kafka.connect.data.",
+ "org.apache.kafka.connect.data.ConnectSchema",
+ "org.apache.kafka.connect.data.Date",
+ "org.apache.kafka.connect.data.Decimal",
+ "org.apache.kafka.connect.data.Field",
+ "org.apache.kafka.connect.data.Schema",
+ "org.apache.kafka.connect.data.SchemaAndValue",
+ "org.apache.kafka.connect.data.SchemaBuilder",
+ "org.apache.kafka.connect.data.SchemaProjector",
+ "org.apache.kafka.connect.data.Struct",
+ "org.apache.kafka.connect.data.Time",
+ "org.apache.kafka.connect.data.Timestamp",
+ "org.apache.kafka.connect.data.Values",
+ "org.apache.kafka.connect.errors.",
+ "org.apache.kafka.connect.errors.AlreadyExistsException",
+ "org.apache.kafka.connect.errors.ConnectException",
+ "org.apache.kafka.connect.errors.DataException",
+ "org.apache.kafka.connect.errors.IllegalWorkerStateException",
+ "org.apache.kafka.connect.errors.NotFoundException",
+ "org.apache.kafka.connect.errors.RetriableException",
+ "org.apache.kafka.connect.errors.SchemaBuilderException",
+ "org.apache.kafka.connect.errors.SchemaProjectorException",
+ "org.apache.kafka.connect.header.",
+ "org.apache.kafka.connect.header.ConnectHeader",
+ "org.apache.kafka.connect.header.ConnectHeaders",
+ "org.apache.kafka.connect.header.Header",
+ "org.apache.kafka.connect.header.Headers",
+ "org.apache.kafka.connect.health.",
+ "org.apache.kafka.connect.health.AbstractState",
+ "org.apache.kafka.connect.health.ConnectClusterDetails",
+ "org.apache.kafka.connect.health.ConnectClusterState",
+ "org.apache.kafka.connect.health.ConnectorHealth",
+ "org.apache.kafka.connect.health.ConnectorState",
+ "org.apache.kafka.connect.health.ConnectorType",
+ "org.apache.kafka.connect.health.TaskState",
+ "org.apache.kafka.connect.rest.",
+ "org.apache.kafka.connect.rest.ConnectRestExtension",
+ "org.apache.kafka.connect.rest.ConnectRestExtensionContext",
+ "org.apache.kafka.connect.sink.",
+ "org.apache.kafka.connect.sink.SinkConnector",
+ "org.apache.kafka.connect.sink.SinkRecord",
+ "org.apache.kafka.connect.sink.SinkTask",
+ "org.apache.kafka.connect.sink.SinkTaskContext",
+ "org.apache.kafka.connect.sink.ErrantRecordReporter",
+ "org.apache.kafka.connect.source.",
+ "org.apache.kafka.connect.source.SourceConnector",
+ "org.apache.kafka.connect.source.SourceRecord",
+ "org.apache.kafka.connect.source.SourceTask",
+ "org.apache.kafka.connect.source.SourceTaskContext",
+ "org.apache.kafka.connect.storage.",
+ "org.apache.kafka.connect.storage.Converter",
+ "org.apache.kafka.connect.storage.ConverterConfig",
+ "org.apache.kafka.connect.storage.ConverterType",
+ "org.apache.kafka.connect.storage.HeaderConverter",
+ "org.apache.kafka.connect.storage.OffsetStorageReader",
+ //"org.apache.kafka.connect.storage.SimpleHeaderConverter", explicitly isolated
+ //"org.apache.kafka.connect.storage.StringConverter", explicitly isolated
+ "org.apache.kafka.connect.storage.StringConverterConfig",
+ //"org.apache.kafka.connect.transforms.", isolated by default
+ "org.apache.kafka.connect.transforms.Transformation",
+ "org.apache.kafka.connect.transforms.predicates.Predicate",
+ "org.apache.kafka.connect.util.",
+ "org.apache.kafka.connect.util.ConnectorUtils"
+ );
+ // Classes in the API should never be loaded in isolation.
+ for (String clazz : apiClasses) {
+ assertFalse(
+ clazz + " from 'api' is loaded in isolation but should not be",
+ PluginUtils.shouldLoadInIsolation(clazz)
+ );
+ }
+ }
+
+ @Test
+ public void testConnectRuntimeClasses() {
+ // Only list packages, because there are too many classes.
+ List<String> runtimeClasses = Arrays.asList(
+ "org.apache.kafka.connect.cli.",
+ //"org.apache.kafka.connect.connector.policy.", isolated by default
+ //"org.apache.kafka.connect.converters.", isolated by default
+ "org.apache.kafka.connect.runtime.",
+ "org.apache.kafka.connect.runtime.distributed",
+ "org.apache.kafka.connect.runtime.errors",
+ "org.apache.kafka.connect.runtime.health",
+ "org.apache.kafka.connect.runtime.isolation",
+ "org.apache.kafka.connect.runtime.rest.",
+ "org.apache.kafka.connect.runtime.rest.entities.",
+ "org.apache.kafka.connect.runtime.rest.errors.",
+ "org.apache.kafka.connect.runtime.rest.resources.",
+ "org.apache.kafka.connect.runtime.rest.util.",
+ "org.apache.kafka.connect.runtime.standalone.",
+ "org.apache.kafka.connect.runtime.rest.",
+ "org.apache.kafka.connect.storage.",
+ "org.apache.kafka.connect.tools.",
+ "org.apache.kafka.connect.util."
+ );
+ for (String clazz : runtimeClasses) {
+ assertFalse(
+ clazz + " from 'runtime' is loaded in isolation but should not be",
+ PluginUtils.shouldLoadInIsolation(clazz)
+ );
+ }
+ }
+
+ @Test
+ public void testAllowedRuntimeClasses() {
+ List<String> jsonConverterClasses = Arrays.asList(
+ "org.apache.kafka.connect.connector.policy.",
+ "org.apache.kafka.connect.connector.policy.AbstractConnectorClientConfigOverridePolicy",
+ "org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy",
+ "org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy",
+ "org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy",
+ "org.apache.kafka.connect.converters.",
+ "org.apache.kafka.connect.converters.ByteArrayConverter",
+ "org.apache.kafka.connect.converters.DoubleConverter",
+ "org.apache.kafka.connect.converters.FloatConverter",
+ "org.apache.kafka.connect.converters.IntegerConverter",
+ "org.apache.kafka.connect.converters.LongConverter",
+ "org.apache.kafka.connect.converters.NumberConverter",
+ "org.apache.kafka.connect.converters.NumberConverterConfig",
+ "org.apache.kafka.connect.converters.ShortConverter",
+ //"org.apache.kafka.connect.storage.", not isolated by default
+ "org.apache.kafka.connect.storage.StringConverter",
+ "org.apache.kafka.connect.storage.SimpleHeaderConverter"
+ );
+ for (String clazz : jsonConverterClasses) {
+ assertTrue(
+ clazz + " from 'runtime' is not loaded in isolation but should be",
+ PluginUtils.shouldLoadInIsolation(clazz)
+ );
+ }
+ }
+
+ @Test
+ public void testTransformsClasses() {
+ List<String> transformsClasses = Arrays.asList(
+ "org.apache.kafka.connect.transforms.",
+ "org.apache.kafka.connect.transforms.util.",
+ "org.apache.kafka.connect.transforms.util.NonEmptyListValidator",
+ "org.apache.kafka.connect.transforms.util.RegexValidator",
+ "org.apache.kafka.connect.transforms.util.Requirements",
+ "org.apache.kafka.connect.transforms.util.SchemaUtil",
+ "org.apache.kafka.connect.transforms.util.SimpleConfig",
+ "org.apache.kafka.connect.transforms.Cast",
+ "org.apache.kafka.connect.transforms.Cast$Key",
+ "org.apache.kafka.connect.transforms.Cast$Value",
+ "org.apache.kafka.connect.transforms.ExtractField",
+ "org.apache.kafka.connect.transforms.ExtractField$Key",
+ "org.apache.kafka.connect.transforms.ExtractField$Value",
+ "org.apache.kafka.connect.transforms.Flatten",
+ "org.apache.kafka.connect.transforms.Flatten$Key",
+ "org.apache.kafka.connect.transforms.Flatten$Value",
+ "org.apache.kafka.connect.transforms.HoistField",
+ "org.apache.kafka.connect.transforms.HoistField$Key",
+ "org.apache.kafka.connect.transforms.HoistField$Key",
+ "org.apache.kafka.connect.transforms.InsertField",
+ "org.apache.kafka.connect.transforms.InsertField$Key",
+ "org.apache.kafka.connect.transforms.InsertField$Value",
+ "org.apache.kafka.connect.transforms.MaskField",
+ "org.apache.kafka.connect.transforms.MaskField$Key",
+ "org.apache.kafka.connect.transforms.MaskField$Value",
+ "org.apache.kafka.connect.transforms.RegexRouter",
+ "org.apache.kafka.connect.transforms.ReplaceField",
+ "org.apache.kafka.connect.transforms.ReplaceField$Key",
+ "org.apache.kafka.connect.transforms.ReplaceField$Value",
+ "org.apache.kafka.connect.transforms.SetSchemaMetadata",
+ "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
+ "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
+ "org.apache.kafka.connect.transforms.TimestampConverter",
+ "org.apache.kafka.connect.transforms.TimestampConverter$Key",
+ "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+ "org.apache.kafka.connect.transforms.TimestampRouter",
+ "org.apache.kafka.connect.transforms.TimestampRouter$Key",
+ "org.apache.kafka.connect.transforms.TimestampRouter$Value",
+ "org.apache.kafka.connect.transforms.ValueToKey",
+ "org.apache.kafka.connect.transforms.predicates.",
+ "org.apache.kafka.connect.transforms.predicates.HasHeaderKey",
+ "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
+ "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"
+ );
+ for (String clazz : transformsClasses) {
+ assertTrue(
+ clazz + " from 'transforms' is not loaded in isolation but should be",
+ PluginUtils.shouldLoadInIsolation(clazz)
+ );
+ }
+ }
+
+ @Test
+ public void testAllowedJsonConverterClasses() {
+ List<String> jsonConverterClasses = Arrays.asList(
+ "org.apache.kafka.connect.json.",
+ "org.apache.kafka.connect.json.DecimalFormat",
+ "org.apache.kafka.connect.json.JsonConverter",
+ "org.apache.kafka.connect.json.JsonConverterConfig",
+ "org.apache.kafka.connect.json.JsonDeserializer",
+ "org.apache.kafka.connect.json.JsonSchema",
+ "org.apache.kafka.connect.json.JsonSerializer"
+ );
+ for (String clazz : jsonConverterClasses) {
+ assertTrue(
+ clazz + " from 'json' is not loaded in isolation but should be",
+ PluginUtils.shouldLoadInIsolation(clazz)
+ );
+ }
+ }
+
+ @Test
+ public void testAllowedFileConnectors() {
+ List<String> jsonConverterClasses = Arrays.asList(
+ "org.apache.kafka.connect.file.",
+ "org.apache.kafka.connect.file.FileStreamSinkConnector",
+ "org.apache.kafka.connect.file.FileStreamSinkTask",
+ "org.apache.kafka.connect.file.FileStreamSourceConnector",
+ "org.apache.kafka.connect.file.FileStreamSourceTask"
+ );
+ for (String clazz : jsonConverterClasses) {
+ assertTrue(
+ clazz + " from 'file' is not loaded in isolation but should be",
+ PluginUtils.shouldLoadInIsolation(clazz)
+ );
+ }
+ }
+
+ @Test
+ public void testAllowedBasicAuthExtensionClasses() {
+ List<String> basicAuthExtensionClasses = Arrays.asList(
+ "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension"
+ //"org.apache.kafka.connect.rest.basic.auth.extension.JaasBasicAuthFilter", TODO fix?
+ //"org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule" TODO fix?
+ );
+ for (String clazz : basicAuthExtensionClasses) {
+ assertTrue(
+ clazz + " from 'basic-auth-extension' is not loaded in isolation but should be",
+ PluginUtils.shouldLoadInIsolation(clazz)
+ );
+ }
+ }
+
+ @Test
+ public void testMirrorClasses() {
assertTrue(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.mirror.MirrorSourceTask")
);
assertTrue(PluginUtils.shouldLoadInIsolation(
"org.apache.kafka.connect.mirror.MirrorSourceConnector")
);
- assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.converters."));
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.converters.ByteArrayConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.converters.DoubleConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.converters.FloatConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.converters.IntegerConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.converters.LongConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.converters.ShortConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.storage.StringConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.storage.SimpleHeaderConverter")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension"
- ));
}
@Test
@@ -192,25 +371,6 @@ public class PluginUtilsTest {
}
@Test
- public void testConnectorClientConfigOverridePolicy() {
- assertFalse(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.connector.policy.AbstractConnectorClientConfigOverridePolicy")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy")
- );
- assertTrue(PluginUtils.shouldLoadInIsolation(
- "org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy")
- );
- }
-
- @Test
public void testEmptyPluginUrls() throws Exception {
assertEquals(Collections.<Path>emptyList(), PluginUtils.pluginUrls(pluginPath));
}