You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2019/01/18 12:27:43 UTC
[nutch] branch master updated: NUTCH-2678 Allow for per-host configurable protocol plugin
This is an automated email from the ASF dual-hosted git repository.
markus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
The following commit(s) were added to refs/heads/master by this push:
new 64a6f7c NUTCH-2678 Allow for per-host configurable protocol plugin
64a6f7c is described below
commit 64a6f7cc533f41e53cf6b637979080235b2147fa
Author: Markus Jelsma <ma...@apache.org>
AuthorDate: Fri Jan 18 13:27:16 2019 +0100
NUTCH-2678 Allow for per-host configurable protocol plugin
---
conf/host-protocol-mapping.txt.template | 16 +++
src/java/org/apache/nutch/plugin/Extension.java | 4 +
.../org/apache/nutch/protocol/ProtocolFactory.java | 129 +++++++++++++++++----
src/java/org/apache/nutch/util/ObjectCache.java | 4 +
.../apache/nutch/protocol/TestProtocolFactory.java | 6 -
5 files changed, 130 insertions(+), 29 deletions(-)
diff --git a/conf/host-protocol-mapping.txt.template b/conf/host-protocol-mapping.txt.template
new file mode 100644
index 0000000..a09bca6
--- /dev/null
+++ b/conf/host-protocol-mapping.txt.template
@@ -0,0 +1,16 @@
+# This file defines a hostname to protocol plugin mapping. Each line takes a
+# host name followed by a tab, followed by the ID of the protocol plugin. You
+# can find the ID in the protocol plugin's plugin.xml file.
+#
+# <hostname>\t<plugin_id>\n
+# nutch.apache.org org.apache.nutch.protocol.httpclient.Http
+# tika.apache.org org.apache.nutch.protocol.http.Http
+#
+# If the requested host is not mapped, Nutch may choose any of the enabled
+# plugins. To force a default plugin for a given protocol, use:
+#
+# protocol:<protocol>\t<plugin_id>\n
+#
+# This example forces httpclient for the http and https protocols when the host is not mapped:
+# protocol:http org.apache.nutch.protocol.httpclient.Http
+# protocol:https org.apache.nutch.protocol.httpclient.Http
diff --git a/src/java/org/apache/nutch/plugin/Extension.java b/src/java/org/apache/nutch/plugin/Extension.java
index e73b850..be737cb 100644
--- a/src/java/org/apache/nutch/plugin/Extension.java
+++ b/src/java/org/apache/nutch/plugin/Extension.java
@@ -197,4 +197,8 @@ public class Extension {
public void setDescriptor(PluginDescriptor pDescriptor) {
fDescriptor = pDescriptor;
}
+
+ public String toString() {
+ return getId() + ", " + getClazz() + ", " + getTargetPoint();
+ }
}
diff --git a/src/java/org/apache/nutch/protocol/ProtocolFactory.java b/src/java/org/apache/nutch/protocol/ProtocolFactory.java
index 2d20ecd..7f900b2 100644
--- a/src/java/org/apache/nutch/protocol/ProtocolFactory.java
+++ b/src/java/org/apache/nutch/protocol/ProtocolFactory.java
@@ -17,8 +17,13 @@
package org.apache.nutch.protocol;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
import java.net.URL;
import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.nutch.plugin.Extension;
import org.apache.nutch.plugin.ExtensionPoint;
@@ -26,8 +31,13 @@ import org.apache.nutch.plugin.PluginRepository;
import org.apache.nutch.plugin.PluginRuntimeException;
import org.apache.nutch.util.ObjectCache;
+import org.apache.commons.lang.StringUtils;
+
import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Creates and caches {@link Protocol} plugins. Protocol plugins should define
* the attribute "protocolName" with the name of the protocol that they
@@ -37,10 +47,16 @@ import org.apache.hadoop.conf.Configuration;
*/
public class ProtocolFactory {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(MethodHandles.lookup().lookupClass());
+
private ExtensionPoint extensionPoint;
private Configuration conf;
+ protected Map<String, String> defaultProtocolImplMapping = new HashMap<>();
+ protected Map<String, String> hostProtocolMapping = new HashMap<>();
+
public ProtocolFactory(Configuration conf) {
this.conf = conf;
this.extensionPoint = PluginRepository.get(conf).getExtensionPoint(
@@ -49,8 +65,35 @@ public class ProtocolFactory {
throw new RuntimeException("x-point " + Protocol.X_POINT_ID
+ " not found.");
}
- }
+ try {
+ BufferedReader reader = new BufferedReader(conf.getConfResourceAsReader("host-protocol-mapping.txt"));
+ String line;
+ String parts[];
+ while ((line = reader.readLine()) != null) {
+ if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
+ line = line.trim();
+ parts = line.split("\t");
+
+ // Must be exactly two parts: <hostname or protocol:name> and <plugin_id>
+ if (parts.length == 2) {
+ // Is this a host to plugin mapping, or a default?
+ if (parts[0].indexOf(":") == -1) {
+ hostProtocolMapping.put(parts[0].trim(), parts[1].trim());
+ } else {
+ String[] moreParts = parts[0].split(":");
+ defaultProtocolImplMapping.put(moreParts[1].trim(), parts[1].trim());
+ }
+ } else {
+ LOG.warn("Wrong format of line: {}", line);
+ LOG.warn("Expected format: <hostname> <tab> <plugin_id> or protocol:<protocol> <tab> <plugin_id>");
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to read host-protocol-mapping.txt", e);
+ }
+ }
/**
* Returns the appropriate {@link Protocol} implementation for a url.
*
@@ -83,52 +126,92 @@ public class ProtocolFactory {
*/
public Protocol getProtocol(URL url)
throws ProtocolNotFound {
- ObjectCache objectCache = ObjectCache.get(conf);
try {
- String protocolName = url.getProtocol();
- if (protocolName == null) {
- throw new ProtocolNotFound(url.toString());
+ Protocol protocol = null;
+
+ // First attempt to resolve a protocol implementation by hostname
+ String host = url.getHost();
+ if (hostProtocolMapping.containsKey(host)) {
+ Extension extension = getExtensionById(hostProtocolMapping.get(host));
+ if (extension != null) {
+ protocol = getProtocolInstanceByExtension(extension);
+ }
}
- String cacheId = Protocol.X_POINT_ID + protocolName;
- synchronized (objectCache) {
- Protocol protocol = (Protocol) objectCache.getObject(cacheId);
- if (protocol != null) {
- return protocol;
+ // Nothing, see if we have defaults configured
+ if (protocol == null) {
+ // Protocol listed in default map?
+ if (defaultProtocolImplMapping.containsKey(url.getProtocol())) {
+ Extension extension = getExtensionById(defaultProtocolImplMapping.get(url.getProtocol()));
+ if (extension != null) {
+ protocol = getProtocolInstanceByExtension(extension);
+ }
}
+ }
- Extension extension = findExtension(protocolName);
- if (extension == null) {
- throw new ProtocolNotFound(protocolName);
+ // Still couldn't find a protocol? Attempt by protocol
+ if (protocol == null) {
+ Extension extension = findExtension(url.getProtocol(), "protocolName");
+ if (extension != null) {
+ protocol = getProtocolInstanceByExtension(extension);
}
+ }
- protocol = (Protocol) extension.getExtensionInstance();
- objectCache.setObject(cacheId, protocol);
+ // Got anything?
+ if (protocol != null) {
return protocol;
}
+
+ // Nothing!
+ throw new ProtocolNotFound(url.toString());
} catch (PluginRuntimeException e) {
throw new ProtocolNotFound(url.toString(), e.toString());
}
}
- private Extension findExtension(String name) throws PluginRuntimeException {
+ private Protocol getProtocolInstanceByExtension(Extension extension) throws PluginRuntimeException {
+ Protocol protocol = null;
+ String cacheId = extension.getId();
+ ObjectCache objectCache = ObjectCache.get(conf);
+ synchronized (objectCache) {
+ if (!objectCache.hasObject(cacheId)) {
+ protocol = (Protocol) extension.getExtensionInstance();
+ objectCache.setObject(cacheId, protocol);
+ }
+ protocol = (Protocol) objectCache.getObject(cacheId);
+ }
- Extension[] extensions = this.extensionPoint.getExtensions();
+ return protocol;
+ }
+ private Extension getExtensionById(String id) {
+ Extension[] extensions = this.extensionPoint.getExtensions();
for (int i = 0; i < extensions.length; i++) {
- Extension extension = extensions[i];
+ if (id.equals(extensions[i].getId())) {
+ return extensions[i];
+ }
+ }
+
+ return null;
+ }
- if (contains(name, extension.getAttribute("protocolName")))
+ private Extension findExtension(String name, String attribute) throws PluginRuntimeException {
+ for (int i = 0; i < this.extensionPoint.getExtensions().length; i++) {
+ Extension extension = this.extensionPoint.getExtensions()[i];
+
+ if (contains(name, extension.getAttribute(attribute)))
return extension;
}
return null;
}
boolean contains(String what, String where) {
- String parts[] = where.split("[, ]");
- for (int i = 0; i < parts.length; i++) {
- if (parts[i].equals(what))
- return true;
+ if (where != null) {
+ String parts[] = where.split("[, ]");
+ for (int i = 0; i < parts.length; i++) {
+ if (parts[i].equals(what))
+ return true;
+ }
}
return false;
}
diff --git a/src/java/org/apache/nutch/util/ObjectCache.java b/src/java/org/apache/nutch/util/ObjectCache.java
index 4ed3fd0..f1b14c8 100644
--- a/src/java/org/apache/nutch/util/ObjectCache.java
+++ b/src/java/org/apache/nutch/util/ObjectCache.java
@@ -52,6 +52,10 @@ public class ObjectCache {
return objectMap.get(key);
}
+ public boolean hasObject(String key) {
+ return objectMap.containsKey(key);
+ }
+
public synchronized void setObject(String key, Object value) {
objectMap.put(key, value);
}
diff --git a/src/test/org/apache/nutch/protocol/TestProtocolFactory.java b/src/test/org/apache/nutch/protocol/TestProtocolFactory.java
index 394c303..7cab623 100644
--- a/src/test/org/apache/nutch/protocol/TestProtocolFactory.java
+++ b/src/test/org/apache/nutch/protocol/TestProtocolFactory.java
@@ -59,12 +59,6 @@ public class TestProtocolFactory {
Assert.fail("Must not throw any other exception");
}
- // cache key
- Object protocol = ObjectCache.get(conf).getObject(
- Protocol.X_POINT_ID + "http");
- Assert.assertNotNull(protocol);
- Assert.assertEquals(httpProtocol, protocol);
-
// test same object instance
try {
Assert.assertTrue(httpProtocol == factory.getProtocol("http://somehost"));