Posted to jira@kafka.apache.org by "gharris1727 (via GitHub)" <gi...@apache.org> on 2023/05/23 20:45:09 UTC

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1202962972


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##########
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
     private final ClassLoader savedLoader;
 
-    public LoaderSwap(ClassLoader savedLoader) {
+    public static LoaderSwap use(ClassLoader loader) {
+        ClassLoader savedLoader = compareAndSwapLoaders(loader);
+        try {
+            return new LoaderSwap(savedLoader);
+        } catch (Throwable t) {
+            compareAndSwapLoaders(savedLoader);
+            throw t;
+        }
+    }

Review Comment:
   This does not re-introduce the static logic; it is only a refactoring to eliminate the open-ended Plugins.compareAndSwap* methods.
   
   This method is only called in two places: by DelegatingClassLoader.scanPluginPath (before scanning is finished) and Plugins.withClassLoader (after scanning is finished).
   
   I've reduced the visibility of this method and made the DCL call site mockable.
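
   For readers unfamiliar with the pattern, here is a minimal sketch of a scoped context classloader swap built only on JDK APIs. It is not the code from this PR: `ScopedLoaderSwap` is a hypothetical stand-in for `LoaderSwap`, and the empty `URLClassLoader` is a placeholder for a plugin classloader. The factory method installs the new loader and remembers the previous one, and `close()` restores it.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical stand-in for LoaderSwap; a sketch, not the implementation in the PR. */
final class ScopedLoaderSwap implements AutoCloseable {

    private final ClassLoader savedLoader;

    private ScopedLoaderSwap(ClassLoader savedLoader) {
        this.savedLoader = savedLoader;
    }

    /** Installs {@code loader} as the thread context classloader and remembers the previous one. */
    static ScopedLoaderSwap use(ClassLoader loader) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        return new ScopedLoaderSwap(previous);
    }

    /** Restores the loader that was active before {@link #use(ClassLoader)} was called. */
    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(savedLoader);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Placeholder "plugin" loader with no URLs; an assumption for illustration only.
        try (URLClassLoader pluginLoader = new URLClassLoader(new URL[0], original);
             ScopedLoaderSwap swap = ScopedLoaderSwap.use(pluginLoader)) {
            // Inside the block the context classloader is the plugin loader.
            System.out.println(Thread.currentThread().getContextClassLoader() == pluginLoader);
        }
        // Resources close in reverse order, so the swap is undone before the loader is closed.
        System.out.println(Thread.currentThread().getContextClassLoader() == original);
    }
}
```

   Because the constructor is private, the only way to obtain an instance is through `use`, so every swap has a matching restore as long as callers use try-with-resources.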



##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##########
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
     private final String workerNamePrefix;
     private final AtomicInteger nextWorkerId = new AtomicInteger(0);
     private final EmbeddedConnectClusterAssertions assertions;
-    // we should keep the original class loader and set it back after connector stopped since the connector will change the class loader,
-    // and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed
-    private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

Review Comment:
   I disagree. I think that this is a symptom of the open-ended context classloader swap having unintended downstream effects. The existing fix is adequate, but it mostly addresses the symptom rather than the underlying problem.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##########
@@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
 
         log.info("Scanning for plugin classes. This might take a moment ...");
         Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        T config = createConfig(workerProps);
-        log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   I understand that this is a change in semantics, but that change is intentional. After this method completes, operations should not require the delegating loader and should be performed via the Connect handle. That handle only has methods for starting, stopping, and interacting with the REST API, all of which should internally handle setting the context classloader when appropriate.
   
   The reason I'm changing this is that I think the open-ended swap methods are an anti-pattern and lead to unexpected behavior later in the caller thread.
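
   To illustrate the concern, the following sketch contrasts an open-ended swap with a scoped one using plain JDK calls. All names here (`ContextLoaderLeakDemo`, `swapAndForget`, `withLoader`) are hypothetical and are not part of Connect; the empty `URLClassLoader` stands in for the delegating loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

/** Hypothetical demo class; none of these names exist in Kafka Connect. */
class ContextLoaderLeakDemo {

    /** Open-ended swap: installs the loader and leaves restoring it to the caller. */
    static ClassLoader swapAndForget(ClassLoader loader) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        return previous;
    }

    /** Scoped swap: the loader is active only while {@code action} runs. */
    static <T> T withLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            // Always restore, even if the action throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        try (URLClassLoader pluginLoader = new URLClassLoader(new URL[0], original)) {
            boolean seenInside = withLoader(pluginLoader,
                    () -> Thread.currentThread().getContextClassLoader() == pluginLoader);
            System.out.println("scoped, active inside: " + seenInside);
            System.out.println("scoped, restored after: "
                    + (Thread.currentThread().getContextClassLoader() == original));

            // The open-ended variant leaves the plugin loader installed on the caller thread.
            swapAndForget(pluginLoader);
            System.out.println("open-ended, still installed: "
                    + (Thread.currentThread().getContextClassLoader() == pluginLoader));

            // Manual cleanup that every caller of the open-ended variant has to remember.
            Thread.currentThread().setContextClassLoader(original);
        }
    }
}
```

   With the open-ended variant, any code that runs later on the same thread sees the swapped loader unless the caller restores it explicitly, which is the kind of downstream effect described above.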


