You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/06/01 09:08:01 UTC
kafka git commit: MINOR: Traverse plugin path recursively in Connect
(KIP-146)
Repository: kafka
Updated Branches:
refs/heads/trunk 049abe7ef -> e0150a25e
MINOR: Traverse plugin path recursively in Connect (KIP-146)
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #3173 from kkonstantine/MINOR-Traverse-plugin-path-recursively-in-Connect
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e0150a25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e0150a25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e0150a25
Branch: refs/heads/trunk
Commit: e0150a25e8127c282f54a0395eb2b1c80ebda94a
Parents: 049abe7
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Thu Jun 1 02:07:53 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Jun 1 02:07:53 2017 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/ConnectorFactory.java | 99 ----------
.../isolation/DelegatingClassLoader.java | 76 ++++----
.../runtime/isolation/PluginClassLoader.java | 17 +-
.../connect/runtime/isolation/PluginUtils.java | 188 ++++++++++++++++---
.../runtime/isolation/PluginUtilsTest.java | 10 +-
5 files changed, 219 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
deleted file mode 100644
index fd0d982..0000000
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.runtime;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.connector.Connector;
-import org.apache.kafka.connect.connector.Task;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.reflections.Reflections;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-
-public class ConnectorFactory {
-
- public Connector newConnector(String connectorClassOrAlias) {
- return instantiate(getConnectorClass(connectorClassOrAlias));
- }
-
- public Task newTask(Class<? extends Task> taskClass) {
- return instantiate(taskClass);
- }
-
- private static <T> T instantiate(Class<? extends T> cls) {
- try {
- return Utils.newInstance(cls);
- } catch (Throwable t) {
- throw new ConnectException("Instantiation error", t);
- }
- }
-
- @SuppressWarnings("unchecked")
- private static Class<? extends Connector> getConnectorClass(String connectorClassOrAlias) {
- // Avoid the classpath scan if the full class name was provided
- try {
- Class<?> clazz = Class.forName(connectorClassOrAlias);
- if (!Connector.class.isAssignableFrom(clazz))
- throw new ConnectException("Class " + connectorClassOrAlias + " does not implement Connector");
- return (Class<? extends Connector>) clazz;
- } catch (ClassNotFoundException e) {
- // Fall through to scan for the alias
- }
-
- // Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration
- Reflections reflections = new Reflections(new ConfigurationBuilder()
- .setUrls(ClasspathHelper.forJavaClassPath()));
-
- Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
-
- List<Class<? extends Connector>> results = new ArrayList<>();
-
- for (Class<? extends Connector> connector: connectors) {
- // Configuration included the class name but not package
- if (connector.getSimpleName().equals(connectorClassOrAlias))
- results.add(connector);
-
- // Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector)
- if (connector.getSimpleName().equals(connectorClassOrAlias + "Connector"))
- results.add(connector);
- }
-
- if (results.isEmpty())
- throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorClassOrAlias +
- ", available connectors are: " + connectorNames(connectors));
- if (results.size() > 1) {
- throw new ConnectException("More than one connector matches alias " + connectorClassOrAlias +
- ". Please use full package and class name instead. Classes found: " + connectorNames(results));
- }
-
- // We just validated that we have exactly one result, so this is safe
- return results.get(0);
- }
-
- private static String connectorNames(Collection<Class<? extends Connector>> connectors) {
- StringBuilder names = new StringBuilder();
- for (Class<?> c : connectors)
- names.append(c.getName()).append(", ");
- return names.substring(0, names.toString().length() - 2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index da8b444..ac0530e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -51,6 +51,7 @@ public class DelegatingClassLoader extends URLClassLoader {
private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
+ private final Map<String, String> aliases;
private final SortedSet<PluginDesc<Connector>> connectors;
private final SortedSet<PluginDesc<Converter>> converters;
private final SortedSet<PluginDesc<Transformation>> transformations;
@@ -61,6 +62,7 @@ public class DelegatingClassLoader extends URLClassLoader {
super(new URL[0], parent);
this.pluginPaths = pluginPaths;
this.pluginLoaders = new HashMap<>();
+ this.aliases = new HashMap<>();
this.activePaths = new HashMap<>();
this.connectors = new TreeSet<>();
this.converters = new TreeSet<>();
@@ -89,8 +91,10 @@ public class DelegatingClassLoader extends URLClassLoader {
public ClassLoader connectorLoader(String connectorClassOrAlias) {
log.debug("Getting plugin class loader for connector: '{}'", connectorClassOrAlias);
- SortedMap<PluginDesc<?>, ClassLoader> inner =
- pluginLoaders.get(connectorClassOrAlias);
+ String fullName = aliases.containsKey(connectorClassOrAlias)
+ ? aliases.get(connectorClassOrAlias)
+ : connectorClassOrAlias;
+ SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner == null) {
log.error(
"Plugin class loader for connector: '{}' was not found. Returning: {}",
@@ -137,23 +141,16 @@ public class DelegatingClassLoader extends URLClassLoader {
for (String configPath : pluginPaths) {
path = configPath;
Path pluginPath = Paths.get(path).toAbsolutePath();
+ // Update for exception handling
+ path = pluginPath.toString();
// Currently 'plugin.paths' property is a list of top-level directories
// containing plugins
if (Files.isDirectory(pluginPath)) {
for (Path pluginLocation : PluginUtils.pluginLocations(pluginPath)) {
- log.info("Loading plugin from: {}", pluginLocation);
- URL[] urls = PluginUtils.pluginUrls(pluginLocation).toArray(new URL[0]);
- if (log.isDebugEnabled()) {
- log.debug("Loading plugin urls: {}", Arrays.toString(urls));
- }
- PluginClassLoader loader = newPluginClassLoader(
- pluginLocation.toUri().toURL(),
- urls,
- this
- );
-
- scanUrlsAndAddPlugins(loader, urls, pluginLocation);
+ registerPlugin(pluginLocation);
}
+ } else if (PluginUtils.isArchive(pluginPath)) {
+ registerPlugin(pluginPath);
}
}
@@ -165,15 +162,34 @@ public class DelegatingClassLoader extends URLClassLoader {
null
);
} catch (InvalidPathException | MalformedURLException e) {
- log.error("Invalid path in plugin path: {}. Ignoring.", path);
+ log.error("Invalid path in plugin path: {}. Ignoring.", path, e);
} catch (IOException e) {
- log.error("Could not get listing for plugin path: {}. Ignoring.", path);
+ log.error("Could not get listing for plugin path: {}. Ignoring.", path, e);
} catch (InstantiationException | IllegalAccessException e) {
log.error("Could not instantiate plugins in: {}. Ignoring: {}", path, e);
}
addAllAliases();
}
+ private void registerPlugin(Path pluginLocation)
+ throws InstantiationException, IllegalAccessException, IOException {
+ log.info("Loading plugin from: {}", pluginLocation);
+ List<URL> pluginUrls = new ArrayList<>();
+ for (Path path : PluginUtils.pluginUrls(pluginLocation)) {
+ pluginUrls.add(path.toUri().toURL());
+ }
+ URL[] urls = pluginUrls.toArray(new URL[0]);
+ if (log.isDebugEnabled()) {
+ log.debug("Loading plugin urls: {}", Arrays.toString(urls));
+ }
+ PluginClassLoader loader = newPluginClassLoader(
+ pluginLocation.toUri().toURL(),
+ urls,
+ this
+ );
+ scanUrlsAndAddPlugins(loader, urls, pluginLocation);
+ }
+
private void scanUrlsAndAddPlugins(
ClassLoader loader,
URL[] urls,
@@ -245,28 +261,17 @@ public class DelegatingClassLoader extends URLClassLoader {
return super.loadClass(name, resolve);
}
- SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(name);
+ String fullName = aliases.containsKey(name) ? aliases.get(name) : name;
+ SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(fullName);
if (inner != null) {
- log.trace("Retrieving loaded class '{}' from '{}'", name, inner.get(inner.lastKey()));
ClassLoader pluginLoader = inner.get(inner.lastKey());
+ log.trace("Retrieving loaded class '{}' from '{}'", fullName, pluginLoader);
return pluginLoader instanceof PluginClassLoader
- ? ((PluginClassLoader) pluginLoader).loadClass(name, resolve)
- : super.loadClass(name, resolve);
+ ? ((PluginClassLoader) pluginLoader).loadClass(fullName, resolve)
+ : super.loadClass(fullName, resolve);
}
- Class<?> klass = null;
- for (PluginClassLoader loader : activePaths.values()) {
- try {
- klass = loader.loadClass(name, resolve);
- break;
- } catch (ClassNotFoundException e) {
- // Not found in this loader.
- }
- }
- if (klass == null) {
- return super.loadClass(name, resolve);
- }
- return klass;
+ return super.loadClass(fullName, resolve);
}
private void addAllAliases() {
@@ -280,12 +285,11 @@ public class DelegatingClassLoader extends URLClassLoader {
if (PluginUtils.isAliasUnique(plugin, plugins)) {
String simple = PluginUtils.simpleName(plugin);
String pruned = PluginUtils.prunedName(plugin);
- SortedMap<PluginDesc<?>, ClassLoader> inner = pluginLoaders.get(plugin.className());
- pluginLoaders.put(simple, inner);
+ aliases.put(simple, plugin.className());
if (simple.equals(pruned)) {
log.info("Added alias '{}' to plugin '{}'", simple, plugin.className());
} else {
- pluginLoaders.put(pruned, inner);
+ aliases.put(pruned, plugin.className());
log.info(
"Added aliases '{}' and '{}' to plugin '{}'",
simple,
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
index 07438e9..780ebd0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java
@@ -23,7 +23,7 @@ import java.net.URL;
import java.net.URLClassLoader;
public class PluginClassLoader extends URLClassLoader {
- private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
+ private static final Logger log = LoggerFactory.getLogger(PluginClassLoader.class);
private final URL pluginLocation;
public PluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
@@ -49,16 +49,17 @@ public class PluginClassLoader extends URLClassLoader {
protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
Class<?> klass = findLoadedClass(name);
if (klass == null) {
- if (PluginUtils.shouldLoadInIsolation(name)) {
- try {
+ try {
+ if (PluginUtils.shouldLoadInIsolation(name)) {
klass = findClass(name);
- } catch (ClassNotFoundException e) {
- // Not found in loader's path. Search in parents.
}
+ } catch (ClassNotFoundException e) {
+ // Not found in loader's path. Search in parents.
+ log.trace("Class '{}' not found. Delegating to parent", name);
}
- if (klass == null) {
- klass = super.loadClass(name, false);
- }
+ }
+ if (klass == null) {
+ klass = super.loadClass(name, false);
}
if (resolve) {
resolveClass(klass);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
index b2be997..edc1636 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginUtils.java
@@ -16,26 +16,107 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.lang.reflect.Modifier;
-import java.net.URL;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
+import java.util.Set;
public class PluginUtils {
+ private static final Logger log = LoggerFactory.getLogger(PluginUtils.class);
+
+ // Be specific about javax packages and exclude those existing in Java SE and Java EE libraries.
private static final String BLACKLIST = "^(?:"
+ "java"
- + "|javax"
- + "|org\\.omg"
+ + "|javax\\.accessibility"
+ + "|javax\\.activation"
+ + "|javax\\.activity"
+ + "|javax\\.annotation"
+ + "|javax\\.batch\\.api"
+ + "|javax\\.batch\\.operations"
+ + "|javax\\.batch\\.runtime"
+ + "|javax\\.crypto"
+ + "|javax\\.decorator"
+ + "|javax\\.ejb"
+ + "|javax\\.el"
+ + "|javax\\.enterprise\\.concurrent"
+ + "|javax\\.enterprise\\.context"
+ + "|javax\\.enterprise\\.context\\.spi"
+ + "|javax\\.enterprise\\.deploy\\.model"
+ + "|javax\\.enterprise\\.deploy\\.shared"
+ + "|javax\\.enterprise\\.deploy\\.spi"
+ + "|javax\\.enterprise\\.event"
+ + "|javax\\.enterprise\\.inject"
+ + "|javax\\.enterprise\\.inject\\.spi"
+ + "|javax\\.enterprise\\.util"
+ + "|javax\\.faces"
+ + "|javax\\.imageio"
+ + "|javax\\.inject"
+ + "|javax\\.interceptor"
+ + "|javax\\.jms"
+ + "|javax\\.json"
+ + "|javax\\.jws"
+ + "|javax\\.lang\\.model"
+ + "|javax\\.mail"
+ + "|javax\\.management"
+ + "|javax\\.management\\.j2ee"
+ + "|javax\\.naming"
+ + "|javax\\.net"
+ + "|javax\\.persistence"
+ + "|javax\\.print"
+ + "|javax\\.resource"
+ + "|javax\\.rmi"
+ + "|javax\\.script"
+ + "|javax\\.security\\.auth"
+ + "|javax\\.security\\.auth\\.message"
+ + "|javax\\.security\\.cert"
+ + "|javax\\.security\\.jacc"
+ + "|javax\\.security\\.sasl"
+ + "|javax\\.servlet"
+ + "|javax\\.sound\\.midi"
+ + "|javax\\.sound\\.sampled"
+ + "|javax\\.sql"
+ + "|javax\\.swing"
+ + "|javax\\.tools"
+ + "|javax\\.transaction"
+ + "|javax\\.validation"
+ + "|javax\\.websocket"
+ + "|javax\\.ws\\.rs"
+ + "|javax\\.xml"
+ + "|javax\\.xml\\.bind"
+ + "|javax\\.xml\\.registry"
+ + "|javax\\.xml\\.rpc"
+ + "|javax\\.xml\\.soap"
+ + "|javax\\.xml\\.ws"
+ + "|org\\.ietf\\.jgss"
+ + "|org\\.omg\\.CORBA"
+ + "|org\\.omg\\.CosNaming"
+ + "|org\\.omg\\.Dynamic"
+ + "|org\\.omg\\.DynamicAny"
+ + "|org\\.omg\\.IOP"
+ + "|org\\.omg\\.Messaging"
+ + "|org\\.omg\\.PortableInterceptor"
+ + "|org\\.omg\\.PortableServer"
+ + "|org\\.omg\\.SendingContext"
+ + "|org\\.omg\\.stub\\.java\\.rmi"
+ "|org\\.w3c\\.dom"
+ + "|org\\.xml\\.sax"
+ "|org\\.apache\\.kafka\\.common"
+ "|org\\.apache\\.kafka\\.connect"
- + "|org\\.apache\\.log4j"
+ + "|org\\.slf4j"
+ ")\\..*$";
private static final String WHITELIST = "^org\\.apache\\.kafka\\.connect\\.(?:"
@@ -50,7 +131,7 @@ public class PluginUtils {
.Filter<Path>() {
@Override
public boolean accept(Path path) throws IOException {
- return Files.isDirectory(path) || PluginUtils.isJar(path);
+ return Files.isDirectory(path) || isArchive(path) || isClassFile(path);
}
};
@@ -63,32 +144,16 @@ public class PluginUtils {
return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
}
- public static boolean isJar(Path path) {
+ public static boolean isArchive(Path path) {
return path.toString().toLowerCase(Locale.ROOT).endsWith(".jar");
}
- public static List<URL> pluginUrls(Path pluginPath) throws IOException {
- List<URL> urls = new ArrayList<>();
- if (PluginUtils.isJar(pluginPath)) {
- urls.add(pluginPath.toUri().toURL());
- } else if (Files.isDirectory(pluginPath)) {
- try (
- DirectoryStream<Path> listing = Files.newDirectoryStream(
- pluginPath,
- PLUGIN_PATH_FILTER
- )
- ) {
- for (Path jar : listing) {
- urls.add(jar.toUri().toURL());
- }
- }
- }
- return urls;
+ public static boolean isClassFile(Path path) {
+ return path.toString().toLowerCase(Locale.ROOT).endsWith(".class");
}
public static List<Path> pluginLocations(Path topPath) throws IOException {
List<Path> locations = new ArrayList<>();
- // Non-recursive for now. Plugin directories or jars need to be exactly under the topPath.
try (
DirectoryStream<Path> listing = Files.newDirectoryStream(
topPath,
@@ -102,6 +167,71 @@ public class PluginUtils {
return locations;
}
+ public static List<Path> pluginUrls(Path topPath) throws IOException {
+ boolean containsClassFiles = false;
+ Set<Path> archives = new HashSet<>();
+ LinkedList<DirectoryEntry> dfs = new LinkedList<>();
+ Set<Path> visited = new HashSet<>();
+
+ if (isArchive(topPath)) {
+ return Collections.singletonList(topPath);
+ }
+
+ DirectoryStream<Path> topListing = Files.newDirectoryStream(
+ topPath,
+ PLUGIN_PATH_FILTER
+ );
+ dfs.push(new DirectoryEntry(topListing));
+ visited.add(topPath);
+ try {
+ while (!dfs.isEmpty()) {
+ Iterator<Path> neighbors = dfs.peek().iterator;
+ if (!neighbors.hasNext()) {
+ dfs.pop().stream.close();
+ continue;
+ }
+
+ Path adjacent = neighbors.next();
+ if (Files.isSymbolicLink(adjacent)) {
+ Path absolute = Files.readSymbolicLink(adjacent).toRealPath();
+ if (Files.exists(absolute)) {
+ adjacent = absolute;
+ } else {
+ continue;
+ }
+ }
+
+ if (!visited.contains(adjacent)) {
+ visited.add(adjacent);
+ if (isArchive(adjacent)) {
+ archives.add(adjacent);
+ } else if (isClassFile(adjacent)) {
+ containsClassFiles = true;
+ } else {
+ DirectoryStream<Path> listing = Files.newDirectoryStream(
+ adjacent,
+ PLUGIN_PATH_FILTER
+ );
+ dfs.push(new DirectoryEntry(listing));
+ }
+ }
+ }
+ } finally {
+ while (!dfs.isEmpty()) {
+ dfs.pop().stream.close();
+ }
+ }
+
+ if (containsClassFiles) {
+ if (archives.isEmpty()) {
+ return Collections.singletonList(topPath);
+ }
+ log.warn("Plugin path contains both java archives and class files. Returning only the"
+ + " archives");
+ }
+ return Arrays.asList(archives.toArray(new Path[0]));
+ }
+
public static String simpleName(PluginDesc<?> plugin) {
return plugin.pluginClass().getSimpleName();
}
@@ -144,4 +274,14 @@ public class PluginUtils {
return simple;
}
+ private static class DirectoryEntry {
+ DirectoryStream<Path> stream;
+ Iterator<Path> iterator;
+
+ DirectoryEntry(DirectoryStream<Path> stream) {
+ this.stream = stream;
+ this.iterator = stream.iterator();
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e0150a25/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
index c943863..a49e54c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
@@ -39,20 +39,22 @@ public class PluginUtilsTest {
assertFalse(PluginUtils.shouldLoadInIsolation("java.lang.String"));
assertFalse(PluginUtils.shouldLoadInIsolation("java.util.HashMap$Entry"));
assertFalse(PluginUtils.shouldLoadInIsolation("java.io.Serializable"));
- assertFalse(PluginUtils.shouldLoadInIsolation("javax."));
+ assertFalse(PluginUtils.shouldLoadInIsolation("javax.rmi."));
assertFalse(PluginUtils.shouldLoadInIsolation(
"javax.management.loading.ClassLoaderRepository")
);
- assertFalse(PluginUtils.shouldLoadInIsolation("org.omg."));
+ assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA."));
assertFalse(PluginUtils.shouldLoadInIsolation("org.omg.CORBA.Object"));
assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom."));
assertFalse(PluginUtils.shouldLoadInIsolation("org.w3c.dom.traversal.TreeWalker"));
+ assertFalse(PluginUtils.shouldLoadInIsolation("org.xml.sax."));
+ assertFalse(PluginUtils.shouldLoadInIsolation("org.xml.sax.EntityResolver"));
}
@Test
public void testThirdPartyClasses() throws Exception {
- assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j."));
- assertFalse(PluginUtils.shouldLoadInIsolation("org.apache.log4j.Level"));
+ assertFalse(PluginUtils.shouldLoadInIsolation("org.slf4j."));
+ assertFalse(PluginUtils.shouldLoadInIsolation("org.slf4j.LoggerFactory"));
}
@Test