You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/10 23:38:37 UTC

[kafka] branch 2.3 updated: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation (#8630)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new a342103  KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation (#8630)
a342103 is described below

commit a3421039e531db944b3acda12c4f091741afe06d
Author: Greg Harris <gr...@confluent.io>
AuthorDate: Wed Jun 10 15:04:36 2020 -0700

    KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation (#8630)
    
    This fix excludes `ConnectorClientConfigRequest` and its inner class from class loading isolation in a similar way that KAFKA-8415 excluded `ConnectorClientConfigOverridePolicy`.
    
    Reviewer: Konstantine Karantasis <ko...@confluent.io>
---
 .../connect/runtime/isolation/PluginUtils.java     |   2 +-
 .../connect/runtime/isolation/PluginUtilsTest.java | 343 +++++++++++++++------
 2 files changed, 248 insertions(+), 97 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index b9d5470..63805b1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -135,7 +135,7 @@ public class PluginUtils {
             + "|storage\\.StringConverter"
             + "|storage\\.SimpleHeaderConverter"
             + "|rest\\.basic\\.auth\\.extension\\.BasicAuthSecurityRestExtension"
-            + "|connector\\.policy\\.(?!ConnectorClientConfigOverridePolicy$).*"
+            + "|connector\\.policy\\.(?!ConnectorClientConfig(?:OverridePolicy|Request(?:\\$ClientType)?)$).*"
             + ")"
             + "|common\\.config\\.provider\\.(?!ConfigProvider$).*"
             + ")$");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index bf441ff..d59967d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -70,7 +70,7 @@ public class PluginUtilsTest {
     }
 
     @Test
-    public void testConnectFrameworkClasses() {
+    public void testKafkaDependencyClasses() {
         assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.common."));
         assertFalse(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.common.config.AbstractConfig")
@@ -81,30 +81,6 @@ public class PluginUtilsTest {
         assertFalse(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.common.serialization.Deserializer")
         );
-        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect."));
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.connector.Connector")
-        );
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.source.SourceConnector")
-        );
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.sink.SinkConnector")
-        );
-        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.connector.Task"));
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.source.SourceTask")
-        );
-        assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.sink.SinkTask"));
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.transforms.Transformation")
-        );
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.storage.Converter")
-        );
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.storage.OffsetBackingStore")
-        );
         assertFalse(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.clients.producer.ProducerConfig")
         );
@@ -114,62 +90,256 @@ public class PluginUtilsTest {
         assertFalse(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.clients.admin.KafkaAdminClient")
         );
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.rest.ConnectRestExtension")
-        );
     }
 
     @Test
-    public void testAllowedConnectFrameworkClasses() {
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.transforms.ExtractField")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.transforms.ExtractField$Key")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.json.JsonConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.json.JsonConverter$21")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.file.FileStreamSourceTask")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.file.FileStreamSinkConnector")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.converters."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.converters.ByteArrayConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.converters.DoubleConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.converters.FloatConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.converters.IntegerConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.converters.LongConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.converters.ShortConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.storage.StringConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.storage.SimpleHeaderConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
+    public void testConnectApiClasses() {
+        List<String> apiClasses = Arrays.asList(
+            // Enumerate all packages and classes
+            "org.apache.kafka.connect.",
+            "org.apache.kafka.connect.components.",
+            "org.apache.kafka.connect.components.Versioned",
+            //"org.apache.kafka.connect.connector.policy.", isolated by default
+            "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy",
+            "org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest",
+            "org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest$ClientType",
+            "org.apache.kafka.connect.connector.",
+            "org.apache.kafka.connect.connector.Connector",
+            "org.apache.kafka.connect.connector.ConnectorContext",
+            "org.apache.kafka.connect.connector.ConnectRecord",
+            "org.apache.kafka.connect.connector.Task",
+            "org.apache.kafka.connect.data.",
+            "org.apache.kafka.connect.data.ConnectSchema",
+            "org.apache.kafka.connect.data.Date",
+            "org.apache.kafka.connect.data.Decimal",
+            "org.apache.kafka.connect.data.Field",
+            "org.apache.kafka.connect.data.Schema",
+            "org.apache.kafka.connect.data.SchemaAndValue",
+            "org.apache.kafka.connect.data.SchemaBuilder",
+            "org.apache.kafka.connect.data.SchemaProjector",
+            "org.apache.kafka.connect.data.Struct",
+            "org.apache.kafka.connect.data.Time",
+            "org.apache.kafka.connect.data.Timestamp",
+            "org.apache.kafka.connect.data.Values",
+            "org.apache.kafka.connect.errors.",
+            "org.apache.kafka.connect.errors.AlreadyExistsException",
+            "org.apache.kafka.connect.errors.ConnectException",
+            "org.apache.kafka.connect.errors.DataException",
+            "org.apache.kafka.connect.errors.IllegalWorkerStateException",
+            "org.apache.kafka.connect.errors.NotFoundException",
+            "org.apache.kafka.connect.errors.RetriableException",
+            "org.apache.kafka.connect.errors.SchemaBuilderException",
+            "org.apache.kafka.connect.errors.SchemaProjectorException",
+            "org.apache.kafka.connect.header.",
+            "org.apache.kafka.connect.header.ConnectHeader",
+            "org.apache.kafka.connect.header.ConnectHeaders",
+            "org.apache.kafka.connect.header.Header",
+            "org.apache.kafka.connect.header.Headers",
+            "org.apache.kafka.connect.health.",
+            "org.apache.kafka.connect.health.AbstractState",
+            "org.apache.kafka.connect.health.ConnectClusterDetails",
+            "org.apache.kafka.connect.health.ConnectClusterState",
+            "org.apache.kafka.connect.health.ConnectorHealth",
+            "org.apache.kafka.connect.health.ConnectorState",
+            "org.apache.kafka.connect.health.ConnectorType",
+            "org.apache.kafka.connect.health.TaskState",
+            "org.apache.kafka.connect.rest.",
+            "org.apache.kafka.connect.rest.ConnectRestExtension",
+            "org.apache.kafka.connect.rest.ConnectRestExtensionContext",
+            "org.apache.kafka.connect.sink.",
+            "org.apache.kafka.connect.sink.SinkConnector",
+            "org.apache.kafka.connect.sink.SinkRecord",
+            "org.apache.kafka.connect.sink.SinkTask",
+            "org.apache.kafka.connect.sink.SinkTaskContext",
+            "org.apache.kafka.connect.sink.ErrantRecordReporter",
+            "org.apache.kafka.connect.source.",
+            "org.apache.kafka.connect.source.SourceConnector",
+            "org.apache.kafka.connect.source.SourceRecord",
+            "org.apache.kafka.connect.source.SourceTask",
+            "org.apache.kafka.connect.source.SourceTaskContext",
+            "org.apache.kafka.connect.storage.",
+            "org.apache.kafka.connect.storage.Converter",
+            "org.apache.kafka.connect.storage.ConverterConfig",
+            "org.apache.kafka.connect.storage.ConverterType",
+            "org.apache.kafka.connect.storage.HeaderConverter",
+            "org.apache.kafka.connect.storage.OffsetStorageReader",
+            //"org.apache.kafka.connect.storage.SimpleHeaderConverter", explicitly isolated
+            //"org.apache.kafka.connect.storage.StringConverter", explicitly isolated
+            "org.apache.kafka.connect.storage.StringConverterConfig",
+            //"org.apache.kafka.connect.transforms.", isolated by default
+            "org.apache.kafka.connect.transforms.Transformation",
+            "org.apache.kafka.connect.util.",
+            "org.apache.kafka.connect.util.ConnectorUtils"
+        );
+        // Classes in the API should never be loaded in isolation.
+        for (String clazz : apiClasses) {
+            assertFalse(
+                clazz + " from 'api' is loaded in isolation but should not be",
+                PluginUtils.shouldLoadInIsolation(clazz)
+            );
+        }
+    }
+
+    @Test
+    public void testConnectRuntimeClasses() {
+        // Only list packages, because there are too many classes.
+        List<String> runtimeClasses = Arrays.asList(
+            "org.apache.kafka.connect.cli.",
+            //"org.apache.kafka.connect.connector.policy.", isolated by default
+            //"org.apache.kafka.connect.converters.", isolated by default
+            "org.apache.kafka.connect.runtime.",
+            "org.apache.kafka.connect.runtime.distributed",
+            "org.apache.kafka.connect.runtime.errors",
+            "org.apache.kafka.connect.runtime.health",
+            "org.apache.kafka.connect.runtime.isolation",
+            "org.apache.kafka.connect.runtime.rest.",
+            "org.apache.kafka.connect.runtime.rest.entities.",
+            "org.apache.kafka.connect.runtime.rest.errors.",
+            "org.apache.kafka.connect.runtime.rest.resources.",
+            "org.apache.kafka.connect.runtime.rest.util.",
+            "org.apache.kafka.connect.runtime.standalone.",
+            "org.apache.kafka.connect.runtime.rest.",
+            "org.apache.kafka.connect.storage.",
+            "org.apache.kafka.connect.tools.",
+            "org.apache.kafka.connect.util."
+        );
+        for (String clazz : runtimeClasses) {
+            assertFalse(
+                clazz + " from 'runtime' is loaded in isolation but should not be",
+                PluginUtils.shouldLoadInIsolation(clazz)
+            );
+        }
+    }
+
+    @Test
+    public void testAllowedRuntimeClasses() {
+        List<String> jsonConverterClasses = Arrays.asList(
+            "org.apache.kafka.connect.connector.policy.",
+            "org.apache.kafka.connect.connector.policy.AbstractConnectorClientConfigOverridePolicy",
+            "org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy",
+            "org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy",
+            "org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy",
+            "org.apache.kafka.connect.converters.",
+            "org.apache.kafka.connect.converters.ByteArrayConverter",
+            "org.apache.kafka.connect.converters.DoubleConverter",
+            "org.apache.kafka.connect.converters.FloatConverter",
+            "org.apache.kafka.connect.converters.IntegerConverter",
+            "org.apache.kafka.connect.converters.LongConverter",
+            "org.apache.kafka.connect.converters.NumberConverter",
+            "org.apache.kafka.connect.converters.NumberConverterConfig",
+            "org.apache.kafka.connect.converters.ShortConverter",
+            //"org.apache.kafka.connect.storage.", not isolated by default
+            "org.apache.kafka.connect.storage.StringConverter",
+            "org.apache.kafka.connect.storage.SimpleHeaderConverter"
+        );
+        for (String clazz : jsonConverterClasses) {
+            assertTrue(
+                clazz + " from 'runtime' is not loaded in isolation but should be",
+                PluginUtils.shouldLoadInIsolation(clazz)
+            );
+        }
+    }
+
+    @Test
+    public void testTransformsClasses() {
+        List<String> transformsClasses = Arrays.asList(
+            "org.apache.kafka.connect.transforms.",
+            "org.apache.kafka.connect.transforms.util.",
+            "org.apache.kafka.connect.transforms.util.NonEmptyListValidator",
+            "org.apache.kafka.connect.transforms.util.RegexValidator",
+            "org.apache.kafka.connect.transforms.util.Requirements",
+            "org.apache.kafka.connect.transforms.util.SchemaUtil",
+            "org.apache.kafka.connect.transforms.util.SimpleConfig",
+            "org.apache.kafka.connect.transforms.Cast",
+            "org.apache.kafka.connect.transforms.Cast$Key",
+            "org.apache.kafka.connect.transforms.Cast$Value",
+            "org.apache.kafka.connect.transforms.ExtractField",
+            "org.apache.kafka.connect.transforms.ExtractField$Key",
+            "org.apache.kafka.connect.transforms.ExtractField$Value",
+            "org.apache.kafka.connect.transforms.Flatten",
+            "org.apache.kafka.connect.transforms.Flatten$Key",
+            "org.apache.kafka.connect.transforms.Flatten$Value",
+            "org.apache.kafka.connect.transforms.HoistField",
+            "org.apache.kafka.connect.transforms.HoistField$Key",
+            "org.apache.kafka.connect.transforms.HoistField$Key",
+            "org.apache.kafka.connect.transforms.InsertField",
+            "org.apache.kafka.connect.transforms.InsertField$Key",
+            "org.apache.kafka.connect.transforms.InsertField$Value",
+            "org.apache.kafka.connect.transforms.MaskField",
+            "org.apache.kafka.connect.transforms.MaskField$Key",
+            "org.apache.kafka.connect.transforms.MaskField$Value",
+            "org.apache.kafka.connect.transforms.RegexRouter",
+            "org.apache.kafka.connect.transforms.ReplaceField",
+            "org.apache.kafka.connect.transforms.ReplaceField$Key",
+            "org.apache.kafka.connect.transforms.ReplaceField$Value",
+            "org.apache.kafka.connect.transforms.SetSchemaMetadata",
+            "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
+            "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
+            "org.apache.kafka.connect.transforms.TimestampConverter",
+            "org.apache.kafka.connect.transforms.TimestampConverter$Key",
+            "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+            "org.apache.kafka.connect.transforms.TimestampRouter",
+            "org.apache.kafka.connect.transforms.TimestampRouter$Key",
+            "org.apache.kafka.connect.transforms.TimestampRouter$Value",
+            "org.apache.kafka.connect.transforms.ValueToKey"
+        );
+        for (String clazz : transformsClasses) {
+            assertTrue(
+                clazz + " from 'transforms' is not loaded in isolation but should be",
+                PluginUtils.shouldLoadInIsolation(clazz)
+            );
+        }
+    }
+
+    @Test
+    public void testAllowedJsonConverterClasses() {
+        List<String> jsonConverterClasses = Arrays.asList(
+            "org.apache.kafka.connect.json.",
+            "org.apache.kafka.connect.json.DecimalFormat",
+            "org.apache.kafka.connect.json.JsonConverter",
+            "org.apache.kafka.connect.json.JsonConverterConfig",
+            "org.apache.kafka.connect.json.JsonDeserializer",
+            "org.apache.kafka.connect.json.JsonSchema",
+            "org.apache.kafka.connect.json.JsonSerializer"
+        );
+        for (String clazz : jsonConverterClasses) {
+            assertTrue(
+                clazz + " from 'json' is not loaded in isolation but should be",
+                PluginUtils.shouldLoadInIsolation(clazz)
+            );
+        }
+    }
+
+    @Test
+    public void testAllowedFileConnectors() {
+        List<String> jsonConverterClasses = Arrays.asList(
+            "org.apache.kafka.connect.file.",
+            "org.apache.kafka.connect.file.FileStreamSinkConnector",
+            "org.apache.kafka.connect.file.FileStreamSinkTask",
+            "org.apache.kafka.connect.file.FileStreamSourceConnector",
+            "org.apache.kafka.connect.file.FileStreamSourceTask"
+        );
+        for (String clazz : jsonConverterClasses) {
+            assertTrue(
+                clazz + " from 'file' is not loaded in isolation but should be",
+                PluginUtils.shouldLoadInIsolation(clazz)
+            );
+        }
+    }
+
+    @Test
+    public void testAllowedBasicAuthExtensionClasses() {
+        List<String> basicAuthExtensionClasses = Arrays.asList(
             "org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension"
-        ));
+            //"org.apache.kafka.connect.rest.basic.auth.extension.JaasBasicAuthFilter", TODO fix?
+            //"org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule" TODO fix?
+        );
+        for (String clazz : basicAuthExtensionClasses) {
+            assertTrue(
+                clazz + " from 'basic-auth-extension' is not loaded in isolation but should be",
+                PluginUtils.shouldLoadInIsolation(clazz)
+            );
+        }
     }
 
     @Test
@@ -186,25 +356,6 @@ public class PluginUtilsTest {
     }
 
     @Test
-    public void testConnectorClientConfigOverridePolicy() {
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.connector.policy.AbstractConnectorClientConfigOverridePolicy")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.connector.policy.NoneConnectorClientConfigOverridePolicy")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.connector.policy.PrincipalConnectorClientConfigOverridePolicy")
-        );
-    }
-
-    @Test
     public void testEmptyPluginUrls() throws Exception {
         assertEquals(Collections.<Path>emptyList(), PluginUtils.pluginUrls(pluginPath));
     }