You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by sn...@apache.org on 2017/12/17 13:15:52 UTC
[nutch] 01/03: fix for NUTCH-2477 (refactor checker classes)
contributed by Jurian Broertjes
This is an automated email from the ASF dual-hosted git repository.
snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
commit 9c684114337241bb7652619c11b7374b27204dcb
Author: Jurian Broertjes <ju...@openindex.io>
AuthorDate: Tue Dec 12 15:52:59 2017 +0000
fix for NUTCH-2477 (refactor checker classes) contributed by Jurian Broertjes
---
.../nutch/indexer/IndexingFiltersChecker.java | 125 ++-------------
.../org/apache/nutch/net/URLFilterChecker.java | 126 +++++----------
src/java/org/apache/nutch/net/URLFilters.java | 4 +
.../org/apache/nutch/net/URLNormalizerChecker.java | 102 +++++-------
.../org/apache/nutch/util/AbstractChecker.java | 171 +++++++++++++++++++++
5 files changed, 269 insertions(+), 259 deletions(-)
diff --git a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
index 05caf5a..5491638 100644
--- a/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
+++ b/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
@@ -17,23 +17,14 @@
package org.apache.nutch.indexer;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
import java.lang.invoke.MethodHandles;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.InetSocketAddress;
-import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Inlinks;
@@ -52,6 +43,7 @@ import org.apache.nutch.protocol.ProtocolOutput;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.AbstractChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,41 +57,33 @@ import org.slf4j.LoggerFactory;
* @author Julien Nioche
**/
-public class IndexingFiltersChecker extends Configured implements Tool {
+public class IndexingFiltersChecker extends AbstractChecker {
protected URLNormalizers normalizers = null;
protected boolean dumpText = false;
protected boolean followRedirects = false;
- protected boolean keepClientCnxOpen = false;
// used to simulate the metadata propagated from injection
protected HashMap<String, String> metadata = new HashMap<>();
- protected int tcpPort = -1;
private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());
- public IndexingFiltersChecker() {
-
- }
-
public int run(String[] args) throws Exception {
String url = null;
- String usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] [-listen <port>] [-keepClientCnxOpen]";
+ usage = "Usage: IndexingFiltersChecker [-normalize] [-followRedirects] [-dumpText] [-md key=value] (-stdin | -listen <port> [-keepClientCnxOpen])";
- if (args.length == 0) {
+ // Print help when no args given
+ if (args.length < 1) {
System.err.println(usage);
- return -1;
+ System.exit(-1);
}
+ int numConsumed;
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-normalize")) {
normalizers = new URLNormalizers(getConf(), URLNormalizers.SCOPE_DEFAULT);
- } else if (args[i].equals("-listen")) {
- tcpPort = Integer.parseInt(args[++i]);
} else if (args[i].equals("-followRedirects")) {
followRedirects = true;
- } else if (args[i].equals("-keepClientCnxOpen")) {
- keepClientCnxOpen = true;
} else if (args[i].equals("-dumpText")) {
dumpText = true;
} else if (args[i].equals("-md")) {
@@ -112,104 +96,27 @@ public class IndexingFiltersChecker extends Configured implements Tool {
} else
k = nextOne;
metadata.put(k, v);
+ } else if ((numConsumed = super.parseArgs(args, i)) > 0) {
+ i += numConsumed - 1;
} else if (i != args.length - 1) {
+ System.err.println("ERR: Not a recognized argument: " + args[i]);
System.err.println(usage);
System.exit(-1);
} else {
- url =args[i];
+ url = args[i];
}
}
- // In listening mode?
- if (tcpPort == -1) {
- // No, just fetch and display
- StringBuilder output = new StringBuilder();
- int ret = fetch(url, output);
- System.out.println(output);
- return ret;
+ if (url != null) {
+ return super.processSingle(url);
} else {
- // Listen on socket and start workers on incoming requests
- listen();
- }
-
- return 0;
- }
-
- protected void listen() throws Exception {
- ServerSocket server = null;
-
- try{
- server = new ServerSocket();
- server.bind(new InetSocketAddress(tcpPort));
- LOG.info(server.toString());
- } catch (Exception e) {
- LOG.error("Could not listen on port " + tcpPort);
- System.exit(-1);
- }
-
- while(true){
- Worker worker;
- try{
- worker = new Worker(server.accept());
- Thread thread = new Thread(worker);
- thread.start();
- } catch (Exception e) {
- LOG.error("Accept failed: " + tcpPort);
- System.exit(-1);
- }
- }
- }
-
- private class Worker implements Runnable {
- private Socket client;
-
- Worker(Socket client) {
- this.client = client;
- LOG.info(client.toString());
- }
-
- public void run() {
- if (keepClientCnxOpen) {
- while (true) { // keep connection open until closes
- readWrite();
- }
- } else {
- readWrite();
-
- try { // close ourselves
- client.close();
- } catch (Exception e){
- LOG.error(e.toString());
- }
- }
- }
-
- protected void readWrite() {
- String line;
- BufferedReader in = null;
- PrintWriter out = null;
-
- try{
- in = new BufferedReader(new InputStreamReader(client.getInputStream()));
- } catch (Exception e) {
- LOG.error("in or out failed");
- System.exit(-1);
- }
-
- try{
- line = in.readLine();
- StringBuilder output = new StringBuilder();
- fetch(line, output);
-
- client.getOutputStream().write(output.toString().getBytes(Charset.forName("UTF-8")));
- }catch (Exception e) {
- LOG.error("Read/Write failed: " + e);
- }
+ // Start listening
+ return super.run();
}
}
- protected int fetch(String url, StringBuilder output) throws Exception {
+ protected int process(String url, StringBuilder output) throws Exception {
if (normalizers != null) {
url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
}
diff --git a/src/java/org/apache/nutch/net/URLFilterChecker.java b/src/java/org/apache/nutch/net/URLFilterChecker.java
index 6fb3cf2..429aa9f 100644
--- a/src/java/org/apache/nutch/net/URLFilterChecker.java
+++ b/src/java/org/apache/nutch/net/URLFilterChecker.java
@@ -21,8 +21,9 @@ import org.apache.nutch.plugin.Extension;
import org.apache.nutch.plugin.ExtensionPoint;
import org.apache.nutch.plugin.PluginRepository;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.AbstractChecker;
import org.apache.nutch.util.NutchConfiguration;
import java.io.BufferedReader;
@@ -33,103 +34,60 @@ import java.io.InputStreamReader;
*
* @author John Xing
*/
-public class URLFilterChecker {
+public class URLFilterChecker extends AbstractChecker {
- private Configuration conf;
+ private URLFilters filters = null;
- public URLFilterChecker(Configuration conf) {
- this.conf = conf;
- }
-
- private void checkOne(String filterName) throws Exception {
- URLFilter filter = null;
-
- ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
- URLFilter.X_POINT_ID);
-
- if (point == null)
- throw new RuntimeException(URLFilter.X_POINT_ID + " not found.");
-
- Extension[] extensions = point.getExtensions();
-
- for (int i = 0; i < extensions.length; i++) {
- Extension extension = extensions[i];
- filter = (URLFilter) extension.getExtensionInstance();
- if (filter.getClass().getName().equals(filterName)) {
- break;
- } else {
- filter = null;
- }
- }
-
- if (filter == null)
- throw new RuntimeException("Filter " + filterName + " not found.");
-
- // jerome : should we keep this behavior?
- // if (LogFormatter.hasLoggedSevere())
- // throw new RuntimeException("Severe error encountered.");
-
- System.out.println("Checking URLFilter " + filterName);
-
- BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
- String line;
- while ((line = in.readLine()) != null) {
- String out = filter.filter(line);
- if (out != null) {
- System.out.print("+");
- System.out.println(out);
- } else {
- System.out.print("-");
- System.out.println(line);
- }
- }
- }
+ public int run(String[] args) throws Exception {
+ usage = "Usage: URLFilterChecker [-filterName filterName] (-stdin | -listen <port> [-keepClientCnxOpen]) \n"
+ + "\n\tTool takes a list of URLs, one per line.\n";
- private void checkAll() throws Exception {
- System.out.println("Checking combination of all URLFilters available");
-
- BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
- String line;
- URLFilters filters = new URLFilters(this.conf);
-
- while ((line = in.readLine()) != null) {
- String out = filters.filter(line);
- if (out != null) {
- System.out.print("+");
- System.out.println(out);
- } else {
- System.out.print("-");
- System.out.println(line);
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
-
- String usage = "Usage: URLFilterChecker (-filterName filterName | -allCombined) \n"
- + "Tool takes a list of URLs, one per line, passed via STDIN.\n";
-
- if (args.length == 0) {
+ // Print help when no args given
+ if (args.length < 1) {
System.err.println(usage);
System.exit(-1);
}
- String filterName = null;
- if (args[0].equals("-filterName")) {
- if (args.length != 2) {
+ int numConsumed;
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-filterName")) {
+ getConf().set("plugin.includes", args[++i]);
+ } else if ((numConsumed = super.parseArgs(args, i)) > 0) {
+ i += numConsumed - 1;
+ } else {
+ System.err.println("ERR: Not a recognized argument: " + args[i]);
System.err.println(usage);
System.exit(-1);
}
- filterName = args[1];
}
- URLFilterChecker checker = new URLFilterChecker(NutchConfiguration.create());
- if (filterName != null) {
- checker.checkOne(filterName);
+ // Print active filter list
+ filters = new URLFilters(getConf());
+ System.out.print("Checking combination of these URLFilters: ");
+ for (URLFilter filter : filters.getFilters()) {
+ System.out.print(filter.getClass().getSimpleName() + " ");
+ }
+ System.out.println("");
+
+ // Start listening
+ return super.run();
+ }
+
+ protected int process(String line, StringBuilder output) throws Exception {
+ String out = filters.filter(line);
+ if (out != null) {
+ output.append("+");
+ output.append(out);
} else {
- checker.checkAll();
+ output.append("-");
+ output.append(line);
}
+ return 0;
+ }
- System.exit(0);
+ public static void main(String[] args) throws Exception {
+ final int res = ToolRunner.run(NutchConfiguration.create(),
+ new URLFilterChecker(), args);
+ System.exit(res);
}
}
diff --git a/src/java/org/apache/nutch/net/URLFilters.java b/src/java/org/apache/nutch/net/URLFilters.java
index 3deccca..4f5bf36 100644
--- a/src/java/org/apache/nutch/net/URLFilters.java
+++ b/src/java/org/apache/nutch/net/URLFilters.java
@@ -31,6 +31,10 @@ public class URLFilters {
URLFilter.class, URLFilter.X_POINT_ID, URLFILTER_ORDER);
}
+ /**
+  * Gives access to the URL filters held by this instance.
+  *
+  * @return the internal {@code filters} array itself (not a copy)
+  */
+ public URLFilter[] getFilters() {
+ return this.filters;
+ }
+
/** Run all defined filters. Assume logical AND. */
public String filter(String urlString) throws URLFilterException {
for (int i = 0; i < this.filters.length; i++) {
diff --git a/src/java/org/apache/nutch/net/URLNormalizerChecker.java b/src/java/org/apache/nutch/net/URLNormalizerChecker.java
index d8f1c6e..a435cc8 100644
--- a/src/java/org/apache/nutch/net/URLNormalizerChecker.java
+++ b/src/java/org/apache/nutch/net/URLNormalizerChecker.java
@@ -21,8 +21,9 @@ import org.apache.nutch.plugin.Extension;
import org.apache.nutch.plugin.ExtensionPoint;
import org.apache.nutch.plugin.PluginRepository;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.AbstractChecker;
import org.apache.nutch.util.NutchConfiguration;
import java.io.BufferedReader;
@@ -31,87 +32,56 @@ import java.io.InputStreamReader;
/**
* Checks one given normalizer or all normalizers.
*/
-public class URLNormalizerChecker {
+public class URLNormalizerChecker extends AbstractChecker {
- private Configuration conf;
+ private String scope = URLNormalizers.SCOPE_DEFAULT;
+ URLNormalizers normalizers;
- public URLNormalizerChecker(Configuration conf) {
- this.conf = conf;
- }
-
- private void checkOne(String normalizerName, String scope) throws Exception {
- URLNormalizer normalizer = null;
-
- ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
- URLNormalizer.X_POINT_ID);
-
- if (point == null)
- throw new RuntimeException(URLNormalizer.X_POINT_ID + " not found.");
-
- Extension[] extensions = point.getExtensions();
-
- for (int i = 0; i < extensions.length; i++) {
- Extension extension = extensions[i];
- normalizer = (URLNormalizer) extension.getExtensionInstance();
- if (normalizer.getClass().getName().equals(normalizerName)) {
- break;
- } else {
- normalizer = null;
- }
- }
-
- if (normalizer == null)
- throw new RuntimeException("URLNormalizer " + normalizerName
- + " not found.");
-
- System.out.println("Checking URLNormalizer " + normalizerName);
-
- BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
- String line;
- while ((line = in.readLine()) != null) {
- String out = normalizer.normalize(line, scope);
- System.out.println(out);
- }
- }
-
- private void checkAll(String scope) throws Exception {
- System.out.println("Checking combination of all URLNormalizers available");
+ public int run(String[] args) throws Exception {
+ usage = "Usage: URLNormalizerChecker [-normalizer <normalizerName>] [-scope <scope>] (-stdin | -listen <port> [-keepClientCnxOpen])"
+ + "\n\tscope can be one of: default,partition,generate_host_count,fetcher,crawldb,linkdb,inject,outlink\n";
- BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
- String line;
- URLNormalizers normalizers = new URLNormalizers(conf, scope);
- while ((line = in.readLine()) != null) {
- String out = normalizers.normalize(line, scope);
- System.out.println(out);
+ // Print help when no args given
+ if (args.length < 1) {
+ System.err.println(usage);
+ System.exit(-1);
}
- }
-
- public static void main(String[] args) throws Exception {
- String usage = "Usage: URLNormalizerChecker [-normalizer <normalizerName>] [-scope <scope>]"
- + "\n\tscope can be one of: default,partition,generate_host_count,fetcher,crawldb,linkdb,inject,outlink";
-
- String normalizerName = null;
- String scope = URLNormalizers.SCOPE_DEFAULT;
+ int numConsumed;
for (int i = 0; i < args.length; i++) {
if (args[i].equals("-normalizer")) {
- normalizerName = args[++i];
+ getConf().set("plugin.includes", args[++i]);
} else if (args[i].equals("-scope")) {
scope = args[++i];
+ } else if ((numConsumed = super.parseArgs(args, i)) > 0) {
+ i += numConsumed - 1;
} else {
+ System.err.println("ERR: Not a recognized argument: " + args[i]);
System.err.println(usage);
System.exit(-1);
}
}
- URLNormalizerChecker checker = new URLNormalizerChecker(
- NutchConfiguration.create());
- if (normalizerName != null) {
- checker.checkOne(normalizerName, scope);
- } else {
- checker.checkAll(scope);
+ // Print active normalizer list
+ normalizers = new URLNormalizers(getConf(), scope);
+ System.out.print("Checking combination of these URLNormalizers: ");
+ for (URLNormalizer normalizer : normalizers.getURLNormalizers(scope)) {
+ System.out.print(normalizer.getClass().getSimpleName() + " ");
}
+ System.out.println("");
+
+ // Start listening
+ return super.run();
+ }
- System.exit(0);
+ protected int process(String line, StringBuilder output) throws Exception {
+ output.append(normalizers.normalize(line, scope));
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ final int res = ToolRunner.run(NutchConfiguration.create(),
+ new URLNormalizerChecker(), args);
+ System.exit(res);
}
}
diff --git a/src/java/org/apache/nutch/util/AbstractChecker.java b/src/java/org/apache/nutch/util/AbstractChecker.java
new file mode 100644
index 0000000..8424879
--- /dev/null
+++ b/src/java/org/apache/nutch/util/AbstractChecker.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.lang.invoke.MethodHandles;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scaffolding class for the various Checker implementations. Can process
+ * cmdline input, stdin and TCP connections.
+ *
+ * <p>Subclasses implement {@link #process(String, StringBuilder)}, handle their
+ * own command-line options, delegate the shared ones to
+ * {@link #parseArgs(String[], int)} and finally call {@link #run()} or
+ * {@link #processSingle(String)}.
+ *
+ * @author Jurian Broertjes
+ */
+public abstract class AbstractChecker extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /** When true, a TCP worker keeps answering lines until the client disconnects. */
+  protected boolean keepClientCnxOpen = false;
+  /** Port to listen on; -1 means TCP mode is disabled. */
+  protected int tcpPort = -1;
+  /** Whether input is read from stdin when not listening; enabled by default. */
+  protected boolean stdin = true;
+  /** Usage message; assigned by the subclasses before parsing arguments. */
+  protected String usage;
+
+  /**
+   * Actual function for the processing of a single input.
+   *
+   * @param line the input to check
+   * @param output buffer receiving the textual result for this input
+   * @return a status code chosen by the implementation
+   * @throws Exception if the input cannot be processed
+   */
+  protected abstract int process(String line, StringBuilder output) throws Exception;
+
+  /**
+   * Parses the options shared by all checkers ({@code -listen <port>},
+   * {@code -keepClientCnxOpen}, {@code -stdin}) at position {@code i}.
+   *
+   * @param args the full argument array
+   * @param i index of the argument to inspect
+   * @return the number of arguments consumed, or 0 if {@code args[i]} is not
+   *         one of the shared options
+   * @throws IllegalArgumentException if {@code -listen} is not followed by a
+   *         port, or the port is not a number
+   */
+  protected int parseArgs(String[] args, int i) {
+    if (args[i].equals("-listen")) {
+      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException
+      if (i + 1 >= args.length) {
+        throw new IllegalArgumentException("-listen requires a port number");
+      }
+      tcpPort = Integer.parseInt(args[i + 1]);
+      return 2;
+    } else if (args[i].equals("-keepClientCnxOpen")) {
+      keepClientCnxOpen = true;
+      return 1;
+    } else if (args[i].equals("-stdin")) {
+      stdin = true;
+      return 1;
+    }
+    return 0;
+  }
+
+  /**
+   * Starts the checker in TCP mode if a port was configured, otherwise reads
+   * from stdin if enabled.
+   *
+   * @return 0 on success, -1 if there was nothing to do
+   * @throws Exception if processing fails
+   */
+  protected int run() throws Exception {
+    // In listening mode?
+    if (tcpPort != -1) {
+      processTCP(tcpPort);
+      return 0;
+    } else if (stdin) {
+      return processStdin();
+    }
+    // Nothing to do?
+    return -1;
+  }
+
+  /**
+   * Processes a single input, prints the result to stdout and returns.
+   *
+   * @param input the input to check
+   * @return the status code returned by {@link #process(String, StringBuilder)}
+   * @throws Exception if processing fails
+   */
+  protected int processSingle(String input) throws Exception {
+    StringBuilder output = new StringBuilder();
+    int ret = process(input, output);
+    System.out.println(output);
+    return ret;
+  }
+
+  /**
+   * Reads inputs from stdin, one per line, and prints each result to stdout.
+   *
+   * @return always 0; the per-line status codes are not propagated
+   * @throws Exception if reading or processing fails
+   */
+  protected int processStdin() throws Exception {
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+    String line;
+    while ((line = in.readLine()) != null) {
+      StringBuilder output = new StringBuilder();
+      process(line, output);
+      System.out.println(output);
+    }
+    return 0;
+  }
+
+  /**
+   * Opens a TCP server socket and hands every accepted connection to its own
+   * worker thread. Loops forever; the JVM is terminated if the port cannot be
+   * bound or accepting a connection fails.
+   *
+   * @param tcpPort the port to listen on
+   * @throws Exception declared for subclasses; failures here end the JVM
+   */
+  protected void processTCP(int tcpPort) throws Exception {
+    ServerSocket server = null;
+
+    try {
+      server = new ServerSocket();
+      server.bind(new InetSocketAddress(tcpPort));
+      LOG.info(server.toString());
+    } catch (Exception e) {
+      LOG.error("Could not listen on port " + tcpPort, e);
+      System.exit(-1);
+    }
+
+    while (true) {
+      try {
+        Worker worker = new Worker(server.accept());
+        new Thread(worker).start();
+      } catch (Exception e) {
+        LOG.error("Accept failed: " + tcpPort, e);
+        System.exit(-1);
+      }
+    }
+  }
+
+  /** Serves one client connection: reads lines, processes them, writes results. */
+  private class Worker implements Runnable {
+    private final Socket client;
+
+    Worker(Socket client) {
+      this.client = client;
+      LOG.info(client.toString());
+    }
+
+    @Override
+    public void run() {
+      try {
+        // One reader per connection: a fresh reader per request would drop
+        // whatever the previous one had already buffered. Decode as UTF-8 to
+        // match the encoding used for the response.
+        BufferedReader in = new BufferedReader(
+            new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8));
+        if (keepClientCnxOpen) {
+          while (readWrite(in)) {
+            // keep answering until the client closes the connection
+          }
+        } else {
+          readWrite(in);
+        }
+      } catch (Exception e) {
+        // A broken client must not take the whole server down
+        LOG.error("in or out failed", e);
+      } finally {
+        try { // close ourselves
+          client.close();
+        } catch (Exception e) {
+          LOG.error(e.toString());
+        }
+      }
+    }
+
+    /**
+     * Handles one request line.
+     *
+     * @param in reader on the client's input stream
+     * @return false once the connection is closed or unusable, true otherwise
+     */
+    protected boolean readWrite(BufferedReader in) {
+      String line;
+      try {
+        line = in.readLine();
+      } catch (Exception e) {
+        LOG.error("Read/Write failed: " + e);
+        return false;
+      }
+      if (line == null) {
+        // End of stream: the client has disconnected
+        return false;
+      }
+
+      StringBuilder output = new StringBuilder();
+      try {
+        process(line, output);
+      } catch (Exception e) {
+        // A single bad input is logged but does not end the connection
+        LOG.error("Read/Write failed: " + e);
+        return true;
+      }
+
+      try {
+        client.getOutputStream().write(output.toString().getBytes(StandardCharsets.UTF_8));
+      } catch (Exception e) {
+        LOG.error("Read/Write failed: " + e);
+        return false;
+      }
+      return true;
+    }
+  }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
"commits@nutch.apache.org" <co...@nutch.apache.org>.