You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Eugene Burd (JIRA)" <ji...@apache.org> on 2017/10/28 06:10:00 UTC

[jira] [Created] (KAFKA-6139) error when loading plugins

Eugene Burd created KAFKA-6139:
----------------------------------

             Summary: error when loading plugins
                 Key: KAFKA-6139
                 URL: https://issues.apache.org/jira/browse/KAFKA-6139
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 0.11.0.0
            Reporter: Eugene Burd


I am trying to run the Big Query connector for KafkaConnect (https://github.com/wepay/kafka-connect-bigquery).  I have configured it using a docker container, dropped the jar files for the connector in the container and configured the plugin.path in the kafka connect config to point to the directory with the jars.  

Upon startup, Kafka is scanning the plugin folder and evaluating all the jar files found.  It encounters the connector jar file, tests to see if it extends a connector (in this case kcbq-connector-1.0.0-SNAPSHOT.jar), but then it proceeds to create a new instance of the connector.  Code snippet from DelegatingClassLoader.java below.  The problem is that the connector class relies on other jar files that are in the plugin folder, but not in the path.  This causes a class not found exception (below). 

I suspect when the plugin connector is actually executed, it is done through an isolated context and that context's classpath is set to the plugin folder, so i think the connector would actually work.  But scanning for the connector fails prior to ever getting there.

I understand that you need a version of the connector and hence why this code is running, but i think the new instance creation for version check needs to be done in a sandbox to support the classpaths of the plugin connector. 


[2017-10-28 06:04:08,961] INFO Loading plugin from: /usr/share/java/kafka-connect-bigquery/kcbq-connector-1.0.0-SNAPSHOT.jar (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:176)
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/cloud/bigquery/BigQuery
        at java.lang.Class.getDeclaredConstructors0(Native Method)
        at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
        at java.lang.Class.getConstructor0(Class.java:3075)
        at java.lang.Class.newInstance(Class.java:412)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:242)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:223)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:190)
        at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:150)
        at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:63)
Caused by: java.lang.ClassNotFoundException: com.google.cloud.bigquery.BigQuery
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:62)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 11 more

-------------------------

private <T> Collection<PluginDesc<T>> getPluginDesc( 
            Reflections reflections,
            Class<T> klass,
            ClassLoader loader
    ) throws InstantiationException, IllegalAccessException {
        Set<Class<? extends T>> plugins = reflections.getSubTypesOf(klass);

        Collection<PluginDesc<T>> result = new ArrayList<>();
        for (Class<? extends T> plugin : plugins) {
            if (PluginUtils.isConcrete(plugin)) {
                // Temporary workaround until all the plugins are versioned.
                if (Connector.class.isAssignableFrom(plugin)) {
                    result.add(
                            new PluginDesc<>(
                                    plugin,
                                    ((Connector) plugin.newInstance()).version(),
                                    loader
                            )
                    );
                } else {
                    result.add(new PluginDesc<>(plugin, "undefined", loader));
                }
            }
        }
        return result;
    }




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)