You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2019/01/18 12:27:43 UTC

[nutch] branch master updated: NUTCH-2678 Allow for per-host configurable protocol plugin

This is an automated email from the ASF dual-hosted git repository.

markus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git


The following commit(s) were added to refs/heads/master by this push:
     new 64a6f7c  NUTCH-2678 Allow for per-host configurable protocol plugin
64a6f7c is described below

commit 64a6f7cc533f41e53cf6b637979080235b2147fa
Author: Markus Jelsma <ma...@apache.org>
AuthorDate: Fri Jan 18 13:27:16 2019 +0100

    NUTCH-2678 Allow for per-host configurable protocol plugin
---
 conf/host-protocol-mapping.txt.template            |  16 +++
 src/java/org/apache/nutch/plugin/Extension.java    |   4 +
 .../org/apache/nutch/protocol/ProtocolFactory.java | 129 +++++++++++++++++----
 src/java/org/apache/nutch/util/ObjectCache.java    |   4 +
 .../apache/nutch/protocol/TestProtocolFactory.java |   6 -
 5 files changed, 130 insertions(+), 29 deletions(-)

diff --git a/conf/host-protocol-mapping.txt.template b/conf/host-protocol-mapping.txt.template
new file mode 100644
index 0000000..a09bca6
--- /dev/null
+++ b/conf/host-protocol-mapping.txt.template
@@ -0,0 +1,16 @@
+# This file defines a hostname to protocol plugin mapping. Each line takes a
+# host name followed by a tab, followed by the ID of the protocol plugin. You
+# can find the ID in the protocol plugin's plugin.xml file.
+#
+# <hostname>\t<plugin_id>\n
+# nutch.apache.org	org.apache.nutch.protocol.httpclient.Http
+# tika.apache.org	org.apache.nutch.protocol.http.Http
+#
+# If the requested host is not mapped, Nutch may choose any of the enabled
+# plugins that handle the URL's protocol. To force a default per protocol, use:
+#
+# protocol:<protocol>\t<plugin_id>\n
+#
+# This example forces httpclient for both protocols when the host is not mapped:
+# protocol:http	org.apache.nutch.protocol.httpclient.Http
+# protocol:https	org.apache.nutch.protocol.httpclient.Http
diff --git a/src/java/org/apache/nutch/plugin/Extension.java b/src/java/org/apache/nutch/plugin/Extension.java
index e73b850..be737cb 100644
--- a/src/java/org/apache/nutch/plugin/Extension.java
+++ b/src/java/org/apache/nutch/plugin/Extension.java
@@ -197,4 +197,8 @@ public class Extension {
   public void setDescriptor(PluginDescriptor pDescriptor) {
     fDescriptor = pDescriptor;
   }
+
+  public String toString() {
+    return getId() + ", " + getClazz() + ", " + getTargetPoint();
+  }
 }
diff --git a/src/java/org/apache/nutch/protocol/ProtocolFactory.java b/src/java/org/apache/nutch/protocol/ProtocolFactory.java
index 2d20ecd..7f900b2 100644
--- a/src/java/org/apache/nutch/protocol/ProtocolFactory.java
+++ b/src/java/org/apache/nutch/protocol/ProtocolFactory.java
@@ -17,8 +17,13 @@
 
 package org.apache.nutch.protocol;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
 import java.net.URL;
 import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.nutch.plugin.Extension;
 import org.apache.nutch.plugin.ExtensionPoint;
@@ -26,8 +31,13 @@ import org.apache.nutch.plugin.PluginRepository;
 import org.apache.nutch.plugin.PluginRuntimeException;
 import org.apache.nutch.util.ObjectCache;
 
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.hadoop.conf.Configuration;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Creates and caches {@link Protocol} plugins. Protocol plugins should define
  * the attribute "protocolName" with the name of the protocol that they
@@ -37,10 +47,16 @@ import org.apache.hadoop.conf.Configuration;
  */
 public class ProtocolFactory {
 
+  private static final Logger LOG = LoggerFactory
+      .getLogger(MethodHandles.lookup().lookupClass());
+
   private ExtensionPoint extensionPoint;
 
   private Configuration conf;
 
+  protected Map<String, String> defaultProtocolImplMapping = new HashMap<>();
+  protected Map<String, String> hostProtocolMapping = new HashMap<>();
+
   public ProtocolFactory(Configuration conf) {
     this.conf = conf;
     this.extensionPoint = PluginRepository.get(conf).getExtensionPoint(
@@ -49,8 +65,35 @@ public class ProtocolFactory {
       throw new RuntimeException("x-point " + Protocol.X_POINT_ID
           + " not found.");
     }
-  }
 
+    try {
+      BufferedReader reader = new BufferedReader(conf.getConfResourceAsReader("host-protocol-mapping.txt"));
+      String line;
+      String parts[];
+      while ((line = reader.readLine()) != null) {
+        if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
+          line = line.trim();
+          parts = line.split("\t");
+
+          // Must be exactly two parts
+          if (parts.length == 2) {
+            // Is this a host to plugin mapping, or a default?
+            if (parts[0].indexOf(":") == -1) {
+              hostProtocolMapping.put(parts[0].trim(), parts[1].trim());
+            } else {
+              String[] moreParts = parts[0].split(":");
+              defaultProtocolImplMapping.put(moreParts[1].trim(), parts[1].trim());
+            }
+          } else {
+            LOG.warn("Wrong format of line: {}", line);
+            LOG.warn("Expected format: <hostname> <tab> <plugin_id> or protocol:<protocol> <tab> <plugin_id>");
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to read host-protocol-mapping.txt", e);
+    }
+  }
   /**
    * Returns the appropriate {@link Protocol} implementation for a url.
    * 
@@ -83,52 +126,92 @@ public class ProtocolFactory {
    */
   public Protocol getProtocol(URL url)
       throws ProtocolNotFound {
-    ObjectCache objectCache = ObjectCache.get(conf);
     try {
-      String protocolName = url.getProtocol();
-      if (protocolName == null) {
-        throw new ProtocolNotFound(url.toString());
+      Protocol protocol = null;
+
+      // First attempt to resolve a protocol implementation by hostname
+      String host = url.getHost();
+      if (hostProtocolMapping.containsKey(host)) {
+        Extension extension = getExtensionById(hostProtocolMapping.get(host));
+        if (extension != null) {
+          protocol = getProtocolInstanceByExtension(extension);
+        }
       }
 
-      String cacheId = Protocol.X_POINT_ID + protocolName;
-      synchronized (objectCache) {
-        Protocol protocol = (Protocol) objectCache.getObject(cacheId);
-        if (protocol != null) {
-          return protocol;
+      // Nothing, see if we have defaults configured
+      if (protocol == null) {
+        // Protocol listed in default map?
+        if (defaultProtocolImplMapping.containsKey(url.getProtocol())) {
+          Extension extension = getExtensionById(defaultProtocolImplMapping.get(url.getProtocol()));
+          if (extension != null) {
+            protocol = getProtocolInstanceByExtension(extension);
+          }
         }
+      }
 
-        Extension extension = findExtension(protocolName);
-        if (extension == null) {
-          throw new ProtocolNotFound(protocolName);
+      // Still couldn't find a protocol? Attempt by protocol
+      if (protocol == null) {
+        Extension extension = findExtension(url.getProtocol(), "protocolName");
+        if (extension != null) {
+          protocol = getProtocolInstanceByExtension(extension);
         }
+      }
 
-        protocol = (Protocol) extension.getExtensionInstance();
-        objectCache.setObject(cacheId, protocol);
+      // Got anything?
+      if (protocol != null) {
         return protocol;
       }
+
+      // Nothing!
+      throw new ProtocolNotFound(url.toString());
     } catch (PluginRuntimeException e) {
       throw new ProtocolNotFound(url.toString(), e.toString());
     }
   }
 
-  private Extension findExtension(String name) throws PluginRuntimeException {
+  private Protocol getProtocolInstanceByExtension(Extension extension) throws PluginRuntimeException {
+    Protocol protocol = null;
+    String cacheId = extension.getId();
+    ObjectCache objectCache = ObjectCache.get(conf);
+    synchronized (objectCache) {
+      if (!objectCache.hasObject(cacheId)) {
+        protocol = (Protocol) extension.getExtensionInstance();
+        objectCache.setObject(cacheId, protocol);
+      }
+      protocol = (Protocol) objectCache.getObject(cacheId);
+    }
 
-    Extension[] extensions = this.extensionPoint.getExtensions();
+    return protocol;
+  }
 
+  private Extension getExtensionById(String id) {
+    Extension[] extensions = this.extensionPoint.getExtensions();
     for (int i = 0; i < extensions.length; i++) {
-      Extension extension = extensions[i];
+      if (id.equals(extensions[i].getId())) {
+        return extensions[i];
+      }
+    }
+
+    return null;
+  }
 
-      if (contains(name, extension.getAttribute("protocolName")))
+  private Extension findExtension(String name, String attribute) throws PluginRuntimeException {
+    for (int i = 0; i < this.extensionPoint.getExtensions().length; i++) {
+      Extension extension = this.extensionPoint.getExtensions()[i];
+
+      if (contains(name, extension.getAttribute(attribute)))
         return extension;
     }
     return null;
   }
 
   boolean contains(String what, String where) {
-    String parts[] = where.split("[, ]");
-    for (int i = 0; i < parts.length; i++) {
-      if (parts[i].equals(what))
-        return true;
+    if (where != null) {
+      String parts[] = where.split("[, ]");
+      for (int i = 0; i < parts.length; i++) {
+        if (parts[i].equals(what))
+          return true;
+      }
     }
     return false;
   }
diff --git a/src/java/org/apache/nutch/util/ObjectCache.java b/src/java/org/apache/nutch/util/ObjectCache.java
index 4ed3fd0..f1b14c8 100644
--- a/src/java/org/apache/nutch/util/ObjectCache.java
+++ b/src/java/org/apache/nutch/util/ObjectCache.java
@@ -52,6 +52,10 @@ public class ObjectCache {
     return objectMap.get(key);
   }
 
+  public boolean hasObject(String key) {
+    return objectMap.containsKey(key);
+  }
+
   public synchronized void setObject(String key, Object value) {
     objectMap.put(key, value);
   }
diff --git a/src/test/org/apache/nutch/protocol/TestProtocolFactory.java b/src/test/org/apache/nutch/protocol/TestProtocolFactory.java
index 394c303..7cab623 100644
--- a/src/test/org/apache/nutch/protocol/TestProtocolFactory.java
+++ b/src/test/org/apache/nutch/protocol/TestProtocolFactory.java
@@ -59,12 +59,6 @@ public class TestProtocolFactory {
       Assert.fail("Must not throw any other exception");
     }
 
-    // cache key
-    Object protocol = ObjectCache.get(conf).getObject(
-        Protocol.X_POINT_ID + "http");
-    Assert.assertNotNull(protocol);
-    Assert.assertEquals(httpProtocol, protocol);
-
     // test same object instance
     try {
       Assert.assertTrue(httpProtocol == factory.getProtocol("http://somehost"));