You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2016/01/15 11:45:27 UTC
svn commit: r1724771 - in /nutch/trunk: CHANGES.txt
src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
Author: markus
Date: Fri Jan 15 10:45:27 2016
New Revision: 1724771
URL: http://svn.apache.org/viewvc?rev=1724771&view=rev
Log:
NUTCH-2194 Run IndexingFilterChecker as simple Telnet server
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1724771&r1=1724770&r2=1724771&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri Jan 15 10:45:27 2016
@@ -1,5 +1,7 @@
Nutch Change Log
+* NUTCH-2194 Run IndexingFilterChecker as simple Telnet server (markus)
+
* NUTCH-2196 IndexingFilterChecker to optionally normalize (markus)
* NUTCH-2195 IndexingFilterChecker to optionally follow N redirects (markus)
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java?rev=1724771&r1=1724770&r2=1724771&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java Fri Jan 15 10:45:27 2016
@@ -17,6 +17,13 @@
package org.apache.nutch.indexer;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -59,6 +66,13 @@ import org.slf4j.LoggerFactory;
public class IndexingFiltersChecker extends Configured implements Tool {
+ // URL normalizers; stays null unless -normalize is given, and fetch() only
+ // normalizes URLs when this is non-null
+ protected URLNormalizers normalizers = null;
+ // when true the complete value of each field is printed, otherwise at most
+ // the first 100 characters (presumably enabled by -dumpText)
+ protected boolean dumpText = false;
+ // set by -followRedirects: retry the fetch on the redirect target while
+ // the protocol status is a redirect and the redirect budget is not used up
+ protected boolean followRedirects = false;
+ // used to simulate the metadata propagated from injection
+ protected HashMap<String, String> metadata = new HashMap<String, String>();
+ // port given with -listen; -1 means "not listening", i.e. fetch the URL
+ // once, print the result and return
+ protected int tcpPort = -1;
+
public static final Logger LOG = LoggerFactory
.getLogger(IndexingFiltersChecker.class);
@@ -67,25 +81,19 @@ public class IndexingFiltersChecker exte
}
public int run(String[] args) throws Exception {
- String contentType = null;
String url = null;
- URLNormalizers normalizers = null;
- boolean dumpText = false;
- boolean followRedirects = false;
-
- String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] <url>";
+ String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] [-listen <port>] <url>";
if (args.length == 0) {
System.err.println(usage);
return -1;
}
- // used to simulate the metadata propagated from injection
- HashMap<String, String> metadata = new HashMap<String, String>();
-
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-normalize")) {
normalizers = new URLNormalizers(getConf(), URLNormalizers.SCOPE_DEFAULT);
+ } else if (args[i].equals("-listen")) {
+ tcpPort = Integer.parseInt(args[++i]);
} else if (args[i].equals("-followRedirects")) {
followRedirects = true;
} else if (args[i].equals("-dumpText")) {
@@ -108,6 +116,88 @@ public class IndexingFiltersChecker exte
}
}
+ // In listening mode?
+ if (tcpPort == -1) {
+ // No, just fetch and display
+ StringBuilder output = new StringBuilder();
+ int ret = fetch(url, output);
+ System.out.println(output);
+ return ret;
+ } else {
+ // Listen on socket and start workers on incoming requests
+ listen();
+ }
+
+ return 0;
+ }
+
+ /**
+  * Binds a server socket to {@code tcpPort} and then accepts connections in
+  * an endless loop. Every accepted connection is wrapped in a {@code Worker}
+  * that runs on its own thread.
+  *
+  * A failure to bind or to accept is logged together with its cause and is
+  * followed by {@code System.exit(-1)}, so this method does not return
+  * normally.
+  */
+ protected void listen() throws Exception {
+   ServerSocket server = null;
+
+   try {
+     server = new ServerSocket();
+     server.bind(new InetSocketAddress(tcpPort));
+     LOG.info(server.toString());
+   } catch (Exception e) {
+     // hand the exception to the logger: "address already in use" and
+     // "permission denied" need different remedies
+     LOG.error("Could not listen on port " + tcpPort, e);
+     System.exit(-1);
+   }
+
+   while (true) {
+     try {
+       Worker worker = new Worker(server.accept());
+       Thread thread = new Thread(worker);
+       thread.start();
+     } catch (Exception e) {
+       LOG.error("Accept failed: " + tcpPort, e);
+       System.exit(-1);
+     }
+   }
+ }
+
+ /**
+  * Serves one client connection: reads a single line from the socket, passes
+  * it to {@code fetch} as the URL and writes whatever {@code fetch} appended
+  * to its output buffer back to the client as UTF-8 before closing the
+  * socket. Problems with one connection are logged and end only that
+  * connection; they never stop the listening server.
+  */
+ private class Worker implements Runnable {
+   private final Socket client;
+
+   Worker(Socket client) {
+     this.client = client;
+     LOG.info(client.toString());
+   }
+
+   public void run() {
+     try {
+       BufferedReader in;
+       try {
+         // decode the request with the same charset the reply is encoded in
+         in = new BufferedReader(new InputStreamReader(
+             client.getInputStream(), Charset.forName("UTF-8")));
+       } catch (Exception e) {
+         // a broken client must not take the whole server down
+         LOG.error("in or out failed", e);
+         return;
+       }
+
+       try {
+         String line = in.readLine();
+         // readLine() yields null when the client hung up without sending
+         // anything; there is nothing to fetch for a blank line either
+         if (line == null || line.trim().length() == 0) {
+           LOG.warn("No URL received from " + client);
+           return;
+         }
+
+         StringBuilder output = new StringBuilder();
+         fetch(line, output);
+
+         client.getOutputStream().write(output.toString().getBytes(Charset.forName("UTF-8")));
+       } catch (Exception e) {
+         LOG.error("Read/Write failed: " + e);
+       }
+     } finally {
+       // always release the socket, including on the early returns above
+       try {
+         client.close();
+       } catch (Exception e) {
+         LOG.error(e.toString());
+       }
+     }
+   }
+ }
+
+
+ protected int fetch(String url, StringBuilder output) throws Exception {
if (normalizers != null) {
url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
}
@@ -129,12 +219,12 @@ public class IndexingFiltersChecker exte
int maxRedirects = 3;
- ProtocolOutput output = getProtocolOutput(url, datum);
+ ProtocolOutput protocolOutput = getProtocolOutput(url, datum);
Text turl = new Text(url);
// Following redirects and not reached maxRedirects?
- while (!output.getStatus().isSuccess() && followRedirects && output.getStatus().isRedirect() && maxRedirects != 0) {
- String[] stuff = output.getStatus().getArgs();
+ while (!protocolOutput.getStatus().isSuccess() && followRedirects && protocolOutput.getStatus().isRedirect() && maxRedirects != 0) {
+ String[] stuff = protocolOutput.getStatus().getArgs();
url = stuff[0];
if (normalizers != null) {
@@ -144,24 +234,24 @@ public class IndexingFiltersChecker exte
turl.set(url);
// try again
- output = getProtocolOutput(url, datum);
+ protocolOutput = getProtocolOutput(url, datum);
maxRedirects--;
}
- if (!output.getStatus().isSuccess()) {
- System.out.println("Fetch failed with protocol status: "
- + output.getStatus());
+ if (!protocolOutput.getStatus().isSuccess()) {
+ output.append("Fetch failed with protocol status: "
+ + protocolOutput.getStatus() + "\n");
return 0;
}
- Content content = output.getContent();
+ Content content = protocolOutput.getContent();
if (content == null) {
- System.out.println("No content for " + url);
+ output.append("No content for " + url + "\n");
return 0;
}
- contentType = content.getContentType();
+ String contentType = content.getContentType();
if (contentType == null) {
return -1;
@@ -211,6 +301,7 @@ public class IndexingFiltersChecker exte
.set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
String digest = parse.getData().getContentMeta().get(Nutch.SIGNATURE_KEY);
doc.add("digest", digest);
+ datum.setSignature(signature);
// call the scoring filters
try {
@@ -226,7 +317,7 @@ public class IndexingFiltersChecker exte
}
if (doc == null) {
- System.out.println("Document discarded by indexing filter");
+ output.append("Document discarded by indexing filter\n");
return 0;
}
@@ -236,7 +327,7 @@ public class IndexingFiltersChecker exte
for (Object value : values) {
String str = value.toString();
int minText = dumpText ? str.length() : Math.min(100, str.length());
- System.out.println(fname + " :\t" + str.substring(0, minText));
+ output.append(fname + " :\t" + str.substring(0, minText) + "\n");
}
}
}
@@ -255,8 +346,8 @@ public class IndexingFiltersChecker exte
ProtocolFactory factory = new ProtocolFactory(getConf());
Protocol protocol = factory.getProtocol(url);
Text turl = new Text(url);
- ProtocolOutput output = protocol.getProtocolOutput(turl, datum);
- return output;
+ ProtocolOutput protocolOutput = protocol.getProtocolOutput(turl, datum);
+ return protocolOutput;
}
public static void main(String[] args) throws Exception {