You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/02/15 00:24:37 UTC
[kafka] branch trunk updated: KAFKA-6503: Parallelize plugin scanning
This is an automated email from the ASF dual-hosted git repository.
ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3af1396 KAFKA-6503: Parallelize plugin scanning
3af1396 is described below
commit 3af13967db089b9a8320b539f5d5d218488ce467
Author: Robert Yokota <ra...@gmail.com>
AuthorDate: Wed Feb 14 16:24:05 2018 -0800
KAFKA-6503: Parallelize plugin scanning
This is a small change to parallelize plugin scanning. This may help in some environments where otherwise plugin scanning is slow.
Author: Robert Yokota <ra...@gmail.com>
Reviewers: Konstantine Karantasis <ko...@confluent.io>, Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #4561 from rayokota/K6503-improve-plugin-scanning
---
.../runtime/isolation/DelegatingClassLoader.java | 29 +++++++++++++++++++++-
1 file changed, 28 insertions(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 345d7ef..b21cdcb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -20,7 +20,10 @@ import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.transforms.Transformation;
+import org.reflections.Configuration;
import org.reflections.Reflections;
+import org.reflections.ReflectionsException;
+import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ClasspathHelper;
import org.reflections.util.ConfigurationBuilder;
import org.slf4j.Logger;
@@ -269,7 +272,10 @@ public class DelegatingClassLoader extends URLClassLoader {
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.setClassLoaders(new ClassLoader[]{loader});
builder.addUrls(urls);
- Reflections reflections = new Reflections(builder);
+ builder.setScanners(new SubTypesScanner());
+ builder.setExpandSuperTypes(false);
+ builder.useParallelExecutor();
+ Reflections reflections = new InternalReflections(builder);
return new PluginScanResult(
getPluginDesc(reflections, Connector.class, loader),
@@ -353,4 +359,25 @@ public class DelegatingClassLoader extends URLClassLoader {
}
}
}
+
+ private static class InternalReflections extends Reflections {
+
+ public InternalReflections(Configuration configuration) {
+ super(configuration);
+ }
+
+ // When Reflections is used for parallel scans, it has a bug where it propagates ReflectionsException
+ // as RuntimeException. Override the scan behavior to emulate the single-threaded logic.
+ @Override
+ protected void scan(URL url) {
+ try {
+ super.scan(url);
+ } catch (ReflectionsException e) {
+ Logger log = Reflections.log;
+ if (log != null && log.isWarnEnabled()) {
+ log.warn("could not create Vfs.Dir from url. ignoring the exception and continuing", e);
+ }
+ }
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
ewencp@apache.org.