You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by sn...@apache.org on 2022/08/09 07:27:37 UTC
[nutch] 03/03: NUTCH-2936 Early registration of URL stream handlers provided by plugins may fail Hadoop jobs running in distributed mode if protocol-okhttp is used NUTCH-2949 Tasks of a multi-threaded map runner may fail because of slow creation of URL stream handlers
This is an automated email from the ASF dual-hosted git repository.
snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
commit 487110b07a8b085c5546b58a2157268b3d21cb19
Author: Sebastian Nagel <sn...@apache.org>
AuthorDate: Wed Jun 15 13:08:00 2022 +0200
NUTCH-2936 Early registration of URL stream handlers provided by plugins may fail Hadoop jobs
running in distributed mode if protocol-okhttp is used
NUTCH-2949 Tasks of a multi-threaded map runner may fail because of slow creation of URL stream handlers
- cache URLStreamHandlers for each protocol to avoid that handlers are
created anew
- utilize the cache to route standard protocols (http, https, file, jar)
to handlers implemented by the JVM: this fixes NUTCH-2936
---
.../org/apache/nutch/plugin/PluginRepository.java | 4 +-
.../nutch/plugin/URLStreamHandlerFactory.java | 60 +++++++++++++++++++---
2 files changed, 55 insertions(+), 9 deletions(-)
diff --git a/src/java/org/apache/nutch/plugin/PluginRepository.java b/src/java/org/apache/nutch/plugin/PluginRepository.java
index 1eec0ffc8..d80f971df 100644
--- a/src/java/org/apache/nutch/plugin/PluginRepository.java
+++ b/src/java/org/apache/nutch/plugin/PluginRepository.java
@@ -541,8 +541,8 @@ public class PluginRepository implements URLStreamHandlerFactory {
/**
* Registers this PluginRepository to be invoked whenever URLs have to be
- * parsed. This allows to check the registered protocol plugins for uncommon
- * protocols.
+ * parsed. This allows to check the registered protocol plugins for custom
+ * protocols not covered by standard {@link URLStreamHandler}s of the JVM.
*/
private void registerURLStreamHandlerFactory() {
org.apache.nutch.plugin.URLStreamHandlerFactory.getInstance().registerPluginRepository(this);
diff --git a/src/java/org/apache/nutch/plugin/URLStreamHandlerFactory.java b/src/java/org/apache/nutch/plugin/URLStreamHandlerFactory.java
index 6c79fe9e6..bd7e377d0 100644
--- a/src/java/org/apache/nutch/plugin/URLStreamHandlerFactory.java
+++ b/src/java/org/apache/nutch/plugin/URLStreamHandlerFactory.java
@@ -20,6 +20,9 @@ import java.lang.ref.WeakReference;
import java.net.URL;
import java.net.URLStreamHandler;
import java.util.ArrayList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,16 +45,37 @@ public class URLStreamHandlerFactory
/** The singleton instance. */
private static URLStreamHandlerFactory instance;
- /** Here we register all PluginRepositories.
- * In this class we do not know why several instances of PluginRepository
- * are kept, nor do we know how long they will be used. To prevent
- * a memory leak, this class must not keep references to PluginRepository
- * but use WeakReference which allows PluginRepository to still be
- * garbage collected. The prize is we need to clean the list for
- * outdated references which is done in the {@link #removeInvalidRefs()} method.
+ /**
+ * Here we register all PluginRepositories. In this class we do not know why
+ * several instances of PluginRepository are kept, nor do we know how long
+ * they will be used. To prevent a memory leak, this class must not keep
+ * references to PluginRepository but use WeakReference which allows
+ * PluginRepository to still be garbage collected. The prize is we need to
+ * clean the list for outdated references which is done in the
+ * {@link #removeInvalidRefs()} method.
*/
private ArrayList<WeakReference<PluginRepository>> prs;
+ /**
+ * Cache of URLStreamHandlers for each protocol supported by
+ * <ul>
+ * <li>one of the registered and active plugins</li>
+ * <li>or by the JVM</li>
+ * </ul>
+ * Using the cache avoids that {@link URLStreamHandler} instances are created
+ * multiple times anew. The cache is also pre-populated with protocols handled
+ * obligatorily by the JVM, see {@link SYSTEM_PROTOCOLS}.
+ */
+ private Map<String, Optional<URLStreamHandler>> cache;
+
+ /**
+ * Protocols covered by standard JVM URL handlers. These protocols must not be
+ * handled by Nutch plugins, in order to avoid that basic actions (eg. loading
+ * of classes and configuration files) break.
+ */
+ public static final String[] SYSTEM_PROTOCOLS = { //
+ "http", "https", "file", "jar" };
+
static {
instance = new URLStreamHandlerFactory();
URL.setURLStreamHandlerFactory(instance);
@@ -60,6 +84,16 @@ public class URLStreamHandlerFactory
private URLStreamHandlerFactory() {
this.prs = new ArrayList<>();
+ initCache();
+ }
+
+ /** Reset and initialize cache (protocol -> URLStreamHandler) */
+ private synchronized void initCache() {
+ cache = new ConcurrentHashMap<>();
+ // pre-populate cache with protocols to be handled by the JVM
+ for (String protocol : SYSTEM_PROTOCOLS) {
+ cache.put(protocol, Optional.empty());
+ }
}
/**
@@ -77,11 +111,21 @@ public class URLStreamHandlerFactory
public void registerPluginRepository(PluginRepository pr) {
this.prs.add(new WeakReference<PluginRepository>(pr));
+ // reset the cache, so that the new PluginRepository is used from now on
+ initCache();
+
removeInvalidRefs();
}
@Override
public URLStreamHandler createURLStreamHandler(String protocol) {
+
+ if (cache.containsKey(protocol)) {
+ // use the cached handler, including "null" for standard
+ // handlers implemented by the JVM
+ return cache.get(protocol).orElse(null);
+ }
+
LOG.debug("Creating URLStreamHandler for protocol: {}", protocol);
removeInvalidRefs();
@@ -93,10 +137,12 @@ public class URLStreamHandlerFactory
if (pr != null) {
// found PluginRepository. Let's get the URLStreamHandler...
URLStreamHandler handler = pr.createURLStreamHandler(protocol);
+ cache.put(protocol, Optional.of(handler));
return handler;
}
}
+ cache.put(protocol, Optional.empty());
return null;
}