You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/08/11 15:28:39 UTC

[kafka] branch 2.2 updated: KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new d86420f  KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)
d86420f is described below

commit d86420f15681297d653909035bc67a3ae6081376
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Sun Aug 11 07:56:05 2019 -0700

    KAFKA-8550: Fix plugin loading of aliased converters in Connect (#6959)
    
    Connector validation fails if an alias is used for the converter since the validation for that is done via `ConfigDef.validateAll(...)`, which in turn invokes `Class.forName(...)` on the alias. Even though the class is successfully loaded by the DelegatingClassLoader, some Java implementations will refuse to return a class from `Class.forName(...)` whose name differs from the argument provided.
    
    This commit alters `ConfigDef.parseType(...)` to first invoke `ClassLoader.loadClass(...)` on the class using our class loader in order to get a handle on the actual class object to be loaded, then invoke `Class.forName(...)` with the fully-qualified class name of the to-be-loaded class and return the result. The invocation of `Class.forName(...)` is necessary in order to allow static initialization to take place; simply calling `ClassLoader.loadClass(...)` is insufficient.
    
    Also corrected a unit test that relied upon the old behavior.
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Robert Yokota <ra...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
 .../org/apache/kafka/common/config/ConfigDef.java  | 13 +++++++++---
 .../kafka/common/config/AbstractConfigTest.java    |  2 +-
 .../apache/kafka/common/config/ConfigDefTest.java  | 23 ++++++++++++++++++++++
 3 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 7669f1d..b172369 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -705,9 +705,16 @@ public class ConfigDef {
                 case CLASS:
                     if (value instanceof Class)
                         return value;
-                    else if (value instanceof String)
-                        return Class.forName(trimmed, true, Utils.getContextOrKafkaClassLoader());
-                    else
+                    else if (value instanceof String) {
+                        ClassLoader contextOrKafkaClassLoader = Utils.getContextOrKafkaClassLoader();
+                        // Use loadClass here instead of Class.forName because the name we use here may be an alias
+                        // and not match the name of the class that gets loaded. If that happens, Class.forName can
+                        // throw an exception.
+                        Class<?> klass = contextOrKafkaClassLoader.loadClass(trimmed);
+                        // Invoke forName here with the true name of the requested class to cause class
+                        // initialization to take place.
+                        return Class.forName(klass.getName(), true, contextOrKafkaClassLoader);
+                    } else
                         throw new ConfigException(name, value, "Expected a Class instance or class name.");
                 default:
                     throw new IllegalStateException("Unknown type.");
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 071deed..5762adf 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -263,7 +263,7 @@ public class AbstractConfigTest {
             @Override
             protected Class<?> findClass(String name) throws ClassNotFoundException {
                 if (name.equals(ClassTestConfig.DEFAULT_CLASS.getName()) || name.equals(ClassTestConfig.RESTRICTED_CLASS.getName()))
-                    return null;
+                    throw new ClassNotFoundException();
                 else
                     return ClassTestConfig.class.getClassLoader().loadClass(name);
             }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 974d39f..701e9f4 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -639,6 +639,29 @@ public class ConfigDefTest {
         assertEquals(NestedClass.class, Class.forName(actual));
     }
 
+    @Test
+    public void testClassWithAlias() {
+        final String alias = "PluginAlias";
+        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            // Could try to use the Plugins class from Connect here, but this should simulate enough
+            // of the aliasing logic to suffice for this test.
+            Thread.currentThread().setContextClassLoader(new ClassLoader(originalClassLoader) {
+                @Override
+                public Class loadClass(String name, boolean resolve) throws ClassNotFoundException {
+                    if (alias.equals(name)) {
+                        return NestedClass.class;
+                    } else {
+                        return super.loadClass(name, resolve);
+                    }
+                }
+            });
+            ConfigDef.parseType("Test config", alias, Type.CLASS);
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+
     private class NestedClass {
     }
 }