You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by sn...@apache.org on 2017/12/17 13:15:52 UTC

[nutch] 01/03: fix for NUTCH-2477 (refactor checker classes) contributed by Jurian Broertjes

This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git

commit 9c684114337241bb7652619c11b7374b27204dcb
Author: Jurian Broertjes <ju...@openindex.io>
AuthorDate: Tue Dec 12 15:52:59 2017 +0000

    fix for NUTCH-2477 (refactor checker classes) contributed by Jurian Broertjes
---
 .../nutch/indexer/IndexingFiltersChecker.java      | 125 ++-------------
 .../org/apache/nutch/net/URLFilterChecker.java     | 126 +++++----------
 src/java/org/apache/nutch/net/URLFilters.java      |   4 +
 .../org/apache/nutch/net/URLNormalizerChecker.java | 102 +++++-------
 .../org/apache/nutch/util/AbstractChecker.java     | 171 +++++++++++++++++++++
 5 files changed, 269 insertions(+), 259 deletions(-)

diff --git a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
index 05caf5a..5491638 100644
--- a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
+++ b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
@@ -17,23 +17,14 @@
 
 package org.apache.nutch.indexer;
 
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
 import java.lang.invoke.MethodHandles;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.InetSocketAddress;
-import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
@@ -52,6 +43,7 @@ import org.apache.nutch.protocol.ProtocolOutput;
 import org.apache.nutch.scoring.ScoringFilters;
 import org.apache.nutch.util.NutchConfiguration;
 import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.AbstractChecker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,41 +57,33 @@ import org.slf4j.LoggerFactory;
  * @author Julien Nioche
  **/
 
-public class IndexingFiltersChecker extends Configured implements Tool {
+public class IndexingFiltersChecker extends AbstractChecker {
 
   protected URLNormalizers normalizers = null;
   protected boolean dumpText = false;
   protected boolean followRedirects = false;
-  protected boolean keepClientCnxOpen = false;
   // used to simulate the metadata propagated from injection
   protected HashMap<String, String> metadata = new HashMap<>();
-  protected int tcpPort = -1;
 
   private static final Logger LOG = LoggerFactory
       .getLogger(MethodHandles.lookup().lookupClass());
 
-  public IndexingFiltersChecker() {
-
-  }
-
   public int run(String[] args) throws Exception {
     String url = null;
-    String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] [-listen <port>] [-keepClientCnxOpen]";
+    usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] (-stdin | -listen <port> [-keepClientCnxOpen])";
 
-    if (args.length == 0) {
+    // Print help when no args given
+    if (args.length < 1) {
       System.err.println(usage);
-      return -1;
+      System.exit(-1);
     }
 
+    int numConsumed;
     for (int i = 0; i < args.length; i++) {
       if (args[i].equals("-normalize")) {
         normalizers = new URLNormalizers(getConf(), URLNormalizers.SCOPE_DEFAULT);
-      } else if (args[i].equals("-listen")) {
-        tcpPort = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-followRedirects")) {
         followRedirects = true;
-      } else if (args[i].equals("-keepClientCnxOpen")) {
-        keepClientCnxOpen = true;
       } else if (args[i].equals("-dumpText")) {
         dumpText = true;
       } else if (args[i].equals("-md")) {
@@ -112,104 +96,27 @@ public class IndexingFiltersChecker extends Configured implements Tool {
         } else
           k = nextOne;
         metadata.put(k, v);
+      } else if ((numConsumed = super.parseArgs(args, i)) > 0) {
+        i += numConsumed - 1;
       } else if (i != args.length - 1) {
+        System.err.println("ERR: Not a recognized argument: " + args[i]);
         System.err.println(usage);
         System.exit(-1);
       } else {
-        url =args[i];
+        url = args[i];
       }
     }
     
-    // In listening mode?
-    if (tcpPort == -1) {
-      // No, just fetch and display
-      StringBuilder output = new StringBuilder();
-      int ret = fetch(url, output);
-      System.out.println(output);
-      return ret;
+    if (url != null) {
+      return super.processSingle(url);
     } else {
-      // Listen on socket and start workers on incoming requests
-      listen();
-    }
-    
-    return 0;
-  }
-  
-  protected void listen() throws Exception {
-    ServerSocket server = null;
-
-    try{
-      server = new ServerSocket();
-      server.bind(new InetSocketAddress(tcpPort));
-      LOG.info(server.toString());
-    } catch (Exception e) {
-      LOG.error("Could not listen on port " + tcpPort);
-      System.exit(-1);
-    }
-    
-    while(true){
-      Worker worker;
-      try{
-        worker = new Worker(server.accept());
-        Thread thread = new Thread(worker);
-        thread.start();
-      } catch (Exception e) {
-        LOG.error("Accept failed: " + tcpPort);
-        System.exit(-1);
-      }
-    }
-  }
-  
-  private class Worker implements Runnable {
-    private Socket client;
-
-    Worker(Socket client) {
-      this.client = client;
-      LOG.info(client.toString());
-    }
-
-    public void run() {
-      if (keepClientCnxOpen) {
-        while (true) { // keep connection open until closes
-          readWrite();
-        }
-      } else {
-        readWrite();
-        
-        try { // close ourselves
-          client.close();
-        } catch (Exception e){
-          LOG.error(e.toString());
-        }
-      }
-    }
-    
-    protected void readWrite() {
-      String line;
-      BufferedReader in = null;
-      PrintWriter out = null;
-      
-      try{
-        in = new BufferedReader(new InputStreamReader(client.getInputStream()));
-      } catch (Exception e) {
-        LOG.error("in or out failed");
-        System.exit(-1);
-      }
-
-      try{
-        line = in.readLine();        
-        StringBuilder output = new StringBuilder();
-        fetch(line, output);
-        
-        client.getOutputStream().write(output.toString().getBytes(Charset.forName("UTF-8")));
-      }catch (Exception e) {
-        LOG.error("Read/Write failed: " + e);
-      }
+      // Start listening
+      return super.run();
     }
   }
     
   
-  protected int fetch(String url, StringBuilder output) throws Exception {
+  protected int process(String url, StringBuilder output) throws Exception {
     if (normalizers != null) {
       url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
     }
diff --git a/src/java/org/apache/nutch/net/URLFilterChecker.java b/src/java/org/apache/nutch/net/URLFilterChecker.java
index 6fb3cf2..429aa9f 100644
--- a/src/java/org/apache/nutch/net/URLFilterChecker.java
+++ b/src/java/org/apache/nutch/net/URLFilterChecker.java
@@ -21,8 +21,9 @@ import org.apache.nutch.plugin.Extension;
 import org.apache.nutch.plugin.ExtensionPoint;
 import org.apache.nutch.plugin.PluginRepository;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
 
+import org.apache.nutch.util.AbstractChecker;
 import org.apache.nutch.util.NutchConfiguration;
 
 import java.io.BufferedReader;
@@ -33,103 +34,60 @@ import java.io.InputStreamReader;
  * 
  * @author John Xing
  */
-public class URLFilterChecker {
+public class URLFilterChecker extends AbstractChecker {
 
-  private Configuration conf;
+  private URLFilters filters = null;
 
-  public URLFilterChecker(Configuration conf) {
-    this.conf = conf;
-  }
-
-  private void checkOne(String filterName) throws Exception {
-    URLFilter filter = null;
-
-    ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
-        URLFilter.X_POINT_ID);
-
-    if (point == null)
-      throw new RuntimeException(URLFilter.X_POINT_ID + " not found.");
-
-    Extension[] extensions = point.getExtensions();
-
-    for (int i = 0; i < extensions.length; i++) {
-      Extension extension = extensions[i];
-      filter = (URLFilter) extension.getExtensionInstance();
-      if (filter.getClass().getName().equals(filterName)) {
-        break;
-      } else {
-        filter = null;
-      }
-    }
-
-    if (filter == null)
-      throw new RuntimeException("Filter " + filterName + " not found.");
-
-    // jerome : should we keep this behavior?
-    // if (LogFormatter.hasLoggedSevere())
-    // throw new RuntimeException("Severe error encountered.");
-
-    System.out.println("Checking URLFilter " + filterName);
-
-    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-    String line;
-    while ((line = in.readLine()) != null) {
-      String out = filter.filter(line);
-      if (out != null) {
-        System.out.print("+");
-        System.out.println(out);
-      } else {
-        System.out.print("-");
-        System.out.println(line);
-      }
-    }
-  }
+  public int run(String[] args) throws Exception {
+    usage = "Usage: URLFilterChecker [-filterName filterName] (-stdin | -listen <port> [-keepClientCnxOpen]) \n"
+        + "\n\tTool takes a list of URLs, one per line.\n";
 
-  private void checkAll() throws Exception {
-    System.out.println("Checking combination of all URLFilters available");
-
-    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-    String line;
-    URLFilters filters = new URLFilters(this.conf);
-
-    while ((line = in.readLine()) != null) {
-      String out = filters.filter(line);
-      if (out != null) {
-        System.out.print("+");
-        System.out.println(out);
-      } else {
-        System.out.print("-");
-        System.out.println(line);
-      }
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-
-    String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) \n"
-        + "Tool takes a list of URLs, one per line, passed via STDIN.\n";
-
-    if (args.length == 0) {
+    // Print help when no args given
+    if (args.length < 1) {
       System.err.println(usage);
       System.exit(-1);
     }
 
-    String filterName = null;
-    if (args[0].equals("-filterName")) {
-      if (args.length != 2) {
+    int numConsumed;
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-filterName")) {
+        getConf().set("plugin.includes", args[++i]);
+      } else if ((numConsumed = super.parseArgs(args, i)) > 0) {
+        i += numConsumed - 1;
+      } else {
+        System.err.println("ERR: Not a recognized argument: " + args[i]);
         System.err.println(usage);
         System.exit(-1);
       }
-      filterName = args[1];
     }
 
-    URLFilterChecker checker = new URLFilterChecker(NutchConfiguration.create());
-    if (filterName != null) {
-      checker.checkOne(filterName);
+    // Print active filter list
+    filters = new URLFilters(getConf());
+    System.out.print("Checking combination of these URLFilters: ");
+    for (URLFilter filter : filters.getFilters()) {
+      System.out.print(filter.getClass().getSimpleName() + " ");
+    }
+    System.out.println("");
+
+    // Start listening
+    return super.run();
+  }
+
+  protected int process(String line, StringBuilder output) throws Exception {
+    String out = filters.filter(line);
+    if (out != null) {
+      output.append("+");
+      output.append(out);
     } else {
-      checker.checkAll();
+      output.append("-");
+      output.append(line);
     }
+    return 0;
+  }
 
-    System.exit(0);
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new URLFilterChecker(), args);
+    System.exit(res);
   }
 }
diff --git a/src/java/org/apache/nutch/net/URLFilters.java b/src/java/org/apache/nutch/net/URLFilters.java
index 3deccca..4f5bf36 100644
--- a/src/java/org/apache/nutch/net/URLFilters.java
+++ b/src/java/org/apache/nutch/net/URLFilters.java
@@ -31,6 +31,10 @@ public class URLFilters {
         URLFilter.class, URLFilter.X_POINT_ID, URLFILTER_ORDER);
   }
 
+  public URLFilter[] getFilters() {
+    return this.filters;
+  }
+
   /** Run all defined filters. Assume logical AND. */
   public String filter(String urlString) throws URLFilterException {
     for (int i = 0; i < this.filters.length; i++) {
diff --git a/src/java/org/apache/nutch/net/URLNormalizerChecker.java b/src/java/org/apache/nutch/net/URLNormalizerChecker.java
index d8f1c6e..a435cc8 100644
--- a/src/java/org/apache/nutch/net/URLNormalizerChecker.java
+++ b/src/java/org/apache/nutch/net/URLNormalizerChecker.java
@@ -21,8 +21,9 @@ import org.apache.nutch.plugin.Extension;
 import org.apache.nutch.plugin.ExtensionPoint;
 import org.apache.nutch.plugin.PluginRepository;
 
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
 
+import org.apache.nutch.util.AbstractChecker;
 import org.apache.nutch.util.NutchConfiguration;
 
 import java.io.BufferedReader;
@@ -31,87 +32,56 @@ import java.io.InputStreamReader;
 /**
  * Checks one given normalizer or all normalizers.
  */
-public class URLNormalizerChecker {
+public class URLNormalizerChecker extends AbstractChecker {
 
-  private Configuration conf;
+  private String scope = URLNormalizers.SCOPE_DEFAULT;
+  URLNormalizers normalizers;
 
-  public URLNormalizerChecker(Configuration conf) {
-    this.conf = conf;
-  }
-
-  private void checkOne(String normalizerName, String scope) throws Exception {
-    URLNormalizer normalizer = null;
-
-    ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
-        URLNormalizer.X_POINT_ID);
-
-    if (point == null)
-      throw new RuntimeException(URLNormalizer.X_POINT_ID + " not found.");
-
-    Extension[] extensions = point.getExtensions();
-
-    for (int i = 0; i < extensions.length; i++) {
-      Extension extension = extensions[i];
-      normalizer = (URLNormalizer) extension.getExtensionInstance();
-      if (normalizer.getClass().getName().equals(normalizerName)) {
-        break;
-      } else {
-        normalizer = null;
-      }
-    }
-
-    if (normalizer == null)
-      throw new RuntimeException("URLNormalizer " + normalizerName
-          + " not found.");
-
-    System.out.println("Checking URLNormalizer " + normalizerName);
-
-    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-    String line;
-    while ((line = in.readLine()) != null) {
-      String out = normalizer.normalize(line, scope);
-      System.out.println(out);
-    }
-  }
-
-  private void checkAll(String scope) throws Exception {
-    System.out.println("Checking combination of all URLNormalizers available");
+  public int run(String[] args) throws Exception {
+    usage = "Usage: URLNormalizerChecker [-normalizer <normalizerName>] [-scope <scope>] (-stdin | -listen <port> [-keepClientCnxOpen])"
+        + "\n\tscope can be one of: default,partition,generate_host_count,fetcher,crawldb,linkdb,inject,outlink\n";
 
-    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-    String line;
-    URLNormalizers normalizers = new URLNormalizers(conf, scope);
-    while ((line = in.readLine()) != null) {
-      String out = normalizers.normalize(line, scope);
-      System.out.println(out);
+    // Print help when no args given
+    if (args.length < 1) {
+      System.err.println(usage);
+      System.exit(-1);
     }
-  }
-
-  public static void main(String[] args) throws Exception {
 
-    String usage = "Usage: URLNormalizerChecker [-normalizer <normalizerName>] [-scope <scope>]"
-        + "\n\tscope can be one of: default,partition,generate_host_count,fetcher,crawldb,linkdb,inject,outlink";
-
-    String normalizerName = null;
-    String scope = URLNormalizers.SCOPE_DEFAULT;
+    int numConsumed;
     for (int i = 0; i < args.length; i++) {
       if (args[i].equals("-normalizer")) {
-        normalizerName = args[++i];
+        getConf().set("plugin.includes", args[++i]);
       } else if (args[i].equals("-scope")) {
         scope = args[++i];
+      } else if ((numConsumed = super.parseArgs(args, i)) > 0) {
+        i += numConsumed - 1;
       } else {
+        System.err.println("ERR: Not a recognized argument: " + args[i]);
         System.err.println(usage);
         System.exit(-1);
       }
     }
 
-    URLNormalizerChecker checker = new URLNormalizerChecker(
-        NutchConfiguration.create());
-    if (normalizerName != null) {
-      checker.checkOne(normalizerName, scope);
-    } else {
-      checker.checkAll(scope);
+    // Print active normalizer list
+    normalizers = new URLNormalizers(getConf(), scope);
+    System.out.print("Checking combination of these URLNormalizers: ");
+    for (URLNormalizer normalizer : normalizers.getURLNormalizers(scope)) {
+      System.out.print(normalizer.getClass().getSimpleName() + " ");
     }
+    System.out.println("");
+
+    // Start listening
+    return super.run();
+  }
 
-    System.exit(0);
+  protected int process(String line, StringBuilder output) throws Exception {
+    output.append(normalizers.normalize(line, scope));
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new URLNormalizerChecker(), args);
+    System.exit(res);
   }
 }
diff --git a/src/java/org/apache/nutch/util/AbstractChecker.java b/src/java/org/apache/nutch/util/AbstractChecker.java
new file mode 100644
index 0000000..8424879
--- /dev/null
+++ b/src/java/org/apache/nutch/util/AbstractChecker.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.lang.invoke.MethodHandles;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scaffolding class for the various Checker implementations. Can process cmdline input, stdin and TCP connections.
+ * 
+ * @author Jurian Broertjes
+ */
+public abstract class AbstractChecker extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  protected boolean keepClientCnxOpen = false;
+  protected int tcpPort = -1;
+  protected boolean stdin = true;
+  protected String usage;
+
+  // Actual function for the processing of a single input
+  protected abstract int process(String line, StringBuilder output) throws Exception;
+
+  protected int parseArgs(String[] args, int i) {
+    if (args[i].equals("-listen")) {
+      tcpPort = Integer.parseInt(args[++i]);
+      return 2;
+    } else if (args[i].equals("-keepClientCnxOpen")) {
+      keepClientCnxOpen = true;
+      return 1;
+    } else if (args[i].equals("-stdin")) {
+      stdin = true;
+      return 1;
+    }
+    return 0;
+  }
+
+  protected int run() throws Exception {
+    // In listening mode?
+    if (tcpPort != -1) {
+      processTCP(tcpPort);
+      return 0;
+    } else if (stdin) {
+      return processStdin();
+    }
+    // Nothing to do?
+    return -1;
+  }
+
+  // Process single input and return
+  protected int processSingle(String input) throws Exception {
+    StringBuilder output = new StringBuilder();
+    int ret = process(input, output);
+    System.out.println(output);
+    return ret;
+  }
+
+  // Read from stdin
+  protected int processStdin() throws Exception {
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+    while ((line = in.readLine()) != null) {
+      StringBuilder output = new StringBuilder();
+      int ret = process(line, output);
+      System.out.println(output);
+    }
+    return 0;
+  }
+
+  // Open TCP socket and process input
+  protected void processTCP(int tcpPort) throws Exception {
+    ServerSocket server = null;
+
+    try {
+      server = new ServerSocket();
+      server.bind(new InetSocketAddress(tcpPort));
+      LOG.info(server.toString());
+    } catch (Exception e) {
+      LOG.error("Could not listen on port " + tcpPort);
+      System.exit(-1);
+    }
+    
+    while(true){
+      Worker worker;
+      try {
+        worker = new Worker(server.accept());
+        Thread thread = new Thread(worker);
+        thread.start();
+      } catch (Exception e) {
+        LOG.error("Accept failed: " + tcpPort);
+        System.exit(-1);
+      }
+    }
+  }
+
+  private class Worker implements Runnable {
+    private Socket client;
+
+    Worker(Socket client) {
+      this.client = client;
+      LOG.info(client.toString());
+    }
+
+    public void run() {
+      if (keepClientCnxOpen) {
+        while (true) { // keep connection open until closes
+          readWrite();
+        }
+      } else {
+        readWrite();
+        
+        try { // close ourselves
+          client.close();
+        } catch (Exception e){
+          LOG.error(e.toString());
+        }
+      }
+    }
+    
+    protected void readWrite() {
+      String line;
+      BufferedReader in = null;
+      PrintWriter out = null;
+      
+      try {
+        in = new BufferedReader(new InputStreamReader(client.getInputStream()));
+      } catch (Exception e) {
+        LOG.error("in or out failed");
+        System.exit(-1);
+      }
+
+      try {
+        line = in.readLine();
+        StringBuilder output = new StringBuilder();
+        process(line, output);
+        
+        client.getOutputStream().write(output.toString().getBytes(StandardCharsets.UTF_8));
+      } catch (Exception e) {
+        LOG.error("Read/Write failed: " + e);
+      }
+    }
+  }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"commits@nutch.apache.org" <co...@nutch.apache.org>.