You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/03 22:17:16 UTC

[GitHub] [kafka] gharris1727 commented on a diff in pull request #12817: KAFKA-14346: Remove difficult to mock Plugins.compareAndSwapLoader usages

gharris1727 commented on code in PR #12817:
URL: https://github.com/apache/kafka/pull/12817#discussion_r1013212435


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -155,10 +143,22 @@ public LoaderSwap withClassLoader(ClassLoader loader) {
         }
     }
 
+    public Runnable withClassLoader(ClassLoader classLoader, Runnable operation) {

Review Comment:
   I had a version which did that but felt that it unnecessarily duplicated the other withClassLoader :)



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##########
@@ -421,19 +421,24 @@ private void removeConnectorTasks(String connName) {
     }
 
     private void updateConnectorTasks(String connName) {
-        if (!worker.isRunning(connName)) {
-            log.info("Skipping update of connector {} since it is not running", connName);
-            return;
-        }
+        try {
+            if (!worker.isRunning(connName)) {
+                log.info("Skipping update of connector {} since it is not running", connName);
+                return;
+            }
 
-        List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
-        List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
+            List<Map<String, String>> newTaskConfigs = recomputeTaskConfigs(connName);
+            List<Map<String, String>> oldTaskConfigs = configState.allTaskConfigs(connName);
 
-        if (!newTaskConfigs.equals(oldTaskConfigs)) {
-            removeConnectorTasks(connName);
-            List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
-            configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
-            createConnectorTasks(connName);
+            if (!newTaskConfigs.equals(oldTaskConfigs)) {
+                removeConnectorTasks(connName);
+                List<Map<String, String>> rawTaskConfigs = reverseTransform(connName, configState, newTaskConfigs);
+                configBackingStore.putTaskConfigs(connName, rawTaskConfigs);
+                createConnectorTasks(connName);
+            }
+        } catch (Throwable t) {
+            // TODO: when this throws errors where do they go
+            log.error("Unable to update connector tasks", t);

Review Comment:
   yeah something i need to follow up on but doesn't belong in this PR.
   I had some bad mocks which caused a 1000s wait while this method swallowed the errors.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##########
@@ -116,14 +115,12 @@ public void run() {
         LoggingContext.clear();
 
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
-            ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   I really wanted to make this happen, but it turns out the getter is used by the Worker to switch into the proper classloader for the connector before interacting with the WorkerConnector.
   
   There was an alternative to pull the plugin loader from the Connector object itself, but this same alternative did not exist for the WorkerTask, as the `task` field is only a member of the subclasses, and not of the WorkerTask itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org