Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/16 22:01:47 UTC

[GitHub] [kafka] kpatelatwork opened a new pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

kpatelatwork opened a new pull request #10549:
URL: https://github.com/apache/kafka/pull/10549


   …e plugin class in the plugin paths
   
   *A plugin such as a converter or connector can exist multiple times in the plugin path, and the user may not realize that only one of those copies will be used. Which copy is used depends on the operating system and on how the filesystem returns the jar listing, so we need to log an error message so that the user is aware of the conflict and can remove the offending copy.*
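
To make the problem concrete, here is a minimal sketch of how duplicate plugin classes could be detected across plugin locations. It is not the code from this PR: the class `DuplicatePluginScan`, the method `findDuplicates`, and the example paths and class names are all hypothetical, and the scan result is assumed to be available as a map from location to class names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Kafka Connect code: find plugin classes that
// appear in more than one plugin location.
final class DuplicatePluginScan {

    // Input: plugin location -> class names found there.
    // Output: class name -> locations, restricted to classes found in 2+ locations.
    static Map<String, List<String>> findDuplicates(Map<String, List<String>> classesByLocation) {
        Map<String, List<String>> locationsByClass = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : classesByLocation.entrySet()) {
            for (String className : entry.getValue()) {
                locationsByClass.computeIfAbsent(className, n -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Drop every class that is provided by a single location only.
        locationsByClass.values().removeIf(locations -> locations.size() < 2);
        return locationsByClass;
    }

    public static void main(String[] args) {
        // Made-up locations and class names, for illustration only.
        Map<String, List<String>> scanned = new LinkedHashMap<>();
        scanned.put("plugins/converter-a", Arrays.asList("com.example.MyConverter"));
        scanned.put("plugins/converter-a-copy", Arrays.asList("com.example.MyConverter"));
        scanned.put("plugins/connector-b", Arrays.asList("com.example.MyConnector"));
        findDuplicates(scanned).forEach((className, locations) ->
                System.err.println("Multiple copies of " + className + " found in " + locations));
    }
}
```

Because the order in which locations are listed can differ between machines, a report like this names every location rather than relying on whichever copy happens to be loaded.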
    
   @rhauch @gharris1727 @C0urante  please review 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616208904



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader(
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+                allAddedPlugins.put(pluginClassName, new ArrayList<>());
             }
             inner.put(plugin, loader);
+            allAddedPlugins.get(pluginClassName).add(plugin);

Review comment:
       reused the same pattern for pluginLoader also.







[GitHub] [kafka] kpatelatwork commented on pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#issuecomment-823575141


   @rhauch Good suggestions, I applied them to the PR.
   
   When you get time, could you please check whether they look good?





[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616074220



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
##########
@@ -129,4 +133,32 @@ public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
             assertNotNull(classLoader.pluginClassLoader(pluginClassName));
         }
     }
+
+    @Test
+    public void testAddMultiplePluginsWithSameClass() {

Review comment:
       @C0urante I reworked the tests. Could you please check now?







[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r617005037



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +220,21 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
+            String pluginClassName = e.getKey();
+            PluginDesc<?> usedPluginDesc = usedPluginDesc(pluginClassName);
+            List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
+            ignoredPlugins.remove(usedPluginDesc);
+            log.error("Detected multiple plugins contain '{}'; using plugin {} and ignoring {} plugins ({}). "
+                            + "Check the installation and remove duplicate plugins from all workers.",

Review comment:
       done

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +220,21 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
+            String pluginClassName = e.getKey();
+            PluginDesc<?> usedPluginDesc = usedPluginDesc(pluginClassName);
+            List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
+            ignoredPlugins.remove(usedPluginDesc);
+            log.error("Detected multiple plugins contain '{}'; using plugin {} and ignoring {} plugins ({}). "

Review comment:
       done







[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615310932



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
       I had discussed that case with Randall, and you are right: detecting and logging only once would require inspecting the map after all plugins are loaded. However, we will soon be working on dynamically loading plugins at runtime to improve worker startup time, so doing it that way would be a moot point. This is why it's OK to see the same statement twice, as it would be rare for someone to have 3 copies of the same plugin.
   
   Sure, let me make the change to log which one will be used as of the time the statement is logged.







[GitHub] [kafka] rhauch commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616765881



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -153,12 +156,25 @@ public PluginClassLoader pluginClassLoader(String name) {
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+        ClassLoader pluginLoader = inner.get(usedPluginDesc(inner));
         return pluginLoader instanceof PluginClassLoader
                ? (PluginClassLoader) pluginLoader
                : null;
     }
 
+    //visible for testing
+    PluginDesc<?> usedPluginDesc(String name) {
+        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+        if (inner == null) {
+            return null;
+        }
+        return usedPluginDesc(inner);

Review comment:
       In cases like this, it's probably a bit more idiomatic in Connect to use a ternary operator so that there is a single `return`:
   ```suggestion
           return inner == null ? null : usedPluginDesc(inner);
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
##########
@@ -129,4 +133,57 @@ public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
             assertNotNull(classLoader.pluginClassLoader(pluginClassName));
         }
     }
+
+    @Test
+    public void testPluginConflictSameClassSameVersion() {
+        DelegatingClassLoader delegatingClassLoader = new DelegatingClassLoader(Collections.emptyList());
+
+        Class<Connector> klass = Connector.class;
+        PluginDesc<Connector> connectorDesc1 = new PluginDesc<>(
+                klass,
+                "1.1.0",
+                ClassLoader.getSystemClassLoader()
+        );
+        delegatingClassLoader.addPlugins(Arrays.asList(connectorDesc1, connectorDesc1), ClassLoader.getSystemClassLoader());
+        assertTrue(delegatingClassLoader.reportPluginConflicts().contains(klass.getName()));
+    }
+
+    @Test
+    public void testPluginConflictSameClassDiffVersions() {
+        DelegatingClassLoader delegatingClassLoader = new DelegatingClassLoader(Collections.emptyList());
+        PluginDesc<Converter> notConflictingPlugin = new PluginDesc<>(
+                Converter.class,
+                "0.0",
+                ClassLoader.getSystemClassLoader()
+        );
+        delegatingClassLoader.addPlugins(Arrays.asList(notConflictingPlugin), ClassLoader.getSystemClassLoader());
+
+        Class<Connector> klass = Connector.class;
+        PluginDesc<Connector> connectorDesc1 = new PluginDesc<>(
+                klass,
+                "1.1.0",
+                ClassLoader.getSystemClassLoader()
+        );
+        delegatingClassLoader.addPlugins(Arrays.asList(connectorDesc1), ClassLoader.getSystemClassLoader());
+        assertTrue(delegatingClassLoader.reportPluginConflicts().isEmpty());
+
+        PluginDesc<Connector> connectorDesc2 = new PluginDesc<>(
+                klass,
+                "1.0.0",
+                ClassLoader.getSystemClassLoader()
+        );
+        delegatingClassLoader.addPlugins(Arrays.asList(connectorDesc2), ClassLoader.getSystemClassLoader());
+        String pluginClassName = klass.getName();
+        assertTrue(delegatingClassLoader.reportPluginConflicts().contains(pluginClassName));

Review comment:
       It's probably worthwhile to also assert that the non-conflicting plugin has not accidentally been added as a conflict, here and after the final `addPlugins(...)` call below.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +223,19 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
+            String pluginClassName = e.getKey();
+            PluginDesc<?> usedPluginDesc = usedPluginDesc(pluginClassName);
+            List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
+            ignoredPlugins.remove(usedPluginDesc);
+            log.error("Detected multiple plugins contain '{}'; using {} and ignoring {}", pluginClassName, usedPluginDesc, ignoredPlugins);

Review comment:
       Since this is an error, it probably would be good to clarify the error message and provide an action. WDYT about:
   ```suggestion
               log.error("Detected multiple plugins contain '{}'; using plugin {} and ignoring {} plugins ({}). "
                       + "Check the installation and remove duplicate plugins from all workers.",
                       pluginClassName, usedPluginDesc, ignoredPlugins.size(), ignoredPlugins);
   ```







[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616821920



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +223,19 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
+            String pluginClassName = e.getKey();
+            PluginDesc<?> usedPluginDesc = usedPluginDesc(pluginClassName);
+            List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
+            ignoredPlugins.remove(usedPluginDesc);
+            log.error("Detected multiple plugins contain '{}'; using {} and ignoring {}", pluginClassName, usedPluginDesc, ignoredPlugins);

Review comment:
       done

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -153,12 +156,25 @@ public PluginClassLoader pluginClassLoader(String name) {
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+        ClassLoader pluginLoader = inner.get(usedPluginDesc(inner));
         return pluginLoader instanceof PluginClassLoader
                ? (PluginClassLoader) pluginLoader
                : null;
     }
 
+    //visible for testing
+    PluginDesc<?> usedPluginDesc(String name) {
+        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+        if (inner == null) {
+            return null;
+        }
+        return usedPluginDesc(inner);

Review comment:
       done

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
##########
@@ -129,4 +133,57 @@ public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
             assertNotNull(classLoader.pluginClassLoader(pluginClassName));
         }
     }
+
+    @Test
+    public void testPluginConflictSameClassSameVersion() {
+        DelegatingClassLoader delegatingClassLoader = new DelegatingClassLoader(Collections.emptyList());
+
+        Class<Connector> klass = Connector.class;
+        PluginDesc<Connector> connectorDesc1 = new PluginDesc<>(
+                klass,
+                "1.1.0",
+                ClassLoader.getSystemClassLoader()
+        );
+        delegatingClassLoader.addPlugins(Arrays.asList(connectorDesc1, connectorDesc1), ClassLoader.getSystemClassLoader());
+        assertTrue(delegatingClassLoader.reportPluginConflicts().contains(klass.getName()));
+    }
+
+    @Test
+    public void testPluginConflictSameClassDiffVersions() {
+        DelegatingClassLoader delegatingClassLoader = new DelegatingClassLoader(Collections.emptyList());
+        PluginDesc<Converter> notConflictingPlugin = new PluginDesc<>(
+                Converter.class,
+                "0.0",
+                ClassLoader.getSystemClassLoader()
+        );
+        delegatingClassLoader.addPlugins(Arrays.asList(notConflictingPlugin), ClassLoader.getSystemClassLoader());
+
+        Class<Connector> klass = Connector.class;
+        PluginDesc<Connector> connectorDesc1 = new PluginDesc<>(
+                klass,
+                "1.1.0",
+                ClassLoader.getSystemClassLoader()
+        );
+        delegatingClassLoader.addPlugins(Arrays.asList(connectorDesc1), ClassLoader.getSystemClassLoader());
+        assertTrue(delegatingClassLoader.reportPluginConflicts().isEmpty());
+
+        PluginDesc<Connector> connectorDesc2 = new PluginDesc<>(
+                klass,
+                "1.0.0",
+                ClassLoader.getSystemClassLoader()
+        );
+        delegatingClassLoader.addPlugins(Arrays.asList(connectorDesc2), ClassLoader.getSystemClassLoader());
+        String pluginClassName = klass.getName();
+        assertTrue(delegatingClassLoader.reportPluginConflicts().contains(pluginClassName));

Review comment:
       done







[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616169919



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +228,22 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        Set<String> conflictPluginClasses = new HashSet<>();
+        for (Map.Entry<String, List<PluginDesc<?>>> entry : allAddedPlugins.entrySet()) {
+            String pluginClassName = entry.getKey();
+            List<PluginDesc<?>> pluginDescriptors = entry.getValue();
+            if (pluginDescriptors.size() > 1) {
+                PluginDesc<?> pluginDescInUse = pluginDescInUse(pluginClassName);
+                log.error("For plugin '{}', detected multiple copies '{}', this copy '{}' will be used.", pluginClassName, pluginDescriptors, pluginDescInUse);
+                conflictPluginClasses.add(pluginClassName);
+            }
+        }
+        return conflictPluginClasses;

Review comment:
       Applied all the changes. @rhauch please check now.







[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615186346



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
       We can't move it into the `else` branch because we need to report all versions, and they are added outside the if/else block. Also, I can't use a `size()` check because sometimes the same plugin with the same version exists in 2 places.
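
The `size()` limitation can be illustrated with a small sketch. It assumes a descriptor whose ordering treats two copies as the same key when class name and version match; `Desc` below is a hypothetical stand-in, not Kafka's `PluginDesc`, and the loader value is a placeholder string. Under that assumption the sorted map keeps one entry per distinct key, while a plain list records every addition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch: Desc stands in for a plugin descriptor; it is not Kafka's PluginDesc.
final class SortedMapCollapseSketch {

    static final class Desc implements Comparable<Desc> {
        final String className;
        final String version;

        Desc(String className, String version) {
            this.className = className;
            this.version = version;
        }

        // Assumption: two descriptors are the same key when class name and version match.
        @Override
        public int compareTo(Desc other) {
            int byName = className.compareTo(other.className);
            return byName != 0 ? byName : version.compareTo(other.version);
        }
    }

    // Returns {number of sorted-map keys, number of additions recorded in the list}.
    static int[] addAll(List<Desc> copies) {
        SortedMap<Desc, String> inner = new TreeMap<>();
        List<Desc> allAdded = new ArrayList<>();
        for (Desc copy : copies) {
            inner.put(copy, "loader"); // an equal key overwrites the earlier entry
            allAdded.add(copy);        // the list keeps every addition
        }
        return new int[] {inner.size(), allAdded.size()};
    }

    public static void main(String[] args) {
        List<Desc> copies = new ArrayList<>();
        copies.add(new Desc("com.example.MyConnector", "1.1.0"));
        copies.add(new Desc("com.example.MyConnector", "1.1.0"));
        int[] sizes = addAll(copies);
        System.out.println("map keys: " + sizes[0] + ", additions: " + sizes[1]);
    }
}
```

With two identical copies the map still reports a single key, so a conflict check based on the map size alone would miss this case.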







[GitHub] [kafka] rhauch commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616119723



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader(
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+                allAddedPlugins.put(pluginClassName, new ArrayList<>());
             }
             inner.put(plugin, loader);
+            allAddedPlugins.get(pluginClassName).add(plugin);

Review comment:
       We can use `computeIfAbsent(...)` to eliminate the prior newly-added line:
   ```suggestion
               allAddedPlugins.computeIfAbsent(pluginClassName, n -> new ArrayList<>()).add(plugin);
   ```
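
For readers less familiar with `Map.computeIfAbsent`, the following sketch contrasts the two-step form with the single-expression form. The class `ComputeIfAbsentSketch` and its methods are hypothetical, and plain strings stand in for plugin descriptors.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: plain strings stand in for plugin descriptors.
final class ComputeIfAbsentSketch {

    private final Map<String, List<String>> allAdded = new HashMap<>();

    // Two-step form: create the list the first time the key is seen, then append.
    void addTwoStep(String className, String plugin) {
        if (!allAdded.containsKey(className)) {
            allAdded.put(className, new ArrayList<>());
        }
        allAdded.get(className).add(plugin);
    }

    // Single-expression form: computeIfAbsent returns the existing list,
    // or creates, stores, and returns a new one when the key is missing.
    void addWithComputeIfAbsent(String className, String plugin) {
        allAdded.computeIfAbsent(className, n -> new ArrayList<>()).add(plugin);
    }

    // Copies recorded for a class name; empty when the class was never added.
    List<String> copiesOf(String className) {
        return allAdded.getOrDefault(className, new ArrayList<>());
    }
}
```

Both methods leave the map in the same state; the second simply avoids a separate initialization line.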

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -153,13 +157,26 @@ public PluginClassLoader pluginClassLoader(String name) {
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+        ClassLoader pluginLoader = inner.get(pluginDescInUse(inner));
         return pluginLoader instanceof PluginClassLoader
                ? (PluginClassLoader) pluginLoader
                : null;
     }
 
-    public ClassLoader connectorLoader(Connector connector) {
+    //visible for testing
+    PluginDesc<?> pluginDescInUse(String name) {

Review comment:
       I realize that `pluginDesc` in this name is just the equivalent of `PluginDesc`, but generally we try to avoid abbreviations, which is probably more true in this case because the name seems even more mangled:
   ```suggestion
       PluginDesc<?> usedPluginDesc(String name) {
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -153,13 +157,26 @@ public PluginClassLoader pluginClassLoader(String name) {
         if (inner == null) {
             return null;
         }
-        ClassLoader pluginLoader = inner.get(inner.lastKey());
+        ClassLoader pluginLoader = inner.get(pluginDescInUse(inner));
         return pluginLoader instanceof PluginClassLoader
                ? (PluginClassLoader) pluginLoader
                : null;
     }
 
-    public ClassLoader connectorLoader(Connector connector) {
+    //visible for testing
+    PluginDesc<?> pluginDescInUse(String name) {
+        SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+        if (inner == null) {
+            return null;
+        }
+        return pluginDescInUse(inner);
+    }
+
+    private PluginDesc<?> pluginDescInUse(SortedMap<PluginDesc<?>, ClassLoader> inner) {
+        return inner.lastKey();
+    }
+
+        public ClassLoader connectorLoader(Connector connector) {

Review comment:
       ```suggestion
       public ClassLoader connectorLoader(Connector connector) {
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +228,22 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        Set<String> conflictPluginClasses = new HashSet<>();
+        for (Map.Entry<String, List<PluginDesc<?>>> entry : allAddedPlugins.entrySet()) {
+            String pluginClassName = entry.getKey();
+            List<PluginDesc<?>> pluginDescriptors = entry.getValue();
+            if (pluginDescriptors.size() > 1) {
+                PluginDesc<?> pluginDescInUse = pluginDescInUse(pluginClassName);
+                log.error("For plugin '{}', detected multiple copies '{}', this copy '{}' will be used.", pluginClassName, pluginDescriptors, pluginDescInUse);
+                conflictPluginClasses.add(pluginClassName);
+            }
+        }
+        return conflictPluginClasses;

Review comment:
       This could be rewritten a bit more compactly and a bit more functionally:
   ```suggestion
           return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
               String pluginClassName = e.getKey();
               PluginDesc<?> pluginDescInUse = pluginDescInUse(pluginClassName);
               List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
               ignoredPlugins.remove(pluginDescInUse);
               log.error("Detected multiple plugins contain '{}'; using {} and ignoring {}", pluginClassName, pluginDescInUse, ignoredPlugins);
               return pluginClassName;
           }).collect(Collectors.toSet());
   ```
   My suggestion also includes a reworded error message to put the more meaningful information near the front, and computes the unused plugins and includes them at the end of the message, just in case there are multiple.
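
A self-contained sketch of the same filter/map/collect shape is shown below so it can be run outside the Connect code base. It rests on assumptions: strings stand in for plugin descriptors, `Collections.max` stands in for `lastKey()` on the sorted map, `System.err` replaces the logger, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: strings stand in for plugin descriptors.
final class ConflictReportSketch {

    // Returns the class names that were added more than once.
    static Set<String> reportConflicts(Map<String, List<String>> allAdded) {
        return allAdded.entrySet().stream()
                .filter(e -> e.getValue().size() > 1)
                .map(e -> {
                    String className = e.getKey();
                    // Assumption: the copy in use is the greatest one, mimicking lastKey().
                    String used = Collections.max(e.getValue());
                    List<String> ignored = new ArrayList<>(e.getValue());
                    ignored.remove(used); // removes one occurrence, so an identical copy stays listed
                    System.err.println("Detected multiple plugins contain '" + className
                            + "'; using " + used + " and ignoring " + ignored);
                    return className;
                })
                .collect(Collectors.toSet());
    }
}
```

A class that was added only once is filtered out before the mapping step, so it never appears in the returned set.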

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -196,8 +214,10 @@ private static PluginClassLoader newPluginClassLoader(
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+                allAddedPlugins.put(pluginClassName, new ArrayList<>());

Review comment:
       If we use `computeIfAbsent(...)` below, we don't need this line:
   ```suggestion
   ```







[GitHub] [kafka] C0urante commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615168657



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
       Do we need the `pluginConflict` variable, or can this be moved into the `else` branch above?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
##########
@@ -129,4 +133,32 @@ public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
             assertNotNull(classLoader.pluginClassLoader(pluginClassName));
         }
     }
+
+    @Test
+    public void testAddMultiplePluginsWithSameClass() {

Review comment:
       Is this test buying us much? It looks like it's verifying that multiple versions of the same plugin are recognized by the worker, but not really probing how the worker handles that case.
   
   For what it's worth, I think the functional parts of this change are small and readable enough that a test might not even be warranted.







[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615186346



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
       We can't move it into the `else` branch because we need to report all versions (see `inner.keySet()` in the `log.error` call), and the current plugin is only added to `inner` after the if/else block. Also, I can't use a `size()` check because sometimes the same plugin with the same version exists in two places.
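A minimal sketch of why a `size()` check would miss the same-version case. `Desc` is a hypothetical stand-in for `PluginDesc`, and ordering it by version alone is an assumption made for illustration; the point is only that a `TreeMap` keeps one entry per key that compares as equal.

```java
import java.util.SortedMap;
import java.util.TreeMap;

class SameVersionCollapse {
    // Hypothetical stand-in for PluginDesc; compared by version only (assumption).
    static final class Desc implements Comparable<Desc> {
        final String version;
        final String location;

        Desc(String version, String location) {
            this.version = version;
            this.location = location;
        }

        @Override
        public int compareTo(Desc other) {
            return version.compareTo(other.version);
        }
    }

    // Adds every descriptor to a sorted map and returns how many keys survive.
    static int sizeAfterAdding(Desc... descs) {
        SortedMap<Desc, String> inner = new TreeMap<>();
        for (Desc d : descs) {
            // A key that compares equal to an existing one replaces its value
            // instead of adding a second entry.
            inner.put(d, d.location);
        }
        return inner.size();
    }

    public static void main(String[] args) {
        int size = sizeAfterAdding(
                new Desc("1.0", "/plugins/a"),
                new Desc("1.0", "/plugins/b"));
        System.out.println("copies added: 2, map size: " + size);
    }
}
```

With two copies of the same version the map still holds a single entry, so `inner.size() > 1` would not flag the conflict.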







[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615310932



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
       I had discussed that case with Randall, and you are right: detecting and logging only once would require inspecting the map after all plugins are loaded, which complicates the code. That is why I considered it acceptable to see the same statement twice, as it would be rare for someone to have 3 copies of the same plugin.
   
   Sure, let me change it to log which one will be used as of the time the statement is logged.







[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616073878



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
       @C0urante you are right. I reworked the PR after your comments; could you please check now?







[GitHub] [kafka] C0urante commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615189609



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
       Oh, gotcha. In that case, should we do the check somewhere else, since this will potentially be triggered multiple times for a single plugin? For example, if there are three copies of a connector, the message will be logged twice right now, with different values for `inner.keySet()` each time.
   
   Also, it may help to log exactly which one we're going to use, either instead of or in addition to the complete set of discovered versions of the duplicated plugin.
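To illustrate the repeated logging, here is a small simulation of the per-add check from the diff above. It is only a sketch: plain version strings stand in for `PluginDesc` keys, the loader values are placeholders, and messages are collected in a list rather than sent to a logger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class PerAddConflictLogging {
    // Returns, in order, the messages the per-add check would emit.
    static List<String> addAll(String pluginClassName, String... versions) {
        Map<String, SortedMap<String, String>> pluginLoaders = new HashMap<>();
        List<String> logged = new ArrayList<>();
        for (String version : versions) {
            SortedMap<String, String> inner = pluginLoaders.get(pluginClassName);
            boolean pluginConflict = false;
            if (inner == null) {
                inner = new TreeMap<>();
                pluginLoaders.put(pluginClassName, inner);
            } else {
                pluginConflict = true;
            }
            inner.put(version, "loader-for-" + version); // placeholder loader
            if (pluginConflict) {
                // Fires on every add after the first, with a growing key set.
                logged.add("Detected multiple copies of plugin '"
                        + pluginClassName + "': " + inner.keySet());
            }
        }
        return logged;
    }

    public static void main(String[] args) {
        addAll("com.example.MyConverter", "1.0", "2.0", "3.0")
                .forEach(System.out::println);
    }
}
```

Three copies produce two messages, the first listing two versions and the second listing all three.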







[GitHub] [kafka] rhauch commented on pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
rhauch commented on pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#issuecomment-823335289


   Several green builds (but failures on publishing results), and one Streams unit test failure unrelated to this PR (or Connect).





[GitHub] [kafka] kpatelatwork commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615186466



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoaderTest.java
##########
@@ -129,4 +133,32 @@ public void testLoadingMixOfValidAndInvalidPlugins() throws Exception {
             assertNotNull(classLoader.pluginClassLoader(pluginClassName));
         }
     }
+
+    @Test
+    public void testAddMultiplePluginsWithSameClass() {

Review comment:
       I felt the same, but I wanted to see the log statement and there was no other way to see it, so I added the test. The only other option was to add a ConflictListener, but that seemed like a hammer to kill a mosquito :).
   
   Let me see if on Monday I can change the test to probe the getPluginLoader method and verify that the highest-version plugin is returned.
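A rough sketch of what such a probe would verify. It assumes the loader in use is the one stored under the last (highest) key of the sorted map, which this thread implies but does not show; integer versions and string loader names are placeholders for illustration.

```java
import java.util.SortedMap;
import java.util.TreeMap;

class HighestVersionWins {
    // Assumption: the entry under the last (highest) key is the one returned.
    static String loaderInUse(SortedMap<Integer, String> inner) {
        return inner.get(inner.lastKey());
    }

    public static void main(String[] args) {
        SortedMap<Integer, String> inner = new TreeMap<>();
        // Insertion order is deliberately not sorted; the map orders by key.
        inner.put(2, "loader-v2");
        inner.put(3, "loader-v3");
        inner.put(1, "loader-v1");
        System.out.println(loaderInUse(inner));
    }
}
```

A test along these lines asserts on the selected loader rather than on the log output, so it probes behavior instead of just observing the message.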







[GitHub] [kafka] kpatelatwork commented on pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
kpatelatwork commented on pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#issuecomment-823396793


   Thanks @rhauch, very good suggestions. I applied all of them. Please check again.





[GitHub] [kafka] C0urante commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r615928396



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -187,17 +192,24 @@ private static PluginClassLoader newPluginClassLoader(
         );
     }
 
-    private <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
+    //visible for testing
+    <T> void addPlugins(Collection<PluginDesc<T>> plugins, ClassLoader loader) {
         for (PluginDesc<T> plugin : plugins) {
             String pluginClassName = plugin.className();
             SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(pluginClassName);
+            boolean pluginConflict = false;
             if (inner == null) {
                 inner = new TreeMap<>();
                 pluginLoaders.put(pluginClassName, inner);
                 // TODO: once versioning is enabled this line should be moved outside this if branch
                 log.info("Added plugin '{}'", pluginClassName);
+            } else {
+                pluginConflict = true;
             }
             inner.put(plugin, loader);
+            if (pluginConflict) {
+                log.error("Detected multiple copies of plugin '{}', one of these will be used '{}'", pluginClassName, inner.keySet());
+            }

Review comment:
       Mmmm, I'm not sure we should be making decisions here based on dynamic plugin loading for two reasons:
   
   1. This change can be backported to older versions of Connect, which will never have that feature.
   2. It's unclear exactly what the mechanism for dynamic plugin loading will be, and it's possible that a re-scan of all known plugins after loading has taken place (either the initial load at startup or a subsequent dynamic load at runtime) could still be beneficial.
   
   Also, it's actually not that uncommon for 3+ copies of the same plugin to appear on the plugin path for a worker. For example, some connectors come packaged directly with converters; all it takes is at least two such connectors and a separately-installed copy of that converter to lead to that number of copies, without any error or misconfiguration on the part of the user.
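A minimal sketch of a scan that runs once after all plugins are loaded, so each duplicated plugin is reported a single time regardless of how many copies exist. The registry shape (class name mapped to a list of descriptor strings) and the rule for choosing the copy in use (the greatest string) are assumptions for illustration, not the PR's actual data structures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class PostLoadConflictScan {
    // Hypothetical registry: plugin class name -> every discovered copy.
    static List<String> reportConflicts(Map<String, List<String>> allAddedPlugins) {
        List<String> warnings = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : allAddedPlugins.entrySet()) {
            List<String> copies = e.getValue();
            if (copies.size() <= 1) {
                continue; // a single copy is not a conflict
            }
            // Assumption for this sketch: the greatest descriptor is the one used.
            String used = Collections.max(copies);
            List<String> ignored = new ArrayList<>(copies);
            ignored.remove(used);
            warnings.add("Detected multiple plugins contain '" + e.getKey()
                    + "'; using plugin " + used + " and ignoring "
                    + ignored.size() + " plugins (" + ignored + ").");
        }
        return warnings;
    }

    public static void main(String[] args) {
        Map<String, List<String>> plugins = new java.util.LinkedHashMap<>();
        plugins.put("com.example.MyConverter", List.of(
                "1.0@/plugins/connector-a",
                "1.0@/plugins/connector-b",
                "2.0@/plugins/converter"));
        plugins.put("com.example.MyConnector", List.of("1.0@/plugins/connector-a"));
        reportConflicts(plugins).forEach(System.out::println);
    }
}
```

Because the scan sees the complete set of copies, the connector-bundled-converter case with three copies yields one message that names the copy in use and the two that are ignored.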







[GitHub] [kafka] rhauch commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r617008391



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +220,21 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
+            String pluginClassName = e.getKey();
+            PluginDesc<?> usedPluginDesc = usedPluginDesc(pluginClassName);
+            List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
+            ignoredPlugins.remove(usedPluginDesc);
+            log.warn("Detected multiple plugins contain '{}'; using plugin {} and ignoring {} plugins ({}). "
+                            + "Check the installation on all workers and if possible remove all but one of these duplicated plugins.",
+                    pluginClassName, usedPluginDesc, ignoredPlugins.size(), ignoredPlugins);

Review comment:
       Indentation is incorrect. It should be:
   ```suggestion
               log.warn("Detected multiple plugins contain '{}'; using plugin {} and ignoring {} plugins ({}). "
                       + "Check the installation on all workers and if possible remove all but one of these duplicated plugins.",
                       pluginClassName, usedPluginDesc, ignoredPlugins.size(), ignoredPlugins);
   ```







[GitHub] [kafka] rhauch commented on a change in pull request #10549: KAFKA-8605 log an error message when we detect multiple copies of sam…

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #10549:
URL: https://github.com/apache/kafka/pull/10549#discussion_r616994286



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +220,21 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
+            String pluginClassName = e.getKey();
+            PluginDesc<?> usedPluginDesc = usedPluginDesc(pluginClassName);
+            List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
+            ignoredPlugins.remove(usedPluginDesc);
+            log.error("Detected multiple plugins contain '{}'; using plugin {} and ignoring {} plugins ({}). "
+                            + "Check the installation and remove duplicate plugins from all workers.",

Review comment:
       I know I suggested this text, but reading it now makes me think that users may not really know what "remove duplicate plugins" means. "Remove all of the ones listed here?" No, we mean all but the one you really want to use. :-D
   
   How about the following?
    ```suggestion
                               + "Check the installation on all workers and if possible remove all but one of these duplicated plugins.",
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -208,6 +220,21 @@ protected void initLoaders() {
         // Finally add parent/system loader.
         initPluginLoader(CLASSPATH_NAME);
         addAllAliases();
+        reportPluginConflicts();
+    }
+
+    //visible for testing
+    Set<String> reportPluginConflicts() {
+        return allAddedPlugins.entrySet().stream().filter(e -> e.getValue().size() > 1).map(e -> {
+            String pluginClassName = e.getKey();
+            PluginDesc<?> usedPluginDesc = usedPluginDesc(pluginClassName);
+            List<PluginDesc<?>> ignoredPlugins = new ArrayList<>(e.getValue());
+            ignoredPlugins.remove(usedPluginDesc);
+            log.error("Detected multiple plugins contain '{}'; using plugin {} and ignoring {} plugins ({}). "

Review comment:
       Offline you suggested that we change this to warn, based on @C0urante's earlier observation that sometimes the same converter might be included by multiple plugins. The latter isn't an issue if it's the same converter version in all of them.
   
   So +1 to change this to warn:
   ```suggestion
               log.warn("Detected multiple plugins contain '{}'; using plugin {} and ignoring {} plugins ({}). "
   ```



