Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/03 02:24:21 UTC

[GitHub] [kafka] gharris1727 opened a new pull request, #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

gharris1727 opened a new pull request, #12817:
URL: https://github.com/apache/kafka/pull/12817

   The dominant compareAndSwapLoaders usage idiom in the codebase is currently as follows:
   ```
   ClassLoader savedLoader = plugins.compareAndSwapLoaders(loader);
   try {
       // operations which are sensitive to the thread context classloader
   } finally {
       Plugins.compareAndSwapLoaders(savedLoader);
   }
   ```
   This is difficult to mock in tests because a static method call is used to swap back to the previous classloader.
   Instead, this operation should be managed by a non-static try-with-resources call which is more easily mocked:
   ```
   try (LoaderSwap loaderSwap = plugins.withClassLoader(loader)) {
       // operations which are sensitive to the thread context classloader
   }
   ```
   This should also be less error-prone, as there is only one statement needed to safely create a special classloading context, rather than two.
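
A minimal sketch of how such a try-with-resources handle could work, assuming it simply saves and restores the thread context classloader. The class names `LoaderSwapSketch` and `LoaderSwapDemo` are hypothetical and used only for illustration; this is not the actual `LoaderSwap` implementation from the PR.

```java
// Hypothetical sketch: the names echo the PR, but this is not the Kafka implementation.
final class LoaderSwapSketch implements AutoCloseable {
    private final ClassLoader savedLoader;

    private LoaderSwapSketch(ClassLoader savedLoader) {
        this.savedLoader = savedLoader;
    }

    // Swap the thread context classloader and remember the previous one.
    static LoaderSwapSketch withClassLoader(ClassLoader loader) {
        Thread current = Thread.currentThread();
        ClassLoader saved = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        return new LoaderSwapSketch(saved);
    }

    // Restore the previous loader. No checked exception is declared,
    // so callers do not need a catch clause around the try block.
    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(savedLoader);
    }
}

final class LoaderSwapDemo {
    // Returns true if the loader was swapped inside the block and restored afterwards.
    static boolean restoresOriginalLoader() {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader pluginLoader = new ClassLoader(original) { };
        boolean swapped;
        try (LoaderSwapSketch swap = LoaderSwapSketch.withClassLoader(pluginLoader)) {
            swapped = Thread.currentThread().getContextClassLoader() == pluginLoader;
        }
        return swapped && Thread.currentThread().getContextClassLoader() == original;
    }
}
```

Because the handle is an ordinary object returned by an instance method, a test can return a mock from `withClassLoader` and verify `close()` on it without any static mocking.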
   I also experimented with combining the plugins.connectorLoader(String) functionality into a similar withClassLoader() call, but omitted this from the PR as it complicated error handling. As-is, the withClassLoader call should not fail due to missing plugins, whereas the connectorLoader call will intentionally fail if a connector is not found.
   
   In addition to the change described in the title, this PR cleans up some unnecessary calls to Plugins::delegatingLoader, the now-unused Plugins::currentThreadLoader, and the now-trivial Worker::executeStateTransition. It also removes the static mocking from the tests, which was the impetus for this change.
   
   This PR also moves the WorkerConnector/WorkerTask thread classloader ownership out to the Worker, as all other thread classloader management was already handled from the worker side. Mechanically, this is because a WorkerConnector/WorkerTask does not have access to the Plugins object of the worker, and is effectively in a single-plugin environment. Rather than inject the Plugins instance into each WorkerConnector/WorkerTask, I chose to pull the isolation functionality out of the WorkerConnector/WorkerTask entirely.
   
   Signed-off-by: Greg Harris <gr...@aiven.io>
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] gharris1727 commented on a diff in pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on code in PR #12817:
URL: https://github.com/apache/kafka/pull/12817#discussion_r1014264009


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1957,6 +1789,61 @@ private void verifyTaskHeaderConverter() {
         verify(plugins).newHeaderConverter(any(AbstractConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
     }
 
+    private void mockGenericIsolation() {
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
+    }
+
+    private void verifyGenericIsolation() {
+        verify(plugins, atLeastOnce()).withClassLoader(pluginLoader);
+        verify(loaderSwap, atLeastOnce()).close();
+    }
+
+    private void mockConnectorIsolation(String connectorClass, Connector connector) {
+        mockGenericIsolation();
+        when(plugins.newConnector(connectorClass)).thenReturn(connector);
+        when(connector.version()).thenReturn("1.0");
+    }
+
+    private void verifyConnectorIsolation(Connector connector) {
+        verifyGenericIsolation();
+        verify(plugins).newConnector(anyString());
+        verify(connector, atLeastOnce()).version();
+    }
+
+    private void mockTaskIsolation(Class<? extends Connector> connector, Class<? extends Task> taskClass, Task task) {
+        mockGenericIsolation();
+        doReturn(connector).when(plugins).connectorClass(connector.getName());
+        when(plugins.newTask(taskClass)).thenReturn(task);
+        when(task.version()).thenReturn("1.0");
+    }
+
+    private void verifyTaskIsolation(Task task) {
+        verifyGenericIsolation();
+        verify(plugins).connectorClass(anyString());
+        verify(plugins).newTask(any());
+        verify(task).version();
+    }
+
+    private void mockExecutorSubmit(boolean startRunnable) {

Review Comment:
   I split the methods out while keeping the sometimes-mocked-sometimes-not executorService. If keeping the real/fake executor service and the real/fake mock in sync is too tedious, we can follow this up with a cleanup that removes the real executor tests and always injects a managed executor service.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on code in PR #12817:
URL: https://github.com/apache/kafka/pull/12817#discussion_r1013212435


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -155,10 +143,22 @@ public LoaderSwap withClassLoader(ClassLoader loader) {
         }
     }
 
+    public Runnable withClassLoader(ClassLoader classLoader, Runnable operation) {

Review Comment:
   I had a version which did that but felt that it unnecessarily duplicated the other withClassLoader :)



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -421,19 +421,24 @@ private void removeConnectorTasks(String connName) {
     }
 
     private void updateConnectorTasks(String connName) {
-        if (!worker.isRunning(connName)) {
-            log.info("Skipping update of connector {} since it is not running", connName);
-            return;
-        }
+        try {
+            if (!worker.isRunning(connName)) {
+                log.info("Skipping update of connector {} since it is not running", connName);
+                return;
+            }
 
-        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
+            List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+            List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
 
-        if (!newTaskConfigs.equals(oldTaskConfigs)) {
-            removeConnectorTasks(connName);
-            List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
-            configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
-            createConnectorTasks(connName);
+            if (!newTaskConfigs.equals(oldTaskConfigs)) {
+                removeConnectorTasks(connName);
+                List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
+                configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
+                createConnectorTasks(connName);
+            }
+        } catch (Throwable t) {
+            // TODO: when this throws errors where do they go
+            log.error("Unable to update connector tasks", t);

Review Comment:
   Yeah, this is something I need to follow up on, but it doesn't belong in this PR.
   I had some bad mocks which caused a 1000s wait while this method swallowed the errors.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -116,14 +115,12 @@ public void run() {
         LoggingContext.clear();
 
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
-            ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   I really wanted to make this happen, but it turns out the getter is used by the Worker to switch into the proper classloader for the connector before interacting with the WorkerConnector.
   
   There was an alternative to pull the plugin loader from the Connector object itself, but this same alternative did not exist for the WorkerTask, as the `task` field is only a member of the subclasses, and not of the WorkerTask itself.





[GitHub] [kafka] C0urante merged pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12817:
URL: https://github.com/apache/kafka/pull/12817




[GitHub] [kafka] C0urante commented on a diff in pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12817:
URL: https://github.com/apache/kafka/pull/12817#discussion_r1014046291


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -116,14 +115,12 @@ public void run() {
         LoggingContext.clear();
 
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
-            ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   I guess one alternative could be to get the name of the connector from the `WorkerConnector`/`WorkerTask` object, then look up its config, then get the connector class out of the config, then use `Plugins::connectorLoader` to get the loader. But that's a bit convoluted and, if we ever want to add support for running multiple versions of the same connector at the same time, it'd probably be handy to have each `WorkerConnector`/`WorkerTask` object know its own class loader.
   
   Alright, we can leave this as-is for now 👍







[GitHub] [kafka] C0urante commented on a diff in pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12817:
URL: https://github.com/apache/kafka/pull/12817#discussion_r1012953885


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java:
##########
@@ -147,6 +148,7 @@ public class AbstractHerderTest {
     @Mock private ConfigBackingStore configStore;
     @Mock private StatusBackingStore statusStore;
     @Mock private ClassLoader classLoader;
+    @Mock private LoaderSwap loaderSwap;
     @Mock private Plugins plugins;
 
     private ClassLoader loader;

Review Comment:
   Can we remove the `@Before` and `@After` methods now that we know that these tests aren't going to overwrite the context class loader?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -116,14 +115,12 @@ public void run() {
         LoggingContext.clear();
 
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
-            ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   Would it also be reasonable to get rid of the `loader` constructor parameter/field/getter method as well, since we no longer actually use that field in this class?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1149,31 +1118,21 @@ public void setTargetState(String connName, TargetState state, Callback<TargetSt
 
         WorkerConnector workerConnector = connectors.get(connName);
         if (workerConnector != null) {
-            ClassLoader connectorLoader =
-                    plugins.delegatingLoader().connectorLoader(workerConnector.connector());
-            executeStateTransition(
-                () -> workerConnector.transitionTo(state, stateChangeCallback),
-                connectorLoader);
+            try (LoaderSwap loaderSwap = plugins.withClassLoader(workerConnector.loader())) {
+                workerConnector.transitionTo(state, stateChangeCallback);
+            }
         }
 
         for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) {
             if (taskEntry.getKey().connector().equals(connName)) {
                 WorkerTask workerTask = taskEntry.getValue();
-                executeStateTransition(() -> workerTask.transitionTo(state), workerTask.loader);
+                try (LoaderSwap loaderSwap = plugins.withClassLoader(workerTask.loader())) {
+                    workerTask.transitionTo(state);
+                }
             }
         }
     }
 
-    private void executeStateTransition(Runnable stateTransition, ClassLoader loader) {
-        ClassLoader savedLoader = plugins.currentThreadLoader();
-        try {
-            savedLoader = Plugins.compareAndSwapLoaders(loader);
-            stateTransition.run();
-        } finally {
-            Plugins.compareAndSwapLoaders(savedLoader);
-        }
-    }

Review Comment:
   Thanks for getting rid of this, it was a bit of a hack 👍



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -155,10 +143,22 @@ public LoaderSwap withClassLoader(ClassLoader loader) {
         }
     }
 
+    public Runnable withClassLoader(ClassLoader classLoader, Runnable operation) {

Review Comment:
   Probably worth adding a Javadoc to this method. I initially thought it would run the `operation` with the given loader, instead of returning a new `Runnable` that wraps it.
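
A minimal sketch of the distinction raised here, assuming the method only decorates the given `Runnable` and does not execute it. The class name `LoaderRunnables` is hypothetical; the real method lives on `Plugins` and may differ in detail.

```java
// Hypothetical sketch of a wrapping overload; not the actual Plugins code.
final class LoaderRunnables {
    /**
     * Returns a new Runnable that, when run, swaps the thread context classloader
     * to classLoader, runs operation, and then restores the previous loader.
     * Nothing is executed by this method itself.
     */
    static Runnable withClassLoader(ClassLoader classLoader, Runnable operation) {
        return () -> {
            Thread current = Thread.currentThread();
            ClassLoader saved = current.getContextClassLoader();
            current.setContextClassLoader(classLoader);
            try {
                operation.run();
            } finally {
                // Restore on the thread that actually ran the operation.
                current.setContextClassLoader(saved);
            }
        };
    }
}
```

The swap happens on whichever thread eventually calls `run()`, which is what matters when the wrapped `Runnable` is handed to an executor.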



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java:
##########
@@ -452,7 +454,8 @@ public void testConfigValidationMissingName() {
         assertEquals(1, infos.get("required").configValue().errors().size());
 
         verify(plugins).newConnector(connectorClass.getName());
-        verify(plugins).compareAndSwapLoaders(connector);
+        verify(plugins).withClassLoader(classLoader);
+        verify(loaderSwap).close();

Review Comment:
   Interesting--we never had coverage to ensure that we swapped back to the original loader? Good to see that added now.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -300,14 +302,14 @@ public void testStartAndStopConnector() throws Throwable {
         connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorClass);
 
         // Create
-        when(plugins.currentThreadLoader()).thenReturn(delegatingLoader);
-        when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
-        when(delegatingLoader.connectorLoader(connectorClass)).thenReturn(pluginLoader);
+        when(plugins.connectorLoader(connectorClass)).thenReturn(pluginLoader);
         when(plugins.newConnector(connectorClass)).thenReturn(sourceConnector);
         when(sourceConnector.version()).thenReturn("1.0");
 
-        pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
-        pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
+        // this test expects the runnable to be run by the executor, make withClassLoader(cl, runnable) a passthrough.
+        ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+        when(plugins.withClassLoader(same(pluginLoader), runnableCaptor.capture())).thenReturn(() -> runnableCaptor.getValue().run());

Review Comment:
   Worth refactoring into a utility method?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##########
@@ -193,10 +193,6 @@ public PluginClassLoader pluginClassLoader(String name) {
                : null;
     }
 
-    public ClassLoader connectorLoader(Connector connector) {
-        return connectorLoader(connector.getClass().getName());
-    }
-
     public ClassLoader connectorLoader(String connectorClassOrAlias) {

Review Comment:
   Looks like this method is only invoked from `Plugins::connectorLoader`; should we make this package private in order to keep the API clean and make that method the only entrypoint?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -421,19 +421,24 @@ private void removeConnectorTasks(String connName) {
     }
 
     private void updateConnectorTasks(String connName) {
-        if (!worker.isRunning(connName)) {
-            log.info("Skipping update of connector {} since it is not running", connName);
-            return;
-        }
+        try {
+            if (!worker.isRunning(connName)) {
+                log.info("Skipping update of connector {} since it is not running", connName);
+                return;
+            }
 
-        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
+            List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+            List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
 
-        if (!newTaskConfigs.equals(oldTaskConfigs)) {
-            removeConnectorTasks(connName);
-            List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
-            configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
-            createConnectorTasks(connName);
+            if (!newTaskConfigs.equals(oldTaskConfigs)) {
+                removeConnectorTasks(connName);
+                List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
+                configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
+                createConnectorTasks(connName);
+            }
+        } catch (Throwable t) {
+            // TODO: when this throws errors where do they go
+            log.error("Unable to update connector tasks", t);

Review Comment:
   Left in accidentally?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -320,12 +314,8 @@ public void startConnector(
                         connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, offsetStore, connectorLoader);
                 log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
                 workerConnector.transitionTo(initialState, onConnectorStateChange);
-                Plugins.compareAndSwapLoaders(savedLoader);
             } catch (Throwable t) {
                 log.error("Failed to start connector {}", connName, t);
-                // Can't be put in a finally block because it needs to be swapped before the call on
-                // statusListener
-                Plugins.compareAndSwapLoaders(savedLoader);

Review Comment:
   Interesting--I didn't realize that resources declared in a try-with-resources statement are closed before the catch body is executed. This change should be safe; nice cleanup 👍
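
The ordering mentioned here can be checked with a small standalone sketch. The class name `CloseOrderDemo` and the event strings are made up for illustration and are unrelated to the Kafka code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: records the order of body, close, and catch.
final class CloseOrderDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        // AutoCloseable.close() is declared to throw Exception,
        // so the catch clause below must handle Exception.
        try (AutoCloseable resource = () -> events.add("close")) {
            events.add("body");
            throw new IllegalStateException("boom");
        } catch (Exception e) {
            // The resource has already been closed by the time this runs.
            events.add("catch");
        }
        return events; // [body, close, catch]
    }
}
```

This is why a status listener invoked from the catch block already sees the restored classloader without an explicit swap-back call.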



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -455,10 +456,10 @@ ConfigInfos validateConnectorConfig(Map<String, String> connectorProps, boolean
 
         Connector connector = getConnector(connType);
         org.apache.kafka.connect.health.ConnectorType connectorType;
-        ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector);
+        ClassLoader connectorLoader = plugins().connectorLoader(connType);
         ConfigDef enrichedConfigDef;
         Map<String, ConfigValue> validatedConnectorConfig;

Review Comment:
   Nit: can we move these inside the `try` block?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java:
##########
@@ -452,7 +454,8 @@ public void testConfigValidationMissingName() {
         assertEquals(1, infos.get("required").configValue().errors().size());
 
         verify(plugins).newConnector(connectorClass.getName());
-        verify(plugins).compareAndSwapLoaders(connector);
+        verify(plugins).withClassLoader(classLoader);
+        verify(loaderSwap).close();

Review Comment:
   Also, these three lines:
   ```java
   verify(plugins).newConnector(connectorClass.getName());
   verify(plugins).withClassLoader(classLoader);
   verify(loaderSwap).close();
   ```
   are repeated eight times in this test. Maybe refactor into a utility method?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java:
##########
@@ -252,7 +251,6 @@ public void run() {
         LoggingContext.clear();
 
         try (LoggingContext loggingContext = LoggingContext.forTask(id())) {
-            ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   Same thought RE getting rid of `loader` from this class entirely





[GitHub] [kafka] C0urante commented on a diff in pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12817:
URL: https://github.com/apache/kafka/pull/12817#discussion_r1014047751


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -155,10 +143,22 @@ public LoaderSwap withClassLoader(ClassLoader loader) {
         }
     }
 
+    public Runnable withClassLoader(ClassLoader classLoader, Runnable operation) {

Review Comment:
   Heh, good call





[GitHub] [kafka] C0urante commented on a diff in pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12817:
URL: https://github.com/apache/kafka/pull/12817#discussion_r1014063487


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1957,6 +1789,61 @@ private void verifyTaskHeaderConverter() {
         verify(plugins).newHeaderConverter(any(AbstractConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
     }
 
+    private void mockGenericIsolation() {
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
+    }
+
+    private void verifyGenericIsolation() {
+        verify(plugins, atLeastOnce()).withClassLoader(pluginLoader);
+        verify(loaderSwap, atLeastOnce()).close();
+    }
+
+    private void mockConnectorIsolation(String connectorClass, Connector connector) {
+        mockGenericIsolation();
+        when(plugins.newConnector(connectorClass)).thenReturn(connector);
+        when(connector.version()).thenReturn("1.0");
+    }
+
+    private void verifyConnectorIsolation(Connector connector) {
+        verifyGenericIsolation();
+        verify(plugins).newConnector(anyString());
+        verify(connector, atLeastOnce()).version();
+    }
+
+    private void mockTaskIsolation(Class<? extends Connector> connector, Class<? extends Task> taskClass, Task task) {
+        mockGenericIsolation();
+        doReturn(connector).when(plugins).connectorClass(connector.getName());
+        when(plugins.newTask(taskClass)).thenReturn(task);
+        when(task.version()).thenReturn("1.0");
+    }
+
+    private void verifyTaskIsolation(Task task) {
+        verifyGenericIsolation();
+        verify(plugins).connectorClass(anyString());
+        verify(plugins).newTask(any());
+        verify(task).version();
+    }
+
+    private void mockExecutorSubmit(boolean startRunnable) {

Review Comment:
   I found this pretty confusing in the case when `startRunnable` is `true`, since we don't actually end up mocking anything on the executor itself, and the implicit expectation in that case is that the `Worker` was actually instantiated with a non-mocked executor.
   
   Anything we can do to make this a bit less of a footgun? Javadoc, a different name, split out the two cases into different methods, mock the executor in both cases, etc.?







[GitHub] [kafka] gharris1727 commented on a diff in pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

Posted by GitBox <gi...@apache.org>.
gharris1727 commented on code in PR #12817:
URL: https://github.com/apache/kafka/pull/12817#discussion_r1014266416


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -1957,6 +1789,61 @@ private void verifyTaskHeaderConverter() {
         verify(plugins).newHeaderConverter(any(AbstractConfig.class), eq(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG), eq(ClassLoaderUsage.CURRENT_CLASSLOADER));
     }
 
+    private void mockGenericIsolation() {
+        when(plugins.connectorLoader(anyString())).thenReturn(pluginLoader);
+        when(plugins.withClassLoader(pluginLoader)).thenReturn(loaderSwap);
+    }
+
+    private void verifyGenericIsolation() {
+        verify(plugins, atLeastOnce()).withClassLoader(pluginLoader);
+        verify(loaderSwap, atLeastOnce()).close();
+    }
+
+    private void mockConnectorIsolation(String connectorClass, Connector connector) {
+        mockGenericIsolation();
+        when(plugins.newConnector(connectorClass)).thenReturn(connector);
+        when(connector.version()).thenReturn("1.0");
+    }
+
+    private void verifyConnectorIsolation(Connector connector) {
+        verifyGenericIsolation();
+        verify(plugins).newConnector(anyString());
+        verify(connector, atLeastOnce()).version();
+    }
+
+    private void mockTaskIsolation(Class<? extends Connector> connector, Class<? extends Task> taskClass, Task task) {
+        mockGenericIsolation();
+        doReturn(connector).when(plugins).connectorClass(connector.getName());
+        when(plugins.newTask(taskClass)).thenReturn(task);
+        when(task.version()).thenReturn("1.0");
+    }
+
+    private void verifyTaskIsolation(Task task) {
+        verifyGenericIsolation();
+        verify(plugins).connectorClass(anyString());
+        verify(plugins).newTask(any());
+        verify(task).version();
+    }
+
+    private void mockExecutorSubmit(boolean startRunnable) {

Review Comment:
   It also happened that the real executorService was only used with startConnector, and the fake executorService was only used with startTask. This appears to be because startTask doesn't have a callback which depends on the execution of the WorkerTask, while startConnector does. If startTask gains such a callback in the future, we will need to provide real executor functionality for the startTask tests.


