You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2016/01/15 11:45:27 UTC

svn commit: r1724771 - in /nutch/trunk: CHANGES.txt src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java

Author: markus
Date: Fri Jan 15 10:45:27 2016
New Revision: 1724771

URL: http://svn.apache.org/viewvc?rev=1724771&view=rev
Log:
NUTCH-2194 Run IndexingFilterChecker as simple Telnet server

Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1724771&r1=1724770&r2=1724771&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri Jan 15 10:45:27 2016
@@ -1,5 +1,7 @@
 Nutch Change Log
 
+* NUTCH-2194 Run IndexingFilterChecker as simple Telnet server (markus)
+
 * NUTCH-2196 IndexingFilterChecker to optionally normalize (markus)
 
 * NUTCH-2195 IndexingFilterChecker to optionally follow N redirects (markus)

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java?rev=1724771&r1=1724770&r2=1724771&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java Fri Jan 15 10:45:27 2016
@@ -17,6 +17,13 @@
 
 package org.apache.nutch.indexer;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -59,6 +66,13 @@ import org.slf4j.LoggerFactory;
 
 public class IndexingFiltersChecker extends Configured implements Tool {
 
+  protected URLNormalizers normalizers = null;
+  protected boolean dumpText = false;
+  protected boolean followRedirects = false;
+  // used to simulate the metadata propagated from injection
+  protected HashMap<String, String> metadata = new HashMap<String, String>();
+  protected int tcpPort = -1;
+
   public static final Logger LOG = LoggerFactory
       .getLogger(IndexingFiltersChecker.class);
 
@@ -67,25 +81,19 @@ public class IndexingFiltersChecker exte
   }
 
   public int run(String[] args) throws Exception {
-    String contentType = null;
     String url = null;
-    URLNormalizers normalizers = null;
-    boolean dumpText = false;
-    boolean followRedirects = false;
-
-    String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] <url>";
+    String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] [-listen <port>] <url>";
 
     if (args.length == 0) {
       System.err.println(usage);
       return -1;
     }
 
-    // used to simulate the metadata propagated from injection
-    HashMap<String, String> metadata = new HashMap<String, String>();
-
     for (int i = 0; i < args.length; i++) {
       if (args[i].equals("-normalize")) {
         normalizers = new URLNormalizers(getConf(), URLNormalizers.SCOPE_DEFAULT);
+      } else if (args[i].equals("-listen")) {
+        tcpPort = Integer.parseInt(args[++i]);
       } else if (args[i].equals("-followRedirects")) {
         followRedirects = true;
       } else if (args[i].equals("-dumpText")) {
@@ -108,6 +116,88 @@ public class IndexingFiltersChecker exte
       }
     }
     
+    // In listening mode?
+    if (tcpPort == -1) {
+      // No, just fetch and display
+      StringBuilder output = new StringBuilder();
+      int ret = fetch(url, output);
+      System.out.println(output);
+      return ret;
+    } else {
+      // Listen on socket and start workers on incoming requests
+      listen();
+    }
+    
+    return 0;
+  }
+  
+  protected void listen() throws Exception { // Binds a server socket to tcpPort and hands every accepted connection to a Worker on its own thread; never returns normally.
+    ServerSocket server = null;
+
+    try{
+      server = new ServerSocket();
+      server.bind(new InetSocketAddress(tcpPort)); // port-only address, i.e. all local interfaces; tcpPort comes from the -listen argument
+      LOG.info(server.toString());
+    } catch (Exception e) {
+      LOG.error("Could not listen on port " + tcpPort); // NOTE(review): the caught exception itself is not logged
+      System.exit(-1); // terminates the whole JVM instead of propagating the failure to the caller
+    }
+    
+    while(true){ // accept loop has no exit condition; the server socket is never closed
+      Worker worker;
+      try{
+        worker = new Worker(server.accept()); // blocks until a client connects
+        Thread thread = new Thread(worker); // one new, unpooled thread per connection
+        thread.start();
+      } catch (Exception e) {
+        LOG.error("Accept failed: " + tcpPort); // NOTE(review): logs the port only, not the exception
+        System.exit(-1); // any failure while accepting or starting the thread shuts down the JVM
+      }
+    }
+  }
+  
+  private class Worker implements Runnable { // Serves one client connection: reads a single line, passes it to fetch() as the URL and writes the resulting report back.
+    private Socket client;
+
+    Worker(Socket client) {
+      this.client = client;
+      LOG.info(client.toString()); // logs the socket description of each accepted connection
+    }
+
+    public void run(){
+      String line;
+      BufferedReader in = null;
+      PrintWriter out = null; // NOTE(review): never assigned or used in this method
+      
+      try{
+        in = new BufferedReader(new InputStreamReader(client.getInputStream())); // NOTE(review): no charset given, so the platform default decodes the request
+      } catch (Exception e) {
+        LOG.error("in or out failed");
+        System.exit(-1); // a failure on a single connection terminates the whole JVM
+      }
+
+      try{
+        line = in.readLine();         // only the first line is read; readLine() yields null if the client closes without sending one
+        StringBuilder output = new StringBuilder();
+        fetch(line, output); // return code is ignored; the report text is accumulated in output
+        
+        client.getOutputStream().write(output.toString().getBytes(Charset.forName("UTF-8"))); // response is always encoded as UTF-8
+      }catch (Exception e) {
+        LOG.error("Read/Write failed: " + e); // also reached for exceptions thrown by fetch(); nothing is written back to the client in that case
+      }
+      
+      try {
+        client.close(); // one request per connection: the socket is closed after the single response
+      } catch (Exception e){
+        LOG.error(e.toString());
+      }
+      
+      return;
+    }
+  }
+    
+  
+  protected int fetch(String url, StringBuilder output) throws Exception {
     if (normalizers != null) {
       url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
     }
@@ -129,12 +219,12 @@ public class IndexingFiltersChecker exte
     
     int maxRedirects = 3;
 
-    ProtocolOutput output = getProtocolOutput(url, datum);
+    ProtocolOutput protocolOutput = getProtocolOutput(url, datum);
     Text turl = new Text(url);
     
     // Following redirects and not reached maxRedirects?
-    while (!output.getStatus().isSuccess() && followRedirects && output.getStatus().isRedirect() && maxRedirects != 0) {
-      String[] stuff = output.getStatus().getArgs();
+    while (!protocolOutput.getStatus().isSuccess() && followRedirects && protocolOutput.getStatus().isRedirect() && maxRedirects != 0) {
+      String[] stuff = protocolOutput.getStatus().getArgs();
       url = stuff[0];
       
       if (normalizers != null) {
@@ -144,24 +234,24 @@ public class IndexingFiltersChecker exte
       turl.set(url);
       
       // try again
-      output = getProtocolOutput(url, datum);
+      protocolOutput = getProtocolOutput(url, datum);
       maxRedirects--;
     }
 
-    if (!output.getStatus().isSuccess()) {
-      System.out.println("Fetch failed with protocol status: "
-          + output.getStatus());
+    if (!protocolOutput.getStatus().isSuccess()) {
+      output.append("Fetch failed with protocol status: "
+          + protocolOutput.getStatus() + "\n");
       return 0;
     }
 
-    Content content = output.getContent();
+    Content content = protocolOutput.getContent();
 
     if (content == null) {
-      System.out.println("No content for " + url);
+      output.append("No content for " + url + "\n");
       return 0;
     }
 
-    contentType = content.getContentType();
+    String contentType = content.getContentType();
 
     if (contentType == null) {
       return -1;
@@ -211,6 +301,7 @@ public class IndexingFiltersChecker exte
         .set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
     String digest = parse.getData().getContentMeta().get(Nutch.SIGNATURE_KEY);
     doc.add("digest", digest);
+    datum.setSignature(signature);
 
     // call the scoring filters
     try {
@@ -226,7 +317,7 @@ public class IndexingFiltersChecker exte
     }
 
     if (doc == null) {
-      System.out.println("Document discarded by indexing filter");
+      output.append("Document discarded by indexing filter\n");
       return 0;
     }
 
@@ -236,7 +327,7 @@ public class IndexingFiltersChecker exte
         for (Object value : values) {
           String str = value.toString();
           int minText = dumpText ? str.length() : Math.min(100, str.length());
-          System.out.println(fname + " :\t" + str.substring(0, minText));
+          output.append(fname + " :\t" + str.substring(0, minText) + "\n");
         }
       }
     }
@@ -255,8 +346,8 @@ public class IndexingFiltersChecker exte
     ProtocolFactory factory = new ProtocolFactory(getConf());
     Protocol protocol = factory.getProtocol(url);
     Text turl = new Text(url);
-    ProtocolOutput output = protocol.getProtocolOutput(turl, datum);
-    return output;
+    ProtocolOutput protocolOutput = protocol.getProtocolOutput(turl, datum);
+    return protocolOutput;
   }
 
   public static void main(String[] args) throws Exception {