Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/07 10:42:37 UTC

[GitHub] [kafka] mimaison opened a new pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

mimaison opened a new pull request #11572:
URL: https://github.com/apache/kafka/pull/11572


   …their configdefs
   
   Implements KIP-769
   
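   For context, KIP-769 extends the existing `GET /connector-plugins` endpoint with a `connectorsOnly` query parameter and adds a per-plugin endpoint for retrieving configuration definitions. A minimal client sketch of the two calls (the worker address, the Java 11 `HttpClient`, and the `FileStreamSource` alias are illustrative assumptions, and the exact paths follow the KIP rather than this PR's final state):
   ```java
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;

   public class PluginEndpointsExample {
       public static void main(String[] args) throws Exception {
           HttpClient client = HttpClient.newHttpClient();
           String worker = "http://localhost:8083"; // hypothetical worker address

           // List every plugin, not just connectors (converters, transformations, predicates, ...)
           HttpRequest listAll = HttpRequest.newBuilder(
                   URI.create(worker + "/connector-plugins?connectorsOnly=false")).GET().build();
           System.out.println(client.send(listAll, HttpResponse.BodyHandlers.ofString()).body());

           // Retrieve the ConfigDef of a single plugin by class name or alias
           HttpRequest config = HttpRequest.newBuilder(
                   URI.create(worker + "/connector-plugins/FileStreamSource/config")).GET().build();
           System.out.println(client.send(config, HttpResponse.BodyHandlers.ofString()).body());
       }
   }
   ```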
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817938870



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
##########
@@ -17,22 +17,24 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 import java.util.Locale;
 
 public enum PluginType {
     SOURCE(SourceConnector.class),
     SINK(SinkConnector.class),
-    CONNECTOR(Connector.class),

Review comment:
       I think it allowed classes that subclassed `Connector` but not `SinkConnector` or `SourceConnector` to still be detected by the worker during plugin scanning and included in responses for the `/connector-plugins` endpoint, with a type of `UNKNOWN`.
   
   With this change, those kinds of connector classes won't appear in the response for this endpoint anymore.
   
   I think this should be fine. As of https://github.com/apache/kafka/pull/2604, which was merged over a year ago, it's impossible to create that kind of connector anyway.







[GitHub] [kafka] C0urante edited a comment on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante edited a comment on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1054567120


   Noticed that the "resolve comment" option isn't available on this PR; apparently you need to be either the author of the PR or have write access to the repo in order to resolve comments ([ref](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/commenting-on-a-pull-request#resolving-conversations)). Feel free to resolve everything from me if you'd like; it might help reduce noise on this page for other reviewers.






[GitHub] [kafka] mimaison commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1048687705


   @tombentley @vvcephei @C0urante As you voted on the KIP, can you take a look? Thanks





[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817885958



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
##########
@@ -17,22 +17,24 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 import java.util.Locale;
 
 public enum PluginType {
     SOURCE(SourceConnector.class),
     SINK(SinkConnector.class),
-    CONNECTOR(Connector.class),

Review comment:
       To be honest I'm not sure, as it wasn't used by anything.







[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814469917



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -56,16 +69,49 @@
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
     private final List<ConnectorPluginInfo> connectorPlugins;
+    private final Map<String, PluginType> pluginsByType;
 
-    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
+    static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
             VerifiableSourceConnector.class, VerifiableSinkConnector.class,
             MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
             SchemaSourceConnector.class
     );
 
+    @SuppressWarnings("rawtypes")
+    static final List<Class<? extends Transformation>> TRANSFORM_EXCLUDES = Arrays.asList(
+            PredicatedTransformation.class
+    );
+
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
+        this.pluginsByType = new HashMap<>();
+
+        // TODO: improve once plugins are allowed to be added/removed during runtime.
+        for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+            if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(plugin, PluginType.from(plugin.pluginClass())));
+                pluginsByType.put(getAlias(plugin.className()), PluginType.from(plugin.pluginClass()));
+            }
+        }
+        for (PluginDesc<Transformation<?>> transform : herder.plugins().transformations()) {
+            if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(transform, PluginType.TRANSFORMATION));
+                pluginsByType.put(getAlias(transform.className()), PluginType.TRANSFORMATION);
+            }
+        }
+        for (PluginDesc<Predicate<?>> predicate : herder.plugins().predicates()) {
+            connectorPlugins.add(new ConnectorPluginInfo(predicate, PluginType.PREDICATE));
+            pluginsByType.put(getAlias(predicate.className()), PluginType.PREDICATE);
+        }
+        for (PluginDesc<Converter> converter : herder.plugins().converters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(converter, PluginType.CONVERTER));
+            pluginsByType.put(getAlias(converter.className()), PluginType.CONVERTER);
+        }
+        for (PluginDesc<HeaderConverter> headerConverter : herder.plugins().headerConverters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(headerConverter, PluginType.HEADER_CONVERTER));
+            pluginsByType.put(getAlias(headerConverter.className()), PluginType.HEADER_CONVERTER);
+        }

Review comment:
       It seems like we're duplicating some of the logic contained in `Plugins` into this class by tracking class alias names and pre-computing plugin type based on them.
   
   Did you consider a `Herder` method that only accepted the name of the plugin, and took on the responsibility of deducing the plugin type itself?
   ```java
   List<ConfigKeyInfo> connectorPluginConfig(String pluginName);
   ```
   
   In `AbstractHerder`, we could do something like this:
   ```java
       @Override
       public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
           try {
               Object plugin = Plugins.newPlugin(pluginName);
            PluginType pluginType = PluginType.from(plugin.getClass());
               List<ConfigKeyInfo> results = new ArrayList<>();
               ConfigDef configDefs;
               switch (pluginType) {
                   case SINK:
                   case SOURCE:
                       configDefs = ((Connector) plugin).config();
                       break;
                   case CONVERTER:
                       configDefs = ((Converter) plugin).config();
                       break;
               // ... Rest of switch statement follows same pattern, and rest of the method remains unchanged
       }
   ```
   
   And in `Plugins` we could do this:
   ```java
       public Object newPlugin(String classOrAlias) throws ClassNotFoundException {
           Class<? extends Object> klass = pluginClass(delegatingLoader, classOrAlias, Object.class);
           return newPlugin(klass);
       }
   ```
   
   
   Or alternatively, we could introduce a common interface for plugins that expose a `ConfigDef`:
   ```java
   interface DefinedConfigPlugin {
       ConfigDef config();
   }
   ```
   
   Which could really simplify some of the `AbstractHerder` logic:
   ```java
       @Override
       public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
           try {
               DefinedConfigPlugin plugin = Plugins.newDefinedConfigPlugin(pluginName);
               ConfigDef configDefs = plugin.config();
               // No switch statement on plugin type necessary
            // ... Rest of the method remains unchanged
       }
   ```
   
   And the change to `Plugins` would be lightweight as well:
   ```java
       public DefinedConfigPlugin newDefinedConfigPlugin(String classOrAlias) throws ClassNotFoundException {
           Class<? extends DefinedConfigPlugin> klass = pluginClass(delegatingLoader, classOrAlias, DefinedConfigPlugin.class);
           return newPlugin(klass);
       }
   ```
   
   Worth noting that if we want to differentiate to users between "this plugin is not on the worker" and "we don't expose config information for this type of plugin", we'd have to make a few further tweaks.

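   One rough way to draw that distinction, sketched against the `AbstractHerder` switch quoted elsewhere in this thread (treating `ClassNotFoundException` as a 404 and an unsupported-but-present plugin type as a 400 is an assumption here, not something this PR settles):
   ```java
   import org.apache.kafka.connect.errors.NotFoundException;
   import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;

   // Sketch only: the two failure modes the comment above distinguishes.
   public class PluginConfigErrors {
       // The class or alias cannot be resolved on the worker at all -> 404
       static RuntimeException unknownPlugin(String pluginName) {
           return new NotFoundException("Unknown plugin " + pluginName + ".");
       }

       // The plugin resolves, but its type (e.g. a REST extension) exposes no ConfigDef -> 400
       static RuntimeException unsupportedType(Object pluginType) {
           return new BadRequestException("Invalid plugin type " + pluginType + ".");
       }
   }
   ```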






[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814917568



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -79,11 +82,21 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-        sb.append("className='").append(className).append('\'');
-        sb.append(", type=").append(type);
-        sb.append(", version='").append(version).append('\'');
-        sb.append('}');
-        return sb.toString();
+        return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+                ", type=" + type.toString() +
+                ", version='" + version + '\'' +
+                '}';
+    }
+
+    public static final class NoVersionFilter {
+        @Override
+        public boolean equals(Object obj) {
+            return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }

Review comment:
       Without this method, the compilation fails. To be honest, I'm not sure if there's a way to avoid it.

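   For readers unfamiliar with the pattern: with `JsonInclude.Include.CUSTOM`, Jackson instantiates the configured `valueFilter` and calls `equals(fieldValue)` on it, suppressing the field when that returns true, which is why `NoVersionFilter` is shaped the way it is. A minimal, self-contained sketch of the idea (class and field names here are illustrative, not the PR's exact code; `hashCode` is kept because, per the thread above, the build fails without it):
   ```java
   import com.fasterxml.jackson.annotation.JsonInclude;
   import com.fasterxml.jackson.annotation.JsonProperty;
   import com.fasterxml.jackson.databind.ObjectMapper;

   public class VersionFilterExample {
       static final String UNDEFINED_VERSION = "undefined";

       // Jackson calls equals(fieldValue); a true result hides the field from the JSON output.
       public static final class NoVersionFilter {
           @Override
           public boolean equals(Object obj) {
               return UNDEFINED_VERSION.equals(obj);
           }

           @Override
           public int hashCode() {
               return super.hashCode();
           }
       }

       static class Info {
           @JsonProperty
           @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = NoVersionFilter.class)
           String version = UNDEFINED_VERSION;
       }

       public static void main(String[] args) throws Exception {
           // Prints {} because the undefined version is filtered out.
           System.out.println(new ObjectMapper().writeValueAsString(new Info()));
       }
   }
   ```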






[GitHub] [kafka] tombentley commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
tombentley commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817623175



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -127,10 +131,26 @@ public DelegatingClassLoader(List<String> pluginPaths) {
         this(pluginPaths, DelegatingClassLoader.class.getClassLoader());
     }
 
+    @SuppressWarnings("unchecked")
     public Set<PluginDesc<Connector>> connectors() {
+        Set<PluginDesc<Connector>> connectors = new TreeSet<>();
+        for (PluginDesc<SinkConnector> sinkConnector : sinkConnectors) {
+            connectors.add((PluginDesc<Connector>) (PluginDesc<? extends Connector>) sinkConnector);
+        }
+        for (PluginDesc<SourceConnector> sourceConnector : sourceConnectors) {
+            connectors.add((PluginDesc<Connector>) (PluginDesc<? extends Connector>) sourceConnector);
+        }
         return connectors;

Review comment:
       ```suggestion
           Set<PluginDesc<Connector>> connectors = new TreeSet<>((Set) sinkConnectors);
           connectors.addAll((Set) sourceConnectors);
           return connectors;
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override
+    public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {

Review comment:
       Should it really be called `connectorPluginConfig` when it handles other plugins too?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -17,30 +17,32 @@
 package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
 
 import java.util.Objects;
 
 public class ConnectorPluginInfo {

Review comment:
       Should the name remain as `ConnectorPluginInfo` when it's no longer just for connector plugins?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginType.java
##########
@@ -17,22 +17,24 @@
 package org.apache.kafka.connect.runtime.isolation;
 
 import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 import java.util.Locale;
 
 public enum PluginType {
     SOURCE(SourceConnector.class),
     SINK(SinkConnector.class),
-    CONNECTOR(Connector.class),

Review comment:
       This makes me wonder why CONNECTOR was ever in this enum. Do you know @mimaison / @C0urante?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -100,21 +135,23 @@ public ConfigInfos validateConfigs(
 
     @GET
     @Path("/")
-    public List<ConnectorPluginInfo> listConnectorPlugins() {
-        return getConnectorPlugins();
-    }
-
-    // TODO: improve once plugins are allowed to be added/removed during runtime.
-    private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
-        if (connectorPlugins.isEmpty()) {
-            for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
-                if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
-                    connectorPlugins.add(new ConnectorPluginInfo(plugin));
-                }
+    public List<ConnectorPluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) {
+        synchronized (this) {
+            if (connectorsOnly) {
+                Set<String> types = new HashSet<>(Arrays.asList(PluginType.SINK.toString(), PluginType.SOURCE.toString()));
+                return Collections.unmodifiableList(connectorPlugins.stream().filter(p -> types.contains(p.type())).collect(Collectors.toList()));

Review comment:
       Is it worth using `Set.contains` for this, rather than `.equals` and `||`?

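   For comparison, the `.equals`-and-`||` alternative the question has in mind might look roughly like this (a sketch that mirrors the method in the diff above, so `connectorPlugins` and `p.type()` are taken from there):
   ```java
               // Sketch: filter connector plugins without building a temporary Set
               return Collections.unmodifiableList(connectorPlugins.stream()
                       .filter(p -> PluginType.SINK.toString().equals(p.type())
                               || PluginType.SOURCE.toString().equals(p.type()))
                       .collect(Collectors.toList()));
   ```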






[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817933038



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override
+    public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {

Review comment:
       I pushed an update to rename `ConnectorPluginInfo` to `PluginInfo`







[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817913673



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override
+    public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {

Review comment:
       If we continue to put the onus on the `Herder` to ensure that the requested plugin has a supported type (e.g., throw an error if it's a REST extension), then I think "connector plugin" is useful since it differentiates between worker and connector plugins.
   
   But it looks like the `ConnectorPluginInfo` class is itself completely agnostic on the connector-plugin vs. worker-plugin front, and it might even be possible to leverage it with no modifications if we eventually decide to add support for exposing worker plugin information to the REST API. So I think renaming that to `PluginInfo` (or something like that) would be reasonable.







[GitHub] [kafka] mimaison commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1055821359


   @rhauch Can you take a look? It would be great to get this in 3.2. Thanks





[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814970233



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -57,15 +71,46 @@
     private final Herder herder;
     private final List<ConnectorPluginInfo> connectorPlugins;
 
-    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
+    static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
             VerifiableSourceConnector.class, VerifiableSinkConnector.class,
             MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
             SchemaSourceConnector.class
     );
 
+    @SuppressWarnings("rawtypes")
+    static final List<Class<? extends Transformation>> TRANSFORM_EXCLUDES = Collections.singletonList(
+            PredicatedTransformation.class
+    );
+
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
+
+        // TODO: improve once plugins are allowed to be added/removed during runtime.
+        for (PluginDesc<SinkConnector> plugin : herder.plugins().sinkConnectors()) {
+            if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(plugin));
+            }
+        }
+        for (PluginDesc<SourceConnector> plugin : herder.plugins().sourceConnectors()) {
+            if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(plugin));
+            }
+        }
+        for (PluginDesc<Transformation<?>> transform : herder.plugins().transformations()) {
+            if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(transform));
+            }
+        }
+        for (PluginDesc<Predicate<?>> predicate : herder.plugins().predicates()) {
+            connectorPlugins.add(new ConnectorPluginInfo(predicate));
+        }
+        for (PluginDesc<Converter> converter : herder.plugins().converters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(converter));
+        }
+        for (PluginDesc<HeaderConverter> headerConverter : herder.plugins().headerConverters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(headerConverter));
+        }

Review comment:
       Now that we have separate `Plugins::sinkConnectors` and `Plugins::sourceConnectors` methods, we can abstract this a little, which should improve readability a bit and make it easier to extend for other plugin types in the future:
   ```suggestion
       static final List<Class<? extends SinkConnector>> SINK_CONNECTOR_EXCLUDES = Arrays.asList(
               VerifiableSinkConnector.class, MockSinkConnector.class
       );
   
       static final List<Class<? extends SourceConnector>> SOURCE_CONNECTOR_EXCLUDES = Arrays.asList(
               VerifiableSourceConnector.class, MockSourceConnector.class, SchemaSourceConnector.class
       );
   
       @SuppressWarnings({"unchecked", "rawtypes"})
       static final List<Class<? extends Transformation<?>>> TRANSFORM_EXCLUDES = Collections.singletonList(
               (Class) PredicatedTransformation.class
       );
   
       public ConnectorPluginsResource(Herder herder) {
           this.herder = herder;
           this.connectorPlugins = new ArrayList<>();
   
           // TODO: improve once plugins are allowed to be added/removed during runtime.
           addConnectorPlugins(herder.plugins().sinkConnectors(), SINK_CONNECTOR_EXCLUDES);
           addConnectorPlugins(herder.plugins().sourceConnectors(), SOURCE_CONNECTOR_EXCLUDES);
           addConnectorPlugins(herder.plugins().transformations(), TRANSFORM_EXCLUDES);
           addConnectorPlugins(herder.plugins().predicates(), Collections.emptySet());
           addConnectorPlugins(herder.plugins().converters(), Collections.emptySet());
           addConnectorPlugins(herder.plugins().headerConverters(), Collections.emptySet());
       }
   
       private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins, Collection<Class<? extends T>> excludes) {
           plugins.stream()
                   .filter(p -> !excludes.contains(p.pluginClass()))
                   .map(ConnectorPluginInfo::new)
                   .forEach(connectorPlugins::add);
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override
+    public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
+        List<ConfigKeyInfo> results = new ArrayList<>();
+        ConfigDef configDefs;
+        try {
+            Plugins p = plugins();
+            Object plugin = p.newPlugin(pluginName);
+            PluginType pluginType = PluginType.from(plugin.getClass());
+            switch (pluginType) {
+                case SINK:
+                case SOURCE:
+                    configDefs = ((Connector) plugin).config();
+                    break;
+                case CONVERTER:
+                    configDefs = ((Converter) plugin).config();
+                    break;
+                case HEADER_CONVERTER:
+                    configDefs = ((HeaderConverter) plugin).config();
+                    break;
+                case TRANSFORMATION:
+                    configDefs = ((Transformation<?>) plugin).config();
+                    break;
+                case PREDICATE:
+                    configDefs = ((Predicate<?>) plugin).config();
+                    break;
+                default:
+                    throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
+            }
+        } catch (ClassNotFoundException cnfe) {
+            throw new BadRequestException("Unknown plugin " + pluginName + ".");

Review comment:
       Nit: a 404 here might make more sense
   ```suggestion
               throw new NotFoundException("Unknown plugin " + pluginName + ".");
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -438,7 +457,8 @@ private PluginScanResult scanPluginPath(
     }
 
     private void addAllAliases() {
-        addAliases(connectors);
+        addAliases(sinkConnectors);
+        addAliases(sourceConnectors);

Review comment:
       I think we may want to add aliases for all connectors at once, to account for the `PluginUtils::isAliasUnique` check that guarantees it's safe to add an alias for a plugin. If (unlikely but possible) a source and a sink connector have the same simple name, this change would cause the alias for the source to shadow the alias for the sink. That's not the worst thing, but it is a deviation from current behavior, and we probably don't want people to notice the new behavior and start building against it with the expectation that it's supported and won't change later.

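   Assuming `addAliases` accepts any collection of `PluginDesc`s (it previously took the combined `connectors` set), one way to keep the single-pass behaviour would be to feed it the merged view that `connectors()` now returns; a sketch, not necessarily how the PR resolves this:
   ```java
       private void addAllAliases() {
           // Register sink and source connector aliases together, so the
           // PluginUtils::isAliasUnique check still sees all connectors at once
           // and a same-named sink/source pair cannot silently shadow each other.
           addAliases(connectors());
           // ... aliases for the remaining plugin types are added as before
       }
   ```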
##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -901,6 +902,75 @@ public void testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors(
         assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2");
     }
 
+    @Test
+    public void testConnectorPluginConfig() throws Exception {
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(
+                        Worker.class,
+                        String.class,
+                        String.class,
+                        StatusBackingStore.class,
+                        ConfigBackingStore.class,
+                        ConnectorClientConfigOverridePolicy.class
+                )
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .addMockedMethod("generation")
+                .createMock();
+
+        EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() -> {
+            String name = (String) EasyMock.getCurrentArguments()[0];
+            switch (name) {
+                case "sink": return new SampleSinkConnector();
+                case "source": return new SampleSourceConnector();
+                case "converter": return new SampleConverterWithHeaders();
+                case "header-converter": return new SampleHeaderConverter();
+                case "predicate": return new SamplePredicate();
+                default: return new SampleTransformation<>();
+            }
+        }).anyTimes();
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        replayAll();
+
+        List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink");
+        assertNotNull(sinkConnectorConfigs);
+        assertEquals(new SampleSinkConnector().config().names().size(), sinkConnectorConfigs.size());
+
+        List<ConfigKeyInfo> sourceConnectorConfigs = herder.connectorPluginConfig("source");
+        assertNotNull(sourceConnectorConfigs);
+        assertEquals(new SampleSourceConnector().config().names().size(), sourceConnectorConfigs.size());
+
+        List<ConfigKeyInfo> converterConfigs = herder.connectorPluginConfig("converter");
+        assertNotNull(converterConfigs);
+        assertEquals(new SampleConverterWithHeaders().config().names().size(), converterConfigs.size());
+
+        List<ConfigKeyInfo> headerConverterConfigs = herder.connectorPluginConfig("header-converter");
+        assertNotNull(headerConverterConfigs);
+        assertEquals(new SampleHeaderConverter().config().names().size(), headerConverterConfigs.size());
+
+        List<ConfigKeyInfo> predicateConfigs = herder.connectorPluginConfig("predicate");
+        assertNotNull(predicateConfigs);
+        assertEquals(new SamplePredicate().config().names().size(), predicateConfigs.size());
+
+        List<ConfigKeyInfo> transformationConfigs = herder.connectorPluginConfig("transformation");
+        assertNotNull(transformationConfigs);
+        assertEquals(new SampleTransformation<>().config().names().size(), transformationConfigs.size());
+    }
+
+    @Test(expected = BadRequestException.class)
+    public void testGetConnectorConfigDefWithBadName() throws Exception {
+        String connName = "AnotherPlugin";
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
+                        ConnectorClientConfigOverridePolicy.class)
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .addMockedMethod("generation")
+                .createMock();
+        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+        EasyMock.expect(plugins.newPlugin(anyString())).andThrow(new ClassNotFoundException());
+        replayAll();
+        herder.connectorPluginConfig(connName);
+    }
+

Review comment:
       Should we add a case for an unsupported plugin type, like a REST extension?

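   A rough sketch of such a case, following the EasyMock setup of the neighbouring tests (`createPartialMockedHerder` and `SampleConnectRestExtension` are hypothetical stand-ins for the existing partial-mock construction and for a sample plugin of an unsupported type):
   ```java
       @Test(expected = BadRequestException.class)
       public void testConnectorPluginConfigUnsupportedType() throws Exception {
           // Same partial-mock construction as the tests above, elided here for brevity.
           AbstractHerder herder = createPartialMockedHerder();
           // Return a plugin whose type exposes no ConfigDef, e.g. a REST extension.
           EasyMock.expect(herder.plugins()).andStubReturn(plugins);
           EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andReturn(new SampleConnectRestExtension());
           replayAll();
           herder.connectorPluginConfig("rest-extension");
       }
   ```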
##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
##########
@@ -81,38 +84,83 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.anyBoolean;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(RestClient.class)
-@PowerMockIgnore("javax.management.*")
 public class ConnectorPluginsResourceTest {
 
-    private static Map<String, String> props;
-    private static Map<String, String> partialProps = new HashMap<>();
+    private static final Map<String, String> PROPS;
+    private static final Map<String, String> PARTIAL_PROPS = new HashMap<>();
     static {
-        partialProps.put("name", "test");
-        partialProps.put("test.string.config", "testString");
-        partialProps.put("test.int.config", "1");
-        partialProps.put("test.list.config", "a,b");
-
-        props = new HashMap<>(partialProps);
-        props.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName());
-        props.put("plugin.path", "test.path");
+        PARTIAL_PROPS.put("name", "test");
+        PARTIAL_PROPS.put("test.string.config", "testString");
+        PARTIAL_PROPS.put("test.int.config", "1");
+        PARTIAL_PROPS.put("test.list.config", "a,b");
+
+        PROPS = new HashMap<>(PARTIAL_PROPS);
+        PROPS.put("connector.class", ConnectorPluginsResourceTestConnector.class.getSimpleName());
+        PROPS.put("plugin.path", null);

Review comment:
       Do we need this line at all now?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
##########
@@ -339,44 +369,45 @@ public void testValidateConfigWithNonExistentName() {
         // simple name but different package.
         String customClassname = "com.custom.package."
             + ConnectorPluginsResourceTestConnector.class.getSimpleName();
-        assertThrows(BadRequestException.class, () -> connectorPluginsResource.validateConfigs(customClassname, props));
+        assertThrows(BadRequestException.class, () -> connectorPluginsResource.validateConfigs(customClassname, PROPS));
     }
 
     @Test
     public void testValidateConfigWithNonExistentAlias() {
-        assertThrows(BadRequestException.class, () -> connectorPluginsResource.validateConfigs("ConnectorPluginsTest", props));
+        assertThrows(BadRequestException.class, () -> connectorPluginsResource.validateConfigs("ConnectorPluginsTest", PROPS));
     }
 
     @Test
-    public void testListConnectorPlugins() throws Exception {
-        expectPlugins();
-        Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins());
-        assertFalse(connectorPlugins.contains(newInfo(Connector.class, "0.0")));
-        assertFalse(connectorPlugins.contains(newInfo(SourceConnector.class, "0.0")));
-        assertFalse(connectorPlugins.contains(newInfo(SinkConnector.class, "0.0")));
-        assertFalse(connectorPlugins.contains(newInfo(VerifiableSourceConnector.class)));
-        assertFalse(connectorPlugins.contains(newInfo(VerifiableSinkConnector.class)));
-        assertFalse(connectorPlugins.contains(newInfo(MockSourceConnector.class)));
-        assertFalse(connectorPlugins.contains(newInfo(MockSinkConnector.class)));
-        assertFalse(connectorPlugins.contains(newInfo(MockConnector.class)));
-        assertFalse(connectorPlugins.contains(newInfo(SchemaSourceConnector.class)));
-        assertTrue(connectorPlugins.contains(newInfo(ConnectorPluginsResourceTestConnector.class)));
-        PowerMock.verifyAll();
+    public void testListConnectorPlugins() {
+        Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(true));
+
+        List<Set<MockConnectorPluginDesc<?>>> allConnectorPlugins = Arrays.asList(
+                SINK_CONNECTOR_PLUGINS,
+                SOURCE_CONNECTOR_PLUGINS);
+        for (Set<MockConnectorPluginDesc<?>> plugins : allConnectorPlugins) {
+            for (MockConnectorPluginDesc<?> plugin : plugins) {
+                boolean contained = connectorPlugins.contains(newInfo(plugin));
+                if ((plugin.type() != PluginType.SOURCE && plugin.type() != PluginType.SINK) ||
+                        (ConnectorPluginsResource.CONNECTOR_EXCLUDES.contains(plugin.pluginClass()))) {
+                    assertFalse(contained);
+                } else {
+                    assertTrue(contained);
+                }
+            }
+        }
+        verify(herder, atLeastOnce()).plugins();

Review comment:
       It'd be more powerful to do an assertion on the complete set of returned plugins, since that will only require one test run to discover all differences between the expected plugins and the actual ones:
   ```suggestion
           Set<Class<?>> excludes = Stream.of(ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES, ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES)
                   .flatMap(Collection::stream)
                   .collect(Collectors.toSet());
           Set<ConnectorPluginInfo> expectedConnectorPlugins = Stream.of(SINK_CONNECTOR_PLUGINS, SOURCE_CONNECTOR_PLUGINS)
                   .flatMap(Collection::stream)
                   .filter(p -> !excludes.contains(p.pluginClass()))
                   .map(ConnectorPluginsResourceTest::newInfo)
                   .collect(Collectors.toSet());
   
           Set<ConnectorPluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(true));
   
           assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
   
           verify(herder, atLeastOnce()).plugins();
   ```
   (This assumes we split out `CONNECTOR_EXCLUDES`, but the same general strategy should apply even if we don't).

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/tools/VerifiableSinkConnector.java
##########
@@ -29,7 +29,7 @@
 /**
  * @see VerifiableSinkTask
  */
-public class VerifiableSinkConnector extends SourceConnector {
+public class VerifiableSinkConnector extends SinkConnector {

Review comment:
       Ha, nice catch!

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -901,6 +902,75 @@ public void testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors(
         assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2");
     }
 
+    @Test
+    public void testConnectorPluginConfig() throws Exception {
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(
+                        Worker.class,
+                        String.class,
+                        String.class,
+                        StatusBackingStore.class,
+                        ConfigBackingStore.class,
+                        ConnectorClientConfigOverridePolicy.class
+                )
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .addMockedMethod("generation")
+                .createMock();
+
+        EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() -> {
+            String name = (String) EasyMock.getCurrentArguments()[0];
+            switch (name) {
+                case "sink": return new SampleSinkConnector();
+                case "source": return new SampleSourceConnector();
+                case "converter": return new SampleConverterWithHeaders();
+                case "header-converter": return new SampleHeaderConverter();
+                case "predicate": return new SamplePredicate();
+                default: return new SampleTransformation<>();
+            }
+        }).anyTimes();
+        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
+        replayAll();
+
+        List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink");
+        assertNotNull(sinkConnectorConfigs);
+        assertEquals(new SampleSinkConnector().config().names().size(), sinkConnectorConfigs.size());
+
+        List<ConfigKeyInfo> sourceConnectorConfigs = herder.connectorPluginConfig("source");
+        assertNotNull(sourceConnectorConfigs);
+        assertEquals(new SampleSourceConnector().config().names().size(), sourceConnectorConfigs.size());
+
+        List<ConfigKeyInfo> converterConfigs = herder.connectorPluginConfig("converter");
+        assertNotNull(converterConfigs);
+        assertEquals(new SampleConverterWithHeaders().config().names().size(), converterConfigs.size());
+
+        List<ConfigKeyInfo> headerConverterConfigs = herder.connectorPluginConfig("header-converter");
+        assertNotNull(headerConverterConfigs);
+        assertEquals(new SampleHeaderConverter().config().names().size(), headerConverterConfigs.size());
+
+        List<ConfigKeyInfo> predicateConfigs = herder.connectorPluginConfig("predicate");
+        assertNotNull(predicateConfigs);
+        assertEquals(new SamplePredicate().config().names().size(), predicateConfigs.size());
+
+        List<ConfigKeyInfo> transformationConfigs = herder.connectorPluginConfig("transformation");
+        assertNotNull(transformationConfigs);
+        assertEquals(new SampleTransformation<>().config().names().size(), transformationConfigs.size());
+    }
+
+    @Test(expected = BadRequestException.class)

Review comment:
       (If we choose to send a 404 when a plugin isn't found instead of a 400)
   ```suggestion
       @Test(expected = NotFoundException.class)
   ```
   (Will also require adding an import for the `NotFoundException` class)

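   (For reference, the import in question would presumably be the Connect runtime's own 404-mapped exception rather than the JAX-RS one, i.e. something like:)
   ```java
   import org.apache.kafka.connect.errors.NotFoundException;
   ```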
##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
##########
@@ -144,70 +192,63 @@
 
         CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, Collections.singletonList("Test"), configs);
         PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs);
+    }
 
-        List<Class<? extends Connector>> abstractConnectorClasses = asList(
-            Connector.class,
-            SourceConnector.class,
-            SinkConnector.class
-        );
-
-        List<Class<? extends Connector>> connectorClasses = asList(
-            VerifiableSourceConnector.class,
-            VerifiableSinkConnector.class,
-            MockSourceConnector.class,
-            MockSinkConnector.class,
-            MockConnector.class,
-            SchemaSourceConnector.class,
-            ConnectorPluginsResourceTestConnector.class
-        );
+    private final Herder herder = mock(DistributedHerder.class);
+    private final Plugins plugins = mock(Plugins.class);
+    private ConnectorPluginsResource connectorPluginsResource;
 
+    @SuppressWarnings("rawtypes")
+    @Before
+    public void setUp() throws Exception {
         try {
-            for (Class<? extends Connector> klass : abstractConnectorClasses) {
-                MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc(klass, "0.0.0");
-                CONNECTOR_PLUGINS.add(pluginDesc);
+            for (Class<? extends SinkConnector> klass : sinkConnectorClasses) {
+                MockConnectorPluginDesc<? extends SinkConnector> pluginDesc = new MockConnectorPluginDesc<>(klass);
+                SINK_CONNECTOR_PLUGINS.add(pluginDesc);

Review comment:
       This field (`SINK_CONNECTOR_PLUGINS`) and others like it are `static`, but this method is executed once per test. Should we populate them once in a `static` block instead of re-populating them every time we run a test? I know it's unlikely to have a performance impact but it might make the tests easier to read.
   
   Also, if we use a static initializer block, we could wrap `SINK_CONNECTOR_PLUGINS` (and other similar fields) with [`Collections::unmodifiableList`](https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#unmodifiableList-java.util.List-) in order to prevent accidental modifications of them across test cases.

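   A sketch of what that could look like for one of these fields, assuming the field stays a `Set`, `sinkConnectorClasses` is (or is made) static, and using `Collections::unmodifiableSet` as the set-flavoured counterpart of the `unmodifiableList` linked above:
   ```java
       private static final Set<MockConnectorPluginDesc<?>> SINK_CONNECTOR_PLUGINS;
       static {
           try {
               Set<MockConnectorPluginDesc<?>> plugins = new LinkedHashSet<>();
               for (Class<? extends SinkConnector> klass : sinkConnectorClasses) {
                   plugins.add(new MockConnectorPluginDesc<>(klass));
               }
               SINK_CONNECTOR_PLUGINS = Collections.unmodifiableSet(plugins);
           } catch (Exception e) {
               throw new ExceptionInInitializerError(e);
           }
       }
   ```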
##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
##########
@@ -144,70 +192,63 @@
 
         CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), ERROR_COUNT, Collections.singletonList("Test"), configs);
         PARTIAL_CONFIG_INFOS = new ConfigInfos(ConnectorPluginsResourceTestConnector.class.getName(), PARTIAL_CONFIG_ERROR_COUNT, Collections.singletonList("Test"), partialConfigs);
+    }
 
-        List<Class<? extends Connector>> abstractConnectorClasses = asList(
-            Connector.class,
-            SourceConnector.class,
-            SinkConnector.class
-        );
-
-        List<Class<? extends Connector>> connectorClasses = asList(
-            VerifiableSourceConnector.class,
-            VerifiableSinkConnector.class,
-            MockSourceConnector.class,
-            MockSinkConnector.class,
-            MockConnector.class,
-            SchemaSourceConnector.class,
-            ConnectorPluginsResourceTestConnector.class
-        );
+    private final Herder herder = mock(DistributedHerder.class);
+    private final Plugins plugins = mock(Plugins.class);
+    private ConnectorPluginsResource connectorPluginsResource;
 
+    @SuppressWarnings("rawtypes")
+    @Before
+    public void setUp() throws Exception {
         try {
-            for (Class<? extends Connector> klass : abstractConnectorClasses) {
-                MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc(klass, "0.0.0");
-                CONNECTOR_PLUGINS.add(pluginDesc);
+            for (Class<? extends SinkConnector> klass : sinkConnectorClasses) {
+                MockConnectorPluginDesc<? extends SinkConnector> pluginDesc = new MockConnectorPluginDesc<>(klass);
+                SINK_CONNECTOR_PLUGINS.add(pluginDesc);
+            }
+            for (Class<? extends SourceConnector> klass : sourceConnectorClasses) {
+                MockConnectorPluginDesc<? extends SourceConnector> pluginDesc = new MockConnectorPluginDesc<>(klass);
+                SOURCE_CONNECTOR_PLUGINS.add(pluginDesc);
+            }
+            for (Class<? extends Converter> klass : converterClasses) {
+                MockConnectorPluginDesc<? extends Converter> pluginDesc = new MockConnectorPluginDesc<>(klass);
+                CONVERTER_PLUGINS.add(pluginDesc);
+            }
+            for (Class<? extends HeaderConverter> klass : headerConverterClasses) {
+                MockConnectorPluginDesc<? extends HeaderConverter> pluginDesc = new MockConnectorPluginDesc<>(klass);
+                HEADER_CONVERTER_PLUGINS.add(pluginDesc);
+            }
+            for (Class<? extends Transformation> klass : transformationClasses) {
+                MockConnectorPluginDesc<? extends Transformation> pluginDesc = new MockConnectorPluginDesc<>(klass);
+                TRANSFORMATION_PLUGINS.add(pluginDesc);
             }
-            for (Class<? extends Connector> klass : connectorClasses) {
-                MockConnectorPluginDesc pluginDesc = new MockConnectorPluginDesc(klass);
-                CONNECTOR_PLUGINS.add(pluginDesc);
+            for (Class<? extends Predicate> klass : predicateClasses) {
+                MockConnectorPluginDesc<? extends Predicate> pluginDesc = new MockConnectorPluginDesc<>(klass);
+                PREDICATE_PLUGINS.add(pluginDesc);
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-    }
-
-    @Mock
-    private Herder herder;
-    @Mock
-    private Plugins plugins;
-    private ConnectorPluginsResource connectorPluginsResource;
-
-    @Before
-    public void setUp() throws Exception {
-        PowerMock.mockStatic(RestClient.class,
-                RestClient.class.getMethod("httpRequest", String.class, String.class, HttpHeaders.class, Object.class, TypeReference.class, WorkerConfig.class));

Review comment:
       Good catch, thanks for cleaning this up!

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
##########
@@ -399,19 +430,59 @@ public void testConnectorPluginsIncludesTypeAndVersionInformation() throws Excep
         );
     }
 
-    protected static ConnectorPluginInfo newInfo(Class<? extends Connector> klass, String version) {
-        return new ConnectorPluginInfo(new MockConnectorPluginDesc(klass, version));
+    @Test
+    public void testListAllPlugins() {
+        Set<ConnectorPluginInfo> connectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
+
+        List<Set<MockConnectorPluginDesc<?>>> allPlugins = Arrays.asList(
+                SINK_CONNECTOR_PLUGINS,
+                SOURCE_CONNECTOR_PLUGINS,
+                CONVERTER_PLUGINS,
+                HEADER_CONVERTER_PLUGINS,
+                TRANSFORMATION_PLUGINS,
+                PREDICATE_PLUGINS);
+        for (Set<MockConnectorPluginDesc<?>> plugins : allPlugins) {
+            for (MockConnectorPluginDesc<?> plugin : plugins) {
+                boolean contained = connectorPlugins.contains(newInfo(plugin));
+                if ((ConnectorPluginsResource.CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) ||
+                        (ConnectorPluginsResource.TRANSFORM_EXCLUDES.contains(plugin.pluginClass()))) {
+                    assertFalse(contained);
+                } else {
+                    assertTrue(contained);
+                }
+            }
+            verify(herder, atLeastOnce()).plugins();
+        }

Review comment:
       Same thought w/r/t performing assertions on the complete set of returned plugins:
   ```suggestion
           Set<Class<?>> excludes = Stream.of(
                       ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES,
                       ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES,
                       ConnectorPluginsResource.TRANSFORM_EXCLUDES
                   ).flatMap(Collection::stream)
                   .collect(Collectors.toSet());
           Set<ConnectorPluginInfo> expectedConnectorPlugins = Stream.of(
                       SINK_CONNECTOR_PLUGINS,
                       SOURCE_CONNECTOR_PLUGINS,
                       CONVERTER_PLUGINS,
                       HEADER_CONVERTER_PLUGINS,
                       TRANSFORMATION_PLUGINS,
                       PREDICATE_PLUGINS
                   ).flatMap(Collection::stream)
                   .filter(p -> !excludes.contains(p.pluginClass()))
                   .map(ConnectorPluginsResourceTest::newInfo)
                   .collect(Collectors.toSet());
   
           Set<ConnectorPluginInfo> actualConnectorPlugins = new HashSet<>(connectorPluginsResource.listConnectorPlugins(false));
   
           assertEquals(expectedConnectorPlugins, actualConnectorPlugins);
   
           verify(herder, atLeastOnce()).plugins();
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1054536303


   Thanks @C0urante for the review! Again you made some very good suggestions. I've pushed an update.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison merged pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison merged pull request #11572:
URL: https://github.com/apache/kafka/pull/11572


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814955840



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -79,11 +82,21 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-        sb.append("className='").append(className).append('\'');
-        sb.append(", type=").append(type);
-        sb.append(", version='").append(version).append('\'');
-        sb.append('}');
-        return sb.toString();
+        return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+                ", type=" + type.toString() +
+                ", version='" + version + '\'' +
+                '}';
+    }
+
+    public static final class NoVersionFilter {
+        @Override
+        public boolean equals(Object obj) {
+            return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }

Review comment:
       Ah yeah, compiler warning about overriding `equals` but not `hashCode`. Think we should leave a comment about that just so others don't wonder the same thing?
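    Something as simple as this would do it, I think (wording is only a suggestion):
    ```java
        @Override
        public int hashCode() {
            // Only here to silence the "equals without hashCode" warning;
            // the behaviour of this filter is fully defined by equals() above.
            return super.hashCode();
        }
    ```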




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814995987



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -79,11 +82,21 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-        sb.append("className='").append(className).append('\'');
-        sb.append(", type=").append(type);
-        sb.append(", version='").append(version).append('\'');
-        sb.append('}');
-        return sb.toString();
+        return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+                ", type=" + type.toString() +
+                ", version='" + version + '\'' +
+                '}';
+    }
+
+    public static final class NoVersionFilter {
+        @Override
+        public boolean equals(Object obj) {
+            return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }

Review comment:
       Yes that's a good idea

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -79,11 +82,21 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-        sb.append("className='").append(className).append('\'');
-        sb.append(", type=").append(type);
-        sb.append(", version='").append(version).append('\'');
-        sb.append('}');
-        return sb.toString();
+        return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+                ", type=" + type.toString() +
+                ", version='" + version + '\'' +
+                '}';
+    }
+
+    public static final class NoVersionFilter {
+        @Override
+        public boolean equals(Object obj) {
+            return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }

Review comment:
       Yes that's a good idea, done!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814358703



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
##########
@@ -168,6 +173,44 @@ public Connector newConnector(String connectorClassOrAlias) {
         return newPlugin(klass);
     }
 
+    public Converter newConverter(String className) throws ClassNotFoundException {

Review comment:
       Nit: Might be nice to have (brief) Javadocs on these methods. I was thinking "don't we already use the `Plugins` class to instantiate converters? Why do we need another?". After taking a look, it's clear that this one is just for instantiating a converter in order to grab its `ConfigDef`, while the other methods (like `Plugins.newConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage)`) are for instantiating and configuring converters for use with tasks.
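    For example, even something this brief would have answered that question (wording is just a sketch, body copied from the PR):
    ```java
        /**
         * Load the named {@link Converter} so that its {@link Converter#config()} definition
         * can be retrieved. Unlike the other {@code newConverter} overloads, the returned
         * instance is not configured and is not intended to be handed to a task.
         */
        public Converter newConverter(String className) throws ClassNotFoundException {
            Class<? extends Converter> klass = pluginClass(delegatingLoader, className, Converter.class);
            return newPlugin(klass);
        }
    ```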

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -56,16 +69,49 @@
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
     private final List<ConnectorPluginInfo> connectorPlugins;
+    private final Map<String, PluginType> pluginsByType;
 
-    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
+    static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
             VerifiableSourceConnector.class, VerifiableSinkConnector.class,
             MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
             SchemaSourceConnector.class
     );
 
+    @SuppressWarnings("rawtypes")
+    static final List<Class<? extends Transformation>> TRANSFORM_EXCLUDES = Arrays.asList(
+            PredicatedTransformation.class
+    );
+
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
+        this.pluginsByType = new HashMap<>();
+
+        // TODO: improve once plugins are allowed to be added/removed during runtime.
+        for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+            if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(plugin, PluginType.from(plugin.pluginClass())));
+                pluginsByType.put(getAlias(plugin.className()), PluginType.from(plugin.pluginClass()));
+            }
+        }
+        for (PluginDesc<Transformation<?>> transform : herder.plugins().transformations()) {
+            if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(transform, PluginType.TRANSFORMATION));
+                pluginsByType.put(getAlias(transform.className()), PluginType.TRANSFORMATION);
+            }
+        }
+        for (PluginDesc<Predicate<?>> predicate : herder.plugins().predicates()) {
+            connectorPlugins.add(new ConnectorPluginInfo(predicate, PluginType.PREDICATE));
+            pluginsByType.put(getAlias(predicate.className()), PluginType.PREDICATE);
+        }
+        for (PluginDesc<Converter> converter : herder.plugins().converters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(converter, PluginType.CONVERTER));
+            pluginsByType.put(getAlias(converter.className()), PluginType.CONVERTER);
+        }
+        for (PluginDesc<HeaderConverter> headerConverter : herder.plugins().headerConverters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(headerConverter, PluginType.HEADER_CONVERTER));
+            pluginsByType.put(getAlias(headerConverter.className()), PluginType.HEADER_CONVERTER);
+        }

Review comment:
       Also, could we abstract and simplify the logic for populating `connectorPlugins`? It looks like the only wrinkle here preventing us from writing a generic `private void addConnectorPlugins(Collection<PluginDesc<?>>, PluginType pluginType)` method and then invoking that once each for sink connectors, source connectors, converters, etc. is that the `Plugins` class doesn't expose source and sink connectors separately.
   
   Considering that `Plugins::connectors` is only used (excluding test code) in one place--this class--could we refactor that into two separate `Plugins::sinkConnectors` and `Plugins::sourceConnectors` methods?
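    Rough sketch of the kind of helper I have in mind (the parameter shapes and exclude handling are just a guess, not a final design):
    ```java
        private void addConnectorPlugins(Collection<? extends PluginDesc<?>> plugins,
                                         PluginType type,
                                         Collection<? extends Class<?>> excludes) {
            for (PluginDesc<?> plugin : plugins) {
                if (!excludes.contains(plugin.pluginClass())) {
                    connectorPlugins.add(new ConnectorPluginInfo(plugin, type));
                    pluginsByType.put(getAlias(plugin.className()), type);
                }
            }
        }
    ```
    With `Plugins::sinkConnectors` and `Plugins::sourceConnectors` split out, the constructor would collapse to one call per plugin type (passing an empty exclude list where there's nothing to filter).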

##########
File path: connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
##########
@@ -86,4 +87,12 @@
     default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
         return toConnectData(topic, value);
     }
+
+    /**
+     * Configuration specification for this set of converters.

Review comment:
       I know this matches the Javadoc from the KIP, but I'm wondering why we're using "converters" (plural) here. Would it be clearer to say "for this converter" instead?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -613,7 +614,7 @@ public static ConfigInfos generateResult(String connType, Map<String, ConfigKey>
         return new ConfigInfos(connType, errorCount, groups, configInfoList);
     }
 
-    private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {
+    public static ConfigKeyInfo convertConfigKey(ConfigKey configKey) {

Review comment:
       Nit: kind of strange that this method (and honestly a lot of the config wrangling logic) is contained in the `AbstractHerder` class. Might be worth refactoring into a separate config utils class/package. Probably best to leave for a separate PR, but if you agree that that'd be cleaner I could see about doing that work as a follow-up.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
##########
@@ -168,6 +173,44 @@ public Connector newConnector(String connectorClassOrAlias) {
         return newPlugin(klass);
     }
 
+    public Converter newConverter(String className) throws ClassNotFoundException {
+        Class<? extends Converter> klass = pluginClass(
+                delegatingLoader,
+                className,
+                Converter.class
+        );
+        return newPlugin(klass);
+    }
+
+    public HeaderConverter newHeaderConverter(String className) throws ClassNotFoundException {
+        Class<? extends HeaderConverter> klass = pluginClass(
+                delegatingLoader,
+                className,
+                HeaderConverter.class
+        );
+        return newPlugin(klass);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public Predicate newPredicate(String className) throws ClassNotFoundException {

Review comment:
       This should match the type-safe declaration for `newTransformation` IMO:
   
   ```suggestion
       @SuppressWarnings({"unchecked", "rawtypes"})
       public <R extends ConnectRecord<R>> Predicate<R> newPredicate(String className) throws ClassNotFoundException {
   ```
   
   I think this is due to testing woes with the `AbstractHerderTest::testConnectorPluginConfig` test case; I've left a suggestion in that section for how to keep the type safety here with a small change there.
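    i.e. the full method could end up looking roughly like this (a sketch only; the cast/suppressions may need tweaking to line up with how `newTransformation` is written):
    ```java
        @SuppressWarnings({"unchecked", "rawtypes"})
        public <R extends ConnectRecord<R>> Predicate<R> newPredicate(String className) throws ClassNotFoundException {
            Class<? extends Predicate> klass = pluginClass(delegatingLoader, className, Predicate.class);
            return (Predicate<R>) newPlugin(klass);  // unchecked cast, suppressed above
        }
    ```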

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -79,11 +82,21 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        final StringBuilder sb = new StringBuilder("ConnectorPluginInfo{");
-        sb.append("className='").append(className).append('\'');
-        sb.append(", type=").append(type);
-        sb.append(", version='").append(version).append('\'');
-        sb.append('}');
-        return sb.toString();
+        return "ConnectorPluginInfo{" + "className='" + className + '\'' +
+                ", type=" + type.toString() +
+                ", version='" + version + '\'' +
+                '}';
+    }
+
+    public static final class NoVersionFilter {
+        @Override
+        public boolean equals(Object obj) {
+            return DelegatingClassLoader.UNDEFINED_VERSION.equals(obj);
+        }
+
+        @Override
+        public int hashCode() {
+            return super.hashCode();
+        }

Review comment:
       Do we need this method declaration at all? Looks like it might have been left in accidentally while trying to wrestle with SpotBugs?
   ```suggestion
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +751,38 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override

Review comment:
       (Left thoughts in `ConnectorPluginsResource` about possible ways we could refactor this to reduce the workload on the REST layer to handle some of the classloading logic)

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -901,6 +902,48 @@ public void testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors(
         assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2");
     }
 
+    @Test
+    public void testConnectorPluginConfig() throws Exception {
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(
+                        Worker.class,
+                        String.class,
+                        String.class,
+                        StatusBackingStore.class,
+                        ConfigBackingStore.class,
+                        ConnectorClientConfigOverridePolicy.class
+                )
+                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .addMockedMethod("generation")
+                .createMock();
+
+        EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(new SampleSourceConnector()).anyTimes();
+        EasyMock.expect(plugins.newConverter(EasyMock.anyString())).andReturn(new SampleConverterWithHeaders()).anyTimes();
+        EasyMock.expect(plugins.newHeaderConverter(EasyMock.anyString())).andReturn(new SampleHeaderConverter()).anyTimes();
+        EasyMock.expect(plugins.newPredicate(EasyMock.anyString())).andReturn(new SamplePredicate()).anyTimes();

Review comment:
       If we want to keep the type-safe signature for `Plugins::newPredicate`:
   ```suggestion
           EasyMock.expect(plugins.<SourceRecord>newPredicate(EasyMock.anyString())).andReturn(new SamplePredicate()).anyTimes();
   ```
   
   Note that this also requires adding an import for `org.apache.kafka.connect.source.SourceRecord` to this test class.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -123,4 +171,15 @@ private String normalizedPluginName(String pluginName) {
             ? pluginName.substring(0, pluginName.length() - ALIAS_SUFFIX.length())
             : pluginName;
     }
+
+    String getAlias(String name) {
+        name = normalizedPluginName(name);
+        int lastIndexOf = name.lastIndexOf('.');
+        return lastIndexOf >= 0 ? name.substring(lastIndexOf + 1) : name;
+    }
+
+    private synchronized List<ConfigKeyInfo> doGetConfigDef(final String pluginName) {
+        PluginType pluginType = pluginsByType.getOrDefault(getAlias(pluginName), PluginType.UNKNOWN);

Review comment:
       Also, it seems like using a default of `PluginType.UNKNOWN` here might be suboptimal. If someone wants to view the config for a REST extension, for example, they'll end up seeing an error message later on (in `AbstractHerder::connectorPluginConfig`) that says something like "Invalid plugin type unknown. Valid types are..."
   
   I think it'd be clearer to users if we could differentiate between these two cases:
   1. User requests config for a plugin that does exist on the worker, but which we don't expose config information via the REST API for (such as a REST extension or a config provider)
   2. User requests config for a plugin that doesn't exist on the worker
   
    Status-wise, in the case of 1, a 400 response probably makes sense, but for 2, a 404 response might be more applicable.
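    Roughly what I'm picturing (`NotFoundException`/`BadRequestException` are the exceptions the runtime already maps to 404/400 responses, if I'm remembering right; `SUPPORTED_TYPES` is hypothetical):
    ```java
        // Sketch: distinguish "no such plugin" (404) from "plugin exists but has no exposed config" (400)
        PluginType pluginType = pluginsByType.get(getAlias(pluginName));
        if (pluginType == null) {
            throw new NotFoundException("Unknown plugin: " + pluginName);
        }
        if (!SUPPORTED_TYPES.contains(pluginType)) {
            throw new BadRequestException("Invalid plugin type " + pluginType
                    + "; config definitions are only exposed for connector-level plugins");
        }
    ```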

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -56,16 +69,49 @@
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
     private final List<ConnectorPluginInfo> connectorPlugins;
+    private final Map<String, PluginType> pluginsByType;
 
-    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
+    static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
             VerifiableSourceConnector.class, VerifiableSinkConnector.class,
             MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
             SchemaSourceConnector.class
     );
 
+    @SuppressWarnings("rawtypes")
+    static final List<Class<? extends Transformation>> TRANSFORM_EXCLUDES = Arrays.asList(
+            PredicatedTransformation.class
+    );
+
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
+        this.pluginsByType = new HashMap<>();
+
+        // TODO: improve once plugins are allowed to be added/removed during runtime.
+        for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+            if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(plugin, PluginType.from(plugin.pluginClass())));
+                pluginsByType.put(getAlias(plugin.className()), PluginType.from(plugin.pluginClass()));
+            }
+        }
+        for (PluginDesc<Transformation<?>> transform : herder.plugins().transformations()) {
+            if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(transform, PluginType.TRANSFORMATION));
+                pluginsByType.put(getAlias(transform.className()), PluginType.TRANSFORMATION);
+            }
+        }
+        for (PluginDesc<Predicate<?>> predicate : herder.plugins().predicates()) {
+            connectorPlugins.add(new ConnectorPluginInfo(predicate, PluginType.PREDICATE));
+            pluginsByType.put(getAlias(predicate.className()), PluginType.PREDICATE);
+        }
+        for (PluginDesc<Converter> converter : herder.plugins().converters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(converter, PluginType.CONVERTER));
+            pluginsByType.put(getAlias(converter.className()), PluginType.CONVERTER);
+        }
+        for (PluginDesc<HeaderConverter> headerConverter : herder.plugins().headerConverters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(headerConverter, PluginType.HEADER_CONVERTER));
+            pluginsByType.put(getAlias(headerConverter.className()), PluginType.HEADER_CONVERTER);
+        }

Review comment:
       It seems like we're duplicating some of the logic contained in `Plugins` into this class by tracking class alias names and pre-computing plugin type based on them.
   
   Did you consider a `Herder` method that only accepted the name of the plugin, and took on the responsibility of deducing the plugin type itself?
   ```java
   List<ConfigKeyInfo> connectorPluginConfig(String pluginName);
   ```
   
   In `AbstractHerder`, we could do something like this:
   ```java
       @Override
       public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
           try {
            Object plugin = plugins().newPlugin(pluginName);
            PluginType pluginType = PluginType.from(plugin.getClass());
               List<ConfigKeyInfo> results = new ArrayList<>();
               ConfigDef configDefs;
               switch (pluginType) {
                   case SINK:
                   case SOURCE:
                       configDefs = ((Connector) plugin).config();
                       break;
                   case CONVERTER:
                       configDefs = ((Converter) plugin).config();
                       break;
               // ... Rest of switch statement follows same pattern, and rest of the method remains unchanged
       }
   ```
   
   And in `Plugins` we could do this:
   ```java
       public Object newPlugin(String classOrAlias) throws ClassNotFoundException {
           Class<? extends Object> klass = pluginClass(delegatingLoader, classOrAlias, Object.class);
           return newPlugin(klass);
       }
   ```
   
   
   Or alternatively, we could introduce a common interface for plugins that expose a `ConfigDef`:
   ```java
   interface DefinedConfigPlugin {
       ConfigDef config();
   }
   ```
    (this could be kept package-private so as not to qualify as a public interface)
   
   And we could simplify some of the `AbstractHerder` logic:
   ```java
       @Override
       public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
           try {
            DefinedConfigPlugin plugin = plugins().newDefinedConfigPlugin(pluginName);
               ConfigDef configDefs = plugin.config();
               // No switch statement on plugin type necessary
               // ... Rest of the method  remains unchanged
       }
   ```
   
   And `Plugins` would still be fairly simple:
   ```java
       public DefinedConfigPlugin newDefinedConfigPlugin(String classOrAlias) throws ClassNotFoundException {
           Class<? extends DefinedConfigPlugin> klass = pluginClass(delegatingLoader, classOrAlias, DefinedConfigPlugin.class);
           return newPlugin(klass);
       }
   ```
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorPluginInfo.java
##########
@@ -17,30 +17,32 @@
 package org.apache.kafka.connect.runtime.rest.entities;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
 
 import java.util.Objects;
 
 public class ConnectorPluginInfo {
     private final String className;
-    private final ConnectorType type;
+    private final PluginType type;
     private final String version;
 
     @JsonCreator
     public ConnectorPluginInfo(
         @JsonProperty("class") String className,
-        @JsonProperty("type") ConnectorType type,
+        @JsonProperty("type") PluginType type,
         @JsonProperty("version") String version
     ) {
         this.className = className;
         this.type = type;
         this.version = version;
     }
 
-    public ConnectorPluginInfo(PluginDesc<Connector> plugin) {
-        this(plugin.className(), ConnectorType.from(plugin.pluginClass()), plugin.version());
+    public ConnectorPluginInfo(PluginDesc<?> plugin, PluginType type) {
+        this(plugin.className(), type, plugin.version());

Review comment:
       Isn't the `PluginType` argument redundant if we already have a `PluginDesc`?
   ```suggestion
       public ConnectorPluginInfo(PluginDesc<?> plugin) {
           this(plugin.className(), plugin.type(), plugin.version());
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -123,4 +171,15 @@ private String normalizedPluginName(String pluginName) {
             ? pluginName.substring(0, pluginName.length() - ALIAS_SUFFIX.length())
             : pluginName;
     }
+
+    String getAlias(String name) {
+        name = normalizedPluginName(name);
+        int lastIndexOf = name.lastIndexOf('.');
+        return lastIndexOf >= 0 ? name.substring(lastIndexOf + 1) : name;
+    }
+
+    private synchronized List<ConfigKeyInfo> doGetConfigDef(final String pluginName) {
+        PluginType pluginType = pluginsByType.getOrDefault(getAlias(pluginName), PluginType.UNKNOWN);

Review comment:
       Doesn't this introduce the possibility of conflict between two plugins (or I guess specifically connectors, since those are the only ones we strip suffixes from) which have different fully-qualified class names, but the same simple class name? Or where they would have the same simple class name, except that one ends with `Connector` and the other doesn't?
   
   In practice this is unlikely to come up but if we support it at the moment, probably best to take care here to avoid introducing a potential regression, especially if someone for some reason wants to run, e.g., two different `MySqlSink` connectors on their worker.
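    For example (class names below are made up), with the current `getAlias` both of these end up under the same key, so whichever is registered last silently wins:
    ```java
        getAlias("com.example.a.MySqlSinkConnector");  // "Connector" suffix stripped -> "MySqlSink"
        getAlias("com.example.b.MySqlSink");           // no suffix to strip          -> "MySqlSink"
    ```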

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -100,21 +146,23 @@ public ConfigInfos validateConfigs(
 
     @GET
     @Path("/")
-    public List<ConnectorPluginInfo> listConnectorPlugins() {
-        return getConnectorPlugins();
+    public List<ConnectorPluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) {
+        return getConnectorPlugins(connectorsOnly);

Review comment:
       Nit: I know this is following the [existing style](https://github.com/apache/kafka/blob/c2ee1411c8bb73fcf96c12abeedbfe6fde2c6354/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L101-L118) in the code base, but do you think the separate `getConnectorPlugins` method is actually bringing anything to the table readability-wise?
   
   Think we could just as easily eliminate the `getConnectorPlugins` method and inline it directly here. Same thought with `getConnectorConfigDef`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1054567120


   Noticed that the "resolve comment" option isn't available on this PR, apparently you need to either be the author of a PR or have write access to the repo in order to resolve comments ([ref](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/commenting-on-a-pull-request)). Feel free to resolve everything from me if you'd like; might help reduce noise on this page for other reviewers.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817909898



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -100,21 +135,23 @@ public ConfigInfos validateConfigs(
 
     @GET
     @Path("/")
-    public List<ConnectorPluginInfo> listConnectorPlugins() {
-        return getConnectorPlugins();
-    }
-
-    // TODO: improve once plugins are allowed to be added/removed during runtime.
-    private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
-        if (connectorPlugins.isEmpty()) {
-            for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
-                if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
-                    connectorPlugins.add(new ConnectorPluginInfo(plugin));
-                }
+    public List<ConnectorPluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) {
+        synchronized (this) {
+            if (connectorsOnly) {
+                Set<String> types = new HashSet<>(Arrays.asList(PluginType.SINK.toString(), PluginType.SOURCE.toString()));
+                return Collections.unmodifiableList(connectorPlugins.stream().filter(p -> types.contains(p.type())).collect(Collectors.toList()));

Review comment:
       Not really, I updated the logic to use `equals` and `||`
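    For reference, roughly what the filter looks like now (paraphrased, not a verbatim copy of the updated code):
    ```java
        return Collections.unmodifiableList(connectorPlugins.stream()
                .filter(p -> PluginType.SINK.toString().equals(p.type())
                        || PluginType.SOURCE.toString().equals(p.type()))
                .collect(Collectors.toList()));
    ```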




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1057165940


   Thanks @tombentley for the review! I've pushed an update.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817913673



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override
+    public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {

Review comment:
       If we continue to put the onus on the `Herder` to ensure that the requested plugin has a supported type (e.g., throw an error if it's a REST extension), then I think "connector plugin" is useful since it differentiates between worker and connector plugins.
   
    But it looks like the `ConnectorPluginInfo` class is itself completely agnostic on the connector-plugin vs. worker-plugin front, and it might even be possible to leverage it with no modifications if we eventually decide to add support for exposing worker plugin information to the REST API. So I think renaming that to `PluginInfo` (or something like that) would be reasonable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r817726612



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +755,41 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override
+    public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {

Review comment:
       In this context we consider connectors, transforms, converters, etc. as "connector plugins", as opposed to "worker plugins" such as config providers and REST extensions.
    
    With that in mind, do you think it's still worth renaming this method and `ConnectorPluginInfo`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r816159562



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##########
@@ -232,8 +252,7 @@ private void initPluginLoader(String path) {
             if (CLASSPATH_NAME.equals(path)) {
                 scanUrlsAndAddPlugins(
                         getParent(),
-                        ClasspathHelper.forJavaClassPath().toArray(new URL[0]),
-                        null

Review comment:
       Thanks, the "unused parameter" warning from my IDE has been bugging me about this for ages 🙏 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1051008938


   Thanks @C0urante for the review, all good suggestions. I believe I've addressed them all.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814472999



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
##########
@@ -750,4 +751,38 @@ private String trace(Throwable t) {
         return keys;
     }
 
+    @Override

Review comment:
       (Left thoughts in `ConnectorPluginsResource` about possible ways we could refactor this to reduce the workload on the REST layer to handle some of the classloading logic.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814469917



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
##########
@@ -56,16 +69,49 @@
     private static final String ALIAS_SUFFIX = "Connector";
     private final Herder herder;
     private final List<ConnectorPluginInfo> connectorPlugins;
+    private final Map<String, PluginType> pluginsByType;
 
-    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
+    static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
             VerifiableSourceConnector.class, VerifiableSinkConnector.class,
             MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
             SchemaSourceConnector.class
     );
 
+    @SuppressWarnings("rawtypes")
+    static final List<Class<? extends Transformation>> TRANSFORM_EXCLUDES = Arrays.asList(
+            PredicatedTransformation.class
+    );
+
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
+        this.pluginsByType = new HashMap<>();
+
+        // TODO: improve once plugins are allowed to be added/removed during runtime.
+        for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+            if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(plugin, PluginType.from(plugin.pluginClass())));
+                pluginsByType.put(getAlias(plugin.className()), PluginType.from(plugin.pluginClass()));
+            }
+        }
+        for (PluginDesc<Transformation<?>> transform : herder.plugins().transformations()) {
+            if (!TRANSFORM_EXCLUDES.contains(transform.pluginClass())) {
+                connectorPlugins.add(new ConnectorPluginInfo(transform, PluginType.TRANSFORMATION));
+                pluginsByType.put(getAlias(transform.className()), PluginType.TRANSFORMATION);
+            }
+        }
+        for (PluginDesc<Predicate<?>> predicate : herder.plugins().predicates()) {
+            connectorPlugins.add(new ConnectorPluginInfo(predicate, PluginType.PREDICATE));
+            pluginsByType.put(getAlias(predicate.className()), PluginType.PREDICATE);
+        }
+        for (PluginDesc<Converter> converter : herder.plugins().converters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(converter, PluginType.CONVERTER));
+            pluginsByType.put(getAlias(converter.className()), PluginType.CONVERTER);
+        }
+        for (PluginDesc<HeaderConverter> headerConverter : herder.plugins().headerConverters()) {
+            connectorPlugins.add(new ConnectorPluginInfo(headerConverter, PluginType.HEADER_CONVERTER));
+            pluginsByType.put(getAlias(headerConverter.className()), PluginType.HEADER_CONVERTER);
+        }

Review comment:
       It seems like we're duplicating some of the logic contained in `Plugins` into this class by tracking class alias names and pre-computing plugin type based on them.
   
   Did you consider a `Herder` method that only accepted the name of the plugin, and took on the responsibility of deducing the plugin type itself?
   ```java
   List<ConfigKeyInfo> connectorPluginConfig(String pluginName);
   ```
   
   In `AbstractHerder`, we could do something like this:
   ```java
       @Override
       public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
           try {
            Object plugin = plugins().newPlugin(pluginName);
            PluginType pluginType = PluginType.from(plugin.getClass());
               List<ConfigKeyInfo> results = new ArrayList<>();
               ConfigDef configDefs;
               switch (pluginType) {
                   case SINK:
                   case SOURCE:
                       configDefs = ((Connector) plugin).config();
                       break;
                   case CONVERTER:
                       configDefs = ((Converter) plugin).config();
                       break;
               // ... Rest of switch statement follows same pattern, and rest of the method remains unchanged
       }
   ```
   
   And in `Plugins` we could do this:
   ```java
       public Object newPlugin(String classOrAlias) throws ClassNotFoundException {
           Class<? extends Object> klass = pluginClass(delegatingLoader, classOrAlias, Object.class);
           return newPlugin(klass);
       }
   ```
   
   
   Or alternatively, we could introduce a common interface for plugins that expose a `ConfigDef`:
   ```java
   interface DefinedConfigPlugin {
       ConfigDef config();
   }
   ```
   
   And we could simplify some of the `AbstractHerder` logic:
   ```java
       @Override
       public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
           try {
            DefinedConfigPlugin plugin = plugins().newDefinedConfigPlugin(pluginName);
               ConfigDef configDefs = plugin.config();
               // No switch statement on plugin type necessary
               // ... Rest of the method  remains unchanged
       }
   ```
   
   And `Plugins` would still be fairly simple:
   ```java
       public DefinedConfigPlugin newDefinedConfigPlugin(String classOrAlias) throws ClassNotFoundException {
           Class<? extends DefinedConfigPlugin> klass = pluginClass(delegatingLoader, classOrAlias, DefinedConfigPlugin.class);
           return newPlugin(klass);
       }
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] C0urante commented on a change in pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
C0urante commented on a change in pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#discussion_r814475362



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
##########
@@ -168,6 +173,44 @@ public Connector newConnector(String connectorClassOrAlias) {
         return newPlugin(klass);
     }
 
+    public Converter newConverter(String className) throws ClassNotFoundException {

Review comment:
       (If we introduce a `newDefinedConfigPlugin` method, then these methods will become redundant and no docs will be necessary.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #11572: KAFKA-13510: Connect APIs to list all connector plugins and retrieve …

Posted by GitBox <gi...@apache.org>.
mimaison commented on pull request #11572:
URL: https://github.com/apache/kafka/pull/11572#issuecomment-1058042085


   Thanks @C0urante and @tombentley for the reviews!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org