You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in this text rendering.)
Posted to commits@nutch.apache.org by ma...@apache.org on 2016/10/05 12:53:38 UTC
nutch git commit: NUTCH-2320 URLFilterChecker to run as TCP Telnet service
Repository: nutch
Updated Branches:
refs/heads/master e53b34b23 -> 836b2e01d
NUTCH-2320 URLFilterChecker to run as TCP Telnet service
Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/836b2e01
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/836b2e01
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/836b2e01
Branch: refs/heads/master
Commit: 836b2e01d1a4e0e9443601da755ea37de91b8c7d
Parents: e53b34b
Author: Markus Jelsma <ma...@apache.org>
Authored: Wed Oct 5 14:53:05 2016 +0200
Committer: Markus Jelsma <ma...@apache.org>
Committed: Wed Oct 5 14:53:05 2016 +0200
----------------------------------------------------------------------
.../org/apache/nutch/net/URLFilterChecker.java | 181 +++++++++++++------
1 file changed, 122 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nutch/blob/836b2e01/src/java/org/apache/nutch/net/URLFilterChecker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/net/URLFilterChecker.java b/src/java/org/apache/nutch/net/URLFilterChecker.java
index 89a3d00..86b91e2 100644
--- a/src/java/org/apache/nutch/net/URLFilterChecker.java
+++ b/src/java/org/apache/nutch/net/URLFilterChecker.java
@@ -17,16 +17,27 @@
package org.apache.nutch.net;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
import org.apache.nutch.plugin.Extension;
import org.apache.nutch.plugin.ExtensionPoint;
import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.util.NutchConfiguration;
import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.util.NutchConfiguration;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Checks one given filter or all filters.
@@ -36,62 +47,118 @@ import java.io.InputStreamReader;
public class URLFilterChecker {
private Configuration conf;
-
+ private static String filterName = null;
+ protected static boolean keepClientCnxOpen = false;
+ protected static int tcpPort = -1;
+ protected URLFilters filters = null;
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(URLFilterChecker.class);
+
public URLFilterChecker(Configuration conf) {
+ System.out.println("Checking combination of all URLFilters available");
this.conf = conf;
+ if (filterName != null) {
+ this.conf.set("plugin.includes", filterName);
+ }
+ filters = new URLFilters(this.conf);
}
+
+ public void run() throws Exception {
+ // In listening mode?
+ if (tcpPort == -1) {
+ // No, just fetch and display
+ checkStdin();
+ } else {
+ // Listen on socket and start workers on incoming requests
+ listen();
+ }
+ }
+
+ private void listen() throws Exception {
+ ServerSocket server = null;
+
+ try{
+ server = new ServerSocket();
+ server.bind(new InetSocketAddress(tcpPort));
+ LOG.info(server.toString());
+ } catch (Exception e) {
+ LOG.error("Could not listen on port " + tcpPort);
+ System.exit(-1);
+ }
+
+ while(true){
+ Worker worker;
+ try{
+ worker = new Worker(server.accept());
+ Thread thread = new Thread(worker);
+ thread.start();
+ } catch (Exception e) {
+ LOG.error("Accept failed: " + tcpPort);
+ System.exit(-1);
+ }
+ }
+ }
+
+ private class Worker implements Runnable {
+ private Socket client;
- private void checkOne(String filterName) throws Exception {
- URLFilter filter = null;
-
- ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
- URLFilter.X_POINT_ID);
-
- if (point == null)
- throw new RuntimeException(URLFilter.X_POINT_ID + " not found.");
-
- Extension[] extensions = point.getExtensions();
+ Worker(Socket client) {
+ this.client = client;
+ LOG.info(client.toString());
+ }
- for (int i = 0; i < extensions.length; i++) {
- Extension extension = extensions[i];
- filter = (URLFilter) extension.getExtensionInstance();
- if (filter.getClass().getName().equals(filterName)) {
- break;
+ public void run() {
+ if (keepClientCnxOpen) {
+ while (true) { // keep connection open until closes
+ readWrite();
+ }
} else {
- filter = null;
+ readWrite();
+
+ try { // close ourselves
+ client.close();
+ } catch (Exception e){
+ LOG.error(e.toString());
+ }
}
}
+
+ protected void readWrite() {
+ String line;
+ BufferedReader in = null;
+ PrintWriter out = null;
+
+ try{
+ in = new BufferedReader(new InputStreamReader(client.getInputStream()));
+ } catch (Exception e) {
+ LOG.error("in or out failed");
+ System.exit(-1);
+ }
- if (filter == null)
- throw new RuntimeException("Filter " + filterName + " not found.");
-
- // jerome : should we keep this behavior?
- // if (LogFormatter.hasLoggedSevere())
- // throw new RuntimeException("Severe error encountered.");
-
- System.out.println("Checking URLFilter " + filterName);
-
- BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
- String line;
- while ((line = in.readLine()) != null) {
- String out = filter.filter(line);
- if (out != null) {
- System.out.print("+");
- System.out.println(out);
- } else {
- System.out.print("-");
- System.out.println(line);
+ try{
+ line = in.readLine();
+
+ String result = filters.filter(line);
+ String output;
+ if (result != null) {
+ output = "+" + result + "\n";
+ } else {
+ output = "-" + line + "\n";;
+ }
+
+ client.getOutputStream().write(output.getBytes(Charset.forName("UTF-8")));
+ }catch (Exception e) {
+ LOG.error("Read/Write failed: " + e);
}
}
}
- private void checkAll() throws Exception {
- System.out.println("Checking combination of all URLFilters available");
-
+ private void checkStdin() throws Exception {
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
String line;
+
while ((line = in.readLine()) != null) {
- URLFilters filters = new URLFilters(this.conf);
String out = filters.filter(line);
if (out != null) {
System.out.print("+");
@@ -105,30 +172,26 @@ public class URLFilterChecker {
public static void main(String[] args) throws Exception {
- String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) \n"
+ String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) [-listen <port>] [-keepClientCnxOpen]) \n"
+ "Tool takes a list of URLs, one per line, passed via STDIN.\n";
- if (args.length == 0) {
+ if (args.length < 1) {
System.err.println(usage);
System.exit(-1);
}
- String filterName = null;
- if (args[0].equals("-filterName")) {
- if (args.length != 2) {
- System.err.println(usage);
- System.exit(-1);
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-filterName")) {
+ filterName = args[++i];
+ } else if (args[i].equals("-listen")) {
+ tcpPort = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-keepClientCnxOpen")) {
+ keepClientCnxOpen = true;
}
- filterName = args[1];
}
-
+
URLFilterChecker checker = new URLFilterChecker(NutchConfiguration.create());
- if (filterName != null) {
- checker.checkOne(filterName);
- } else {
- checker.checkAll();
- }
-
+ checker.run();
System.exit(0);
}
}