You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2016/10/05 12:53:38 UTC

nutch git commit: NUTCH-2320 URLFilterChecker to run as TCP Telnet service

Repository: nutch
Updated Branches:
  refs/heads/master e53b34b23 -> 836b2e01d


NUTCH-2320 URLFilterChecker to run as TCP Telnet service


Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/836b2e01
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/836b2e01
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/836b2e01

Branch: refs/heads/master
Commit: 836b2e01d1a4e0e9443601da755ea37de91b8c7d
Parents: e53b34b
Author: Markus Jelsma <ma...@apache.org>
Authored: Wed Oct 5 14:53:05 2016 +0200
Committer: Markus Jelsma <ma...@apache.org>
Committed: Wed Oct 5 14:53:05 2016 +0200

----------------------------------------------------------------------
 .../org/apache/nutch/net/URLFilterChecker.java  | 181 +++++++++++++------
 1 file changed, 122 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nutch/blob/836b2e01/src/java/org/apache/nutch/net/URLFilterChecker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/net/URLFilterChecker.java b/src/java/org/apache/nutch/net/URLFilterChecker.java
index 89a3d00..86b91e2 100644
--- a/src/java/org/apache/nutch/net/URLFilterChecker.java
+++ b/src/java/org/apache/nutch/net/URLFilterChecker.java
@@ -17,16 +17,27 @@
 
 package org.apache.nutch.net;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.nutch.plugin.Extension;
 import org.apache.nutch.plugin.ExtensionPoint;
 import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.util.NutchConfiguration;
 
 import org.apache.hadoop.conf.Configuration;
 
-import org.apache.nutch.util.NutchConfiguration;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Checks one given filter or all filters.
@@ -36,62 +47,118 @@ import java.io.InputStreamReader;
 public class URLFilterChecker {
 
   private Configuration conf;
-
+  private static String filterName = null;
+  protected static boolean keepClientCnxOpen = false;
+  protected static int tcpPort = -1;
+  protected URLFilters filters = null;
+  
+  public static final Logger LOG = LoggerFactory
+      .getLogger(URLFilterChecker.class);
+  
   public URLFilterChecker(Configuration conf) {
+    System.out.println("Checking combination of all URLFilters available");
     this.conf = conf;
+    if (filterName != null) {
+        this.conf.set("plugin.includes", filterName);
+    }
+    filters = new URLFilters(this.conf);
   }
+  
+  public void run() throws Exception {
+    // In listening mode?
+    if (tcpPort == -1) {
+      // No, just fetch and display
+      checkStdin();
+    } else {
+      // Listen on socket and start workers on incoming requests
+      listen();
+    }
+  }
+  
+  private void listen() throws Exception {
+    ServerSocket server = null;
+
+    try{
+      server = new ServerSocket();
+      server.bind(new InetSocketAddress(tcpPort));
+      LOG.info(server.toString());
+    } catch (Exception e) {
+      LOG.error("Could not listen on port " + tcpPort);
+      System.exit(-1);
+    }
+    
+    while(true){
+      Worker worker;
+      try{
+        worker = new Worker(server.accept());
+        Thread thread = new Thread(worker);
+        thread.start();
+      } catch (Exception e) {
+        LOG.error("Accept failed: " + tcpPort);
+        System.exit(-1);
+      }
+    }
+  }
+  
+  private class Worker implements Runnable {
+    private Socket client;
 
-  private void checkOne(String filterName) throws Exception {
-    URLFilter filter = null;
-
-    ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
-        URLFilter.X_POINT_ID);
-
-    if (point == null)
-      throw new RuntimeException(URLFilter.X_POINT_ID + " not found.");
-
-    Extension[] extensions = point.getExtensions();
+    Worker(Socket client) {
+      this.client = client;
+      LOG.info(client.toString());
+    }
 
-    for (int i = 0; i < extensions.length; i++) {
-      Extension extension = extensions[i];
-      filter = (URLFilter) extension.getExtensionInstance();
-      if (filter.getClass().getName().equals(filterName)) {
-        break;
+    public void run() {
+      if (keepClientCnxOpen) {
+        while (true) { // keep connection open until closes
+          readWrite();
+        }
       } else {
-        filter = null;
+        readWrite();
+        
+        try { // close ourselves
+          client.close();
+        } catch (Exception e){
+          LOG.error(e.toString());
+        }
       }
     }
+    
+    protected void readWrite() {
+      String line;
+      BufferedReader in = null;
+      PrintWriter out = null;
+      
+      try{
+        in = new BufferedReader(new InputStreamReader(client.getInputStream()));
+      } catch (Exception e) {
+        LOG.error("in or out failed");
+        System.exit(-1);
+      }
 
-    if (filter == null)
-      throw new RuntimeException("Filter " + filterName + " not found.");
-
-    // jerome : should we keep this behavior?
-    // if (LogFormatter.hasLoggedSevere())
-    // throw new RuntimeException("Severe error encountered.");
-
-    System.out.println("Checking URLFilter " + filterName);
-
-    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-    String line;
-    while ((line = in.readLine()) != null) {
-      String out = filter.filter(line);
-      if (out != null) {
-        System.out.print("+");
-        System.out.println(out);
-      } else {
-        System.out.print("-");
-        System.out.println(line);
+      try{
+        line = in.readLine();        
+
+        String result = filters.filter(line);
+        String output;
+        if (result != null) {
+          output = "+" + result + "\n";
+        } else {
+          output = "-" + line + "\n";;
+        }        
+        
+        client.getOutputStream().write(output.getBytes(Charset.forName("UTF-8")));
+      }catch (Exception e) {
+        LOG.error("Read/Write failed: " + e);
       }
     }
   }
 
-  private void checkAll() throws Exception {
-    System.out.println("Checking combination of all URLFilters available");
-
+  private void checkStdin() throws Exception {
     BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
     String line;
+    
     while ((line = in.readLine()) != null) {
-      URLFilters filters = new URLFilters(this.conf);
       String out = filters.filter(line);
       if (out != null) {
         System.out.print("+");
@@ -105,30 +172,26 @@ public class URLFilterChecker {
 
   public static void main(String[] args) throws Exception {
 
-    String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) \n"
+    String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) [-listen <port>] [-keepClientCnxOpen]) \n"
         + "Tool takes a list of URLs, one per line, passed via STDIN.\n";
 
-    if (args.length == 0) {
+    if (args.length < 1) {
       System.err.println(usage);
       System.exit(-1);
     }
 
-    String filterName = null;
-    if (args[0].equals("-filterName")) {
-      if (args.length != 2) {
-        System.err.println(usage);
-        System.exit(-1);
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-filterName")) {
+        filterName = args[++i];
+      } else if (args[i].equals("-listen")) {
+        tcpPort = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-keepClientCnxOpen")) {
+        keepClientCnxOpen = true;
       }
-      filterName = args[1];
     }
-
+    
     URLFilterChecker checker = new URLFilterChecker(NutchConfiguration.create());
-    if (filterName != null) {
-      checker.checkOne(filterName);
-    } else {
-      checker.checkAll();
-    }
-
+    checker.run();
     System.exit(0);
   }
 }