Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/01/25 18:05:37 UTC

[GitHub] [kafka] gharris1727 opened a new pull request, #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

gharris1727 opened a new pull request, #13165:
URL: https://github.com/apache/kafka/pull/13165

   The scanPluginPath -> getPluginDesc -> versionFor code path instantiates connectors in order to evaluate their version() method. This is the first call to initialize these classes, and so performs static initialization, which may be sensitive to the Thread Context Classloader. Currently the TCCL is just the app class loader, which may prevent the connector from discovering isolated resources.
   
   Instead, move the loader swap in getServiceLoaderPluginDesc out to scanPluginPath, in order to cover both the service-loaded and reflections-loaded classes, and in particular, initialize connectors with the correct TCCL.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
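
As an illustration of why the loader swap has to cover the first instantiation, here is a minimal sketch. It assumes only that a plugin's static initializer reads the thread context classloader (TCCL); `StaticInitDemo`, `FakeConnector` and `initializeUnder` are hypothetical names for this example and are not part of Kafka.

```java
import java.net.URL;
import java.net.URLClassLoader;

final class StaticInitDemo {
    // Hypothetical stand-in for a connector; not a Kafka class.
    static final class FakeConnector {
        // Runs once, on first use of the class, and captures whatever TCCL is set at that moment.
        static final ClassLoader STATIC_LOADER = Thread.currentThread().getContextClassLoader();

        String version() {
            return "1.0";
        }
    }

    // Instantiate the connector with `pluginLoader` as the TCCL and return the loader
    // that the static initializer observed. Static initialization happens only once,
    // so later calls return the loader seen the first time.
    static ClassLoader initializeUnder(ClassLoader pluginLoader) {
        Thread thread = Thread.currentThread();
        ClassLoader saved = thread.getContextClassLoader();
        thread.setContextClassLoader(pluginLoader);
        try {
            new FakeConnector().version(); // first use triggers static initialization
            return FakeConnector.STATIC_LOADER;
        } finally {
            thread.setContextClassLoader(saved); // always restore the caller's TCCL
        }
    }

    public static void main(String[] args) {
        ClassLoader appLoader = StaticInitDemo.class.getClassLoader();
        ClassLoader pluginLoader = new URLClassLoader(new URL[0], appLoader);
        System.out.println(initializeUnder(pluginLoader) == pluginLoader); // prints "true"
    }
}
```

If the swap were applied only after the first `new FakeConnector()`, the static field would have captured the application loader instead, which is the situation the description above is trying to avoid.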
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1204482479


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##########
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
     private final ClassLoader savedLoader;
 
-    public LoaderSwap(ClassLoader savedLoader) {
+    public static LoaderSwap use(ClassLoader loader) {
+        ClassLoader savedLoader = compareAndSwapLoaders(loader);
+        try {
+            return new LoaderSwap(savedLoader);
+        } catch (Throwable t) {
+            compareAndSwapLoaders(savedLoader);
+            throw t;
+        }
+    }

Review Comment:
   I've reverted this change and left Plugins.compareAndSwapLoader unchanged.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
 
         log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        T config = createConfig(workerProps);
-        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   I've reverted this change.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1163148455


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -159,13 +143,7 @@ public ClassLoader compareAndSwapWithDelegatingLoader() {
      * @return A {@link LoaderSwap} handle which restores the prior classloader on {@link LoaderSwap#close()}.
      */
     public LoaderSwap withClassLoader(ClassLoader loader) {

Review Comment:
   I think I would be in favor of this change if we could eliminate or reduce the visibility of the delegatingLoader(), but there are a couple of non-trivial usages in tests which need a ClassLoader to use when constructing worker tasks. I don't think that two call-sites are enough to justify an extra method that makes the delegatingLoader into a public method that is only used in tests.
   





[GitHub] [kafka] mukkachaitanya commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "mukkachaitanya (via GitHub)" <gi...@apache.org>.
mukkachaitanya commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1162729307


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -159,13 +143,7 @@ public ClassLoader compareAndSwapWithDelegatingLoader() {
      * @return A {@link LoaderSwap} handle which restores the prior classloader on {@link LoaderSwap#close()}.
      */
     public LoaderSwap withClassLoader(ClassLoader loader) {

Review Comment:
   Minor suggestion: since we swap with the delegating loader in most of the places we changed, it might be useful to create a new function:
   ```java
   public LoaderSwap withDelegatingClassLoader() {
       return withClassLoader(delegatingLoader());
   }
   ```





[GitHub] [kafka] mukkachaitanya commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "mukkachaitanya (via GitHub)" <gi...@apache.org>.
mukkachaitanya commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1161593044


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -360,17 +360,22 @@ private PluginScanResult scanPluginPath(
         builder.useParallelExecutor();
         Reflections reflections = new InternalReflections(builder);
 
-        return new PluginScanResult(
-                getPluginDesc(reflections, SinkConnector.class, loader),
-                getPluginDesc(reflections, SourceConnector.class, loader),
-                getPluginDesc(reflections, Converter.class, loader),
-                getPluginDesc(reflections, HeaderConverter.class, loader),
-                getTransformationPluginDesc(loader, reflections),
-                getPredicatePluginDesc(loader, reflections),
-                getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-                getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-        );
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   `Plugins#withClassLoader` would be a nice touch to do these repetitive swaps; however, it doesn't seem to be a static method. I don't see a strong reason why it's not. If it's not too much out of the scope of this PR, can we make it static and use it to make the code cleaner? 
   
   Alternatively, if there are several instances where the code benefits from using the new static method, we can tackle it in a separate refactoring PR.





[GitHub] [kafka] C0urante commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1204337446


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
 
         log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        T config = createConfig(workerProps);
-        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   I think it's brittle to change the context classloader back. Currently there's no additional logic that requires it, but we have a choice between adding the potential for bugs related to the context classloader and not adding it.
   
   I get that the approach on trunk requires special treatment for integration tests, but since that's already a solved problem, I'd prefer to keep things as they are, especially since it's preferable to keep the risk in the testing portions of the code base over the main parts.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
     private final String workerNamePrefix;
     private final AtomicInteger nextWorkerId = new AtomicInteger(0);
     private final EmbeddedConnectClusterAssertions assertions;
-    // we should keep the original class loader and set it back after connector stopped since the connector will change the class loader,
-    // and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed
-    private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

Review Comment:
   (Discussed above)





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1161929533


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -360,17 +360,22 @@ private PluginScanResult scanPluginPath(
         builder.useParallelExecutor();
         Reflections reflections = new InternalReflections(builder);
 
-        return new PluginScanResult(
-                getPluginDesc(reflections, SinkConnector.class, loader),
-                getPluginDesc(reflections, SourceConnector.class, loader),
-                getPluginDesc(reflections, Converter.class, loader),
-                getPluginDesc(reflections, HeaderConverter.class, loader),
-                getTransformationPluginDesc(loader, reflections),
-                getPredicatePluginDesc(loader, reflections),
-                getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-                getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-        );
+        ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   > I don't see a strong reason why it's not [static].
   
   It's non-static to make mocking easier. Rather than having to mock a static method of a class, you mock the Plugins instance, and stub out the loader swapping functionality.
   
   It appears that there are only a handful of places where compareAndSwapLoaders (and compareAndSwapWithDelegatingLoader) is used:
   * In DelegatingClassLoader, during initialization
   * In AbstractConnectCli and MirrorMaker to swap to the delegating classloader
   * In EmbeddedConnectCluster to swap back to the saved loader (KAFKA-12229)
   
   I think that the EmbeddedConnectCluster call-site is just a result of the open-ended delegating swaps. I'll refactor all of these call-sites to use LoaderSwap, and hide the more dangerous compareAndSwapLoaders now that only LoaderSwap is using it.
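
The difference between an open-ended swap and a scoped one can be sketched as follows. This is not the actual `LoaderSwap` class from Kafka; `ScopedLoaderSwap` and `swapTo` are hypothetical names, and the sketch assumes only the standard `Thread` context classloader API and try-with-resources.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical scoped swap handle; a sketch, not Kafka's LoaderSwap.
final class ScopedLoaderSwap implements AutoCloseable {
    private final ClassLoader savedLoader;

    private ScopedLoaderSwap(ClassLoader savedLoader) {
        this.savedLoader = savedLoader;
    }

    // Set the TCCL to `loader` and return a handle that restores the previous loader on close().
    static ScopedLoaderSwap swapTo(ClassLoader loader) {
        Thread thread = Thread.currentThread();
        ClassLoader saved = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        return new ScopedLoaderSwap(saved);
    }

    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(savedLoader);
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader plugin = new URLClassLoader(new URL[0], original);
        try (ScopedLoaderSwap swap = ScopedLoaderSwap.swapTo(plugin)) {
            // Inside the block the plugin loader is the TCCL.
            System.out.println(Thread.currentThread().getContextClassLoader() == plugin); // prints "true"
        }
        // After the block the caller's loader is back, even if the body had thrown.
        System.out.println(Thread.currentThread().getContextClassLoader() == original); // prints "true"
    }
}
```

With an open-ended swap, restoring the saved loader is left to each caller, which is how a call-site like the one in EmbeddedConnectCluster can end up having to undo a swap made elsewhere.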





[GitHub] [kafka] gharris1727 commented on pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13165:
URL: https://github.com/apache/kafka/pull/13165#issuecomment-1560240995

   > I'm wondering if we can get better coverage for DelegatingClassLoader::scanPluginPath. Right now we verify in PluginsTest::newConnectorShouldInstantiateWithPluginClassLoader that if we've initialized a Plugins instance, and we invoke Plugins::newConnector, the constructor for that connector is called with the correct context classloader. But it seems like this isn't very powerful since, if the constructor is invoked multiple times, the last invocation's classloader will be recorded--so in this case, we're really testing Plugins::newConnector and not the instantiations that are performed during plugin discovery.
   
   Yeah, this is a blind spot in the existing tests. The "sampling" paradigm requires an instance of the object in order to perform the assertions, and the scanPluginPath implementation throws away the objects that it creates. The test does not and cannot assert that the TCCL is correct for the first version() call, for example.
   
   In this specific case the regression test is still sensitive, because the static initialization happens when the plugin constructor is first called (not when the Class<?> object is created). This means that we can assert the TCCL used in the first constructor via the staticClassloader inspection.
   
   I think the alternative would involve mocking/spying part of the scanPluginPath (such as versionFor), or keeping track of instantiated objects in SamplingTestPlugins, both of which seem messy, and would make this harder to refactor in the near future. Do you think this should be addressed now, or can it wait until the plugin path scanning refactor has landed?
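
The point about static initialization running at the first constructor call, rather than when the `Class<?>` object is created, follows from standard JVM behavior and can be checked with a small sketch. `LazyInitDemo` and its nested `Plugin` are hypothetical names for this example; `Class.forName(name, false, loader)` is the standard overload that loads a class without initializing it.

```java
import java.util.ArrayList;
import java.util.List;

final class LazyInitDemo {
    // Shared log of what happened, in order.
    static final List<String> EVENTS = new ArrayList<>();

    // Hypothetical plugin; not a Kafka class.
    static final class Plugin {
        static {
            EVENTS.add("static-init");
        }

        Plugin() {
            EVENTS.add("constructor");
        }
    }

    static List<String> run() {
        try {
            // initialize=false: obtain the Class object without running static initializers.
            Class<?> klass = Class.forName(
                    Plugin.class.getName(), false, LazyInitDemo.class.getClassLoader());
            EVENTS.add("loaded");
            // The first instantiation triggers static initialization, then the constructor.
            klass.getDeclaredConstructor().newInstance();
            return EVENTS;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[loaded, static-init, constructor]"
    }
}
```

This is why recording the classloader in a static field is enough to observe the TCCL of the first constructor call, as long as nothing forces initialization earlier.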




[GitHub] [kafka] C0urante merged pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante merged PR #13165:
URL: https://github.com/apache/kafka/pull/13165




[GitHub] [kafka] C0urante commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1204336495


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##########
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
     private final ClassLoader savedLoader;
 
-    public LoaderSwap(ClassLoader savedLoader) {
+    public static LoaderSwap use(ClassLoader loader) {
+        ClassLoader savedLoader = compareAndSwapLoaders(loader);
+        try {
+            return new LoaderSwap(savedLoader);
+        } catch (Throwable t) {
+            compareAndSwapLoaders(savedLoader);
+            throw t;
+        }
+    }

Review Comment:
   This does still involve static logic for classloader swapping, though. And the comment about internal use doesn't seem very helpful since the way we use that term ("internal") has to do with public vs. private API; it's not really clear to people that (or why) they shouldn't just upgrade the visibility to public.
   
   Ultimately I'd prefer to see this logic duplicated in two places (`DelegatingClassLoader::withClassLoader` and `Plugins::withClassLoader`) rather than introduce a new API that might be misused in the future.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1202962972


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##########
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
     private final ClassLoader savedLoader;
 
-    public LoaderSwap(ClassLoader savedLoader) {
+    public static LoaderSwap use(ClassLoader loader) {
+        ClassLoader savedLoader = compareAndSwapLoaders(loader);
+        try {
+            return new LoaderSwap(savedLoader);
+        } catch (Throwable t) {
+            compareAndSwapLoaders(savedLoader);
+            throw t;
+        }
+    }

Review Comment:
   This is not re-introducing the static logic, it is just refactoring to eliminate the open-ended Plugins.compareAndSwap* methods.
   
   This method is only called in two places: by DelegatingClassLoader.scanPluginPath (before scanning is finished) and Plugins.withClassLoader (after scanning is finished).
   
   I've dropped the visibility and made the DCL call-site mock-able.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
     private final String workerNamePrefix;
     private final AtomicInteger nextWorkerId = new AtomicInteger(0);
     private final EmbeddedConnectClusterAssertions assertions;
-    // we should keep the original class loader and set it back after connector stopped since the connector will change the class loader,
-    // and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed
-    private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

Review Comment:
   I disagree. I think that this is a symptom of the open-ended context classloader swap having unintended downstream effects. The existing fix is adequate, but is mostly addressing the symptom rather than the problem.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
 
         log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        T config = createConfig(workerProps);
-        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   I understand that this is a change in semantics, but that change is intentional. After this method completes, operations should not require the delegating loader and should be performed via the Connect handle. That handle only has methods for starting, stopping, and interacting with the REST API, all of which should internally handle setting the context classloader when appropriate.
   
   The reason that I'm changing this is that I think the open-ended swap methods are an anti-pattern, and lead to unexpected behavior later in the caller thread.





[GitHub] [kafka] gharris1727 commented on pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on PR #13165:
URL: https://github.com/apache/kafka/pull/13165#issuecomment-1561615543

   @C0urante I added a static list to all of the Sampling plugins that allows us to inspect the classloader used for all method calls to all instances of each plugin type. This should now perform the assertions you were describing.
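
The general idea of a static list that records the classloader for every call on every instance might look like the sketch below. This is not the code added to the Sampling plugins in the PR; `RecordingPluginDemo`, `RecordingPlugin`, `scan` and `allObservedUnder` are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class RecordingPluginDemo {
    // Hypothetical plugin that records the TCCL of every call on every instance.
    static final class RecordingPlugin {
        static final List<ClassLoader> OBSERVED = new CopyOnWriteArrayList<>();

        RecordingPlugin() {
            OBSERVED.add(Thread.currentThread().getContextClassLoader());
        }

        String version() {
            OBSERVED.add(Thread.currentThread().getContextClassLoader());
            return "1.0";
        }
    }

    // Simulated discovery: creates an instance, reads its version, and discards the instance.
    static void scan(ClassLoader pluginLoader) {
        Thread thread = Thread.currentThread();
        ClassLoader saved = thread.getContextClassLoader();
        thread.setContextClassLoader(pluginLoader);
        try {
            new RecordingPlugin().version();
        } finally {
            thread.setContextClassLoader(saved);
        }
    }

    // True only if every recorded call ran with `expected` as the TCCL.
    static boolean allObservedUnder(ClassLoader expected) {
        for (ClassLoader seen : RecordingPlugin.OBSERVED) {
            if (seen != expected) {
                return false;
            }
        }
        return true;
    }
}
```

Because the record is static, a test can make assertions about instances that the scanning code has already thrown away, which is the gap the earlier sampling approach could not cover.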




[GitHub] [kafka] C0urante commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1202903632


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
 
         log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        T config = createConfig(workerProps);
-        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   This is actually incorrect; we want the delegating loader to remain the classloader even after this method exits (normally or exceptionally).



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##########
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
     private final ClassLoader savedLoader;
 
-    public LoaderSwap(ClassLoader savedLoader) {
+    public static LoaderSwap use(ClassLoader loader) {
+        ClassLoader savedLoader = compareAndSwapLoaders(loader);
+        try {
+            return new LoaderSwap(savedLoader);
+        } catch (Throwable t) {
+            compareAndSwapLoaders(savedLoader);
+            throw t;
+        }
+    }

Review Comment:
   Adding static logic that invokes `compareAndSwapLoaders` is difficult to test, which was the motivation for KAFKA-14346. Can we try not to re-introduce that kind of static logic?
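
The testability concern can be illustrated with a small sketch. When the swap is exposed as an instance method, a test can substitute an implementation that never touches the real thread state. Here a hand-written stub stands in for a mocking library, and `SeamDemo`, `SwapProvider`, `StubProvider`, `Swap` and `runTask` are all hypothetical names rather than Kafka classes.

```java
final class SeamDemo {
    // A close() that throws no checked exception, so try-with-resources needs no catch.
    interface Swap extends AutoCloseable {
        @Override
        void close();
    }

    // Hypothetical stand-in for a class that exposes the swap as an instance method.
    static class SwapProvider {
        Swap withClassLoader(ClassLoader loader) {
            Thread thread = Thread.currentThread();
            ClassLoader saved = thread.getContextClassLoader();
            thread.setContextClassLoader(loader);
            return () -> thread.setContextClassLoader(saved);
        }
    }

    // Test double: counts calls and leaves the thread's classloader alone.
    static final class StubProvider extends SwapProvider {
        int calls = 0;

        @Override
        Swap withClassLoader(ClassLoader loader) {
            calls++;
            return () -> { };
        }
    }

    // Code under test: depends on the provider instance, not on a static method.
    static String runTask(SwapProvider provider, ClassLoader loader) {
        try (Swap swap = provider.withClassLoader(loader)) {
            return "done";
        }
    }
}
```

A static swap method offers no such seam without static mocking, which is the difficulty the comment above refers to.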



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -360,17 +360,19 @@ private PluginScanResult scanPluginPath(
         builder.useParallelExecutor();
         Reflections reflections = new InternalReflections(builder);
 
-        return new PluginScanResult(
-                getPluginDesc(reflections, SinkConnector.class, loader),
-                getPluginDesc(reflections, SourceConnector.class, loader),
-                getPluginDesc(reflections, Converter.class, loader),
-                getPluginDesc(reflections, HeaderConverter.class, loader),
-                getTransformationPluginDesc(loader, reflections),
-                getPredicatePluginDesc(loader, reflections),
-                getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-                getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-                getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-        );
+        try (LoaderSwap loaderSwap = LoaderSwap.use(loader)) {

Review Comment:
   If static initialization logic for a plugin class changes the context classloader, then that will remain the classloader for the rest of the plugin scanning that takes place in this method.
   
   I don't think we have to accommodate this case, but if there's an easy way to, we might try.
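
One way to accommodate that case, sketched under assumptions, is to re-assert the plugin loader before each plugin is touched instead of swapping once around the whole scan. This is not what the PR does; `PerPluginSwapDemo` and its helpers are hypothetical, and each "plugin" is modelled as a `Supplier` that reports the TCCL it observed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class PerPluginSwapDemo {
    // Invoke each factory with `pluginLoader` as the TCCL, resetting it before every call,
    // and collect the loader each factory observed.
    static List<ClassLoader> scan(ClassLoader pluginLoader, List<Supplier<ClassLoader>> factories) {
        Thread thread = Thread.currentThread();
        ClassLoader saved = thread.getContextClassLoader();
        List<ClassLoader> observed = new ArrayList<>();
        try {
            for (Supplier<ClassLoader> factory : factories) {
                // Re-assert the loader so an earlier plugin cannot leak its own swap.
                thread.setContextClassLoader(pluginLoader);
                observed.add(factory.get());
            }
        } finally {
            thread.setContextClassLoader(saved);
        }
        return observed;
    }

    // Models a plugin whose initialization changes the TCCL and never restores it.
    static Supplier<ClassLoader> misbehaving(ClassLoader rogue) {
        return () -> {
            ClassLoader seen = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(rogue);
            return seen;
        };
    }

    // Models a plugin that only reads the TCCL.
    static Supplier<ClassLoader> wellBehaved() {
        return () -> Thread.currentThread().getContextClassLoader();
    }
}
```

With a single swap around the whole loop, the second factory would observe the rogue loader left behind by the first. The cost of the per-plugin reset is one extra `setContextClassLoader` call per plugin.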



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
     private final String workerNamePrefix;
     private final AtomicInteger nextWorkerId = new AtomicInteger(0);
     private final EmbeddedConnectClusterAssertions assertions;
-    // we should keep the original class loader and set it back after connector stopped since the connector will change the class loader,
-    // and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed
-    private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

Review Comment:
   I think we need to keep this since the change to `AbstractConnectCli::startConnect` is incorrect.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "gharris1727 (via GitHub)" <gi...@apache.org>.
gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1203138122


##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
 
         log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        T config = createConfig(workerProps);
-        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   Also here is some of the context for this change: https://github.com/apache/kafka/pull/13165#discussion_r1161929533
   
   Since the elimination of compareAndSwap is technically unrelated to the change described in the PR title, it could be moved out to its own PR. Let me know if you'd like me to separate the two changes.





[GitHub] [kafka] C0urante commented on pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

Posted by "C0urante (via GitHub)" <gi...@apache.org>.
C0urante commented on PR #13165:
URL: https://github.com/apache/kafka/pull/13165#issuecomment-1561331016

   That's a good point about the static initialization taking place directly before the constructor right now, but it's possible that other logic, either directly from the Connect framework or from the Reflections library, could cause static initialization to take place earlier than that.
   
   I was thinking we could statically track context classloader instances for the `SamplingConnector` class across instantiations of that class, and then perform assertions on all of those instances about the correct classloader being set. This wouldn't give us perfect coverage across all plugin (or plugin discovery) types, but would at least harden us against changes to plugin discovery logic.
   
   I have a local draft of this that I'd be happy to share if it's too much work. It's certainly not as clean as the existing

