Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:49:01 UTC
[45/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java
new file mode 100644
index 0000000..3ba3c81
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDb.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.text.SimpleDateFormat;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.KeyValueTextInputFormat;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleInputs;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.util.FSUtils;
+import org.apache.nutch.util.LockUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tool to create a HostDB from the CrawlDB. It aggregates fetch status values
+ * by host and checks DNS entries for hosts.
+ */
+public class UpdateHostDb extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDb.class);
+ public static final String LOCK_NAME = ".locked";
+
+ public static final String HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD = "hostdb.purge.failed.hosts.threshold";
+ public static final String HOSTDB_NUM_RESOLVER_THREADS = "hostdb.num.resolvers.threads";
+ public static final String HOSTDB_RECHECK_INTERVAL = "hostdb.recheck.interval";
+ public static final String HOSTDB_CHECK_FAILED = "hostdb.check.failed";
+ public static final String HOSTDB_CHECK_NEW = "hostdb.check.new";
+ public static final String HOSTDB_CHECK_KNOWN = "hostdb.check.known";
+ public static final String HOSTDB_FORCE_CHECK = "hostdb.force.check";
+ public static final String HOSTDB_URL_FILTERING = "hostdb.url.filter";
+ public static final String HOSTDB_URL_NORMALIZING = "hostdb.url.normalize";
+ public static final String HOSTDB_NUMERIC_FIELDS = "hostdb.numeric.fields";
+ public static final String HOSTDB_STRING_FIELDS = "hostdb.string.fields";
+ public static final String HOSTDB_PERCENTILES = "hostdb.percentiles";
+
+ private void updateHostDb(Path hostDb, Path crawlDb, Path topHosts,
+ boolean checkFailed, boolean checkNew, boolean checkKnown,
+ boolean force, boolean filter, boolean normalize) throws Exception {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("UpdateHostDb: starting at " + sdf.format(start));
+
+ JobConf job = new NutchJob(getConf());
+ boolean preserveBackup = job.getBoolean("db.preserve.backup", true);
+ job.setJarByClass(UpdateHostDb.class);
+ job.setJobName("UpdateHostDb");
+
+ // Check whether the urlfilter-domainblacklist plugin is loaded
+ if (filter && "urlfilter-domainblacklist".matches(job.get("plugin.includes"))) {
+ throw new Exception("urlfilter-domainblacklist must not be enabled");
+ }
+
+ // Check whether the urlnormalizer-host plugin is loaded
+ if (normalize && "urlnormalizer-host".matches(job.get("plugin.includes"))) {
+ throw new Exception("urlnormalizer-host must not be enabled");
+ }
+
+ FileSystem fs = FileSystem.get(job);
+ Path old = new Path(hostDb, "old");
+ Path current = new Path(hostDb, "current");
+ Path tempHostDb = new Path(hostDb, "hostdb-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ // lock an existing hostdb to prevent multiple simultaneous updates
+ Path lock = new Path(hostDb, LOCK_NAME);
+ if (!fs.exists(current)) {
+ fs.mkdirs(current);
+ }
+ LockUtil.createLockFile(fs, lock, false);
+
+ MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+
+ if (topHosts != null) {
+ MultipleInputs.addInputPath(job, topHosts, KeyValueTextInputFormat.class);
+ }
+ if (crawlDb != null) {
+ // Tell the job we read from CrawlDB
+ job.setBoolean("hostdb.reading.crawldb", true);
+ MultipleInputs.addInputPath(job, new Path(crawlDb,
+ CrawlDb.CURRENT_NAME), SequenceFileInputFormat.class);
+ }
+
+ FileOutputFormat.setOutputPath(job, tempHostDb);
+
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(NutchWritable.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(HostDatum.class);
+ job.setMapperClass(UpdateHostDbMapper.class);
+ job.setReducerClass(UpdateHostDbReducer.class);
+
+ job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+ job.setSpeculativeExecution(false);
+ job.setBoolean(HOSTDB_CHECK_FAILED, checkFailed);
+ job.setBoolean(HOSTDB_CHECK_NEW, checkNew);
+ job.setBoolean(HOSTDB_CHECK_KNOWN, checkKnown);
+ job.setBoolean(HOSTDB_FORCE_CHECK, force);
+ job.setBoolean(HOSTDB_URL_FILTERING, filter);
+ job.setBoolean(HOSTDB_URL_NORMALIZING, normalize);
+ job.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+ try {
+ JobClient.runJob(job);
+
+ FSUtils.replace(fs, old, current, true);
+ FSUtils.replace(fs, current, tempHostDb, true);
+
+ if (!preserveBackup && fs.exists(old)) fs.delete(old, true);
+ } catch (Exception e) {
+ if (fs.exists(tempHostDb)) {
+ fs.delete(tempHostDb, true);
+ }
+ LockUtil.removeLockFile(fs, lock);
+ throw e;
+ }
+
+ LockUtil.removeLockFile(fs, lock);
+ long end = System.currentTimeMillis();
+ LOG.info("UpdateHostDb: finished at " + sdf.format(end) +
+ ", elapsed: " + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Usage: UpdateHostDb -hostdb <hostdb> " +
+ "[-tophosts <tophosts>] [-crawldb <crawldb>] [-checkAll] [-checkFailed]" +
+ " [-checkNew] [-checkKnown] [-force] [-filter] [-normalize]");
+ return -1;
+ }
+
+ Path hostDb = null;
+ Path crawlDb = null;
+ Path topHosts = null;
+
+ boolean checkFailed = false;
+ boolean checkNew = false;
+ boolean checkKnown = false;
+ boolean force = false;
+
+ boolean filter = false;
+ boolean normalize = false;
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-hostdb")) {
+ hostDb = new Path(args[i + 1]);
+ LOG.info("UpdateHostDb: hostdb: " + hostDb);
+ i++;
+ }
+ if (args[i].equals("-crawldb")) {
+ crawlDb = new Path(args[i + 1]);
+ LOG.info("UpdateHostDb: crawldb: " + crawlDb);
+ i++;
+ }
+ if (args[i].equals("-tophosts")) {
+ topHosts = new Path(args[i + 1]);
+ LOG.info("UpdateHostDb: tophosts: " + topHosts);
+ i++;
+ }
+
+ if (args[i].equals("-checkFailed")) {
+ LOG.info("UpdateHostDb: checking failed hosts");
+ checkFailed = true;
+ }
+ if (args[i].equals("-checkNew")) {
+ LOG.info("UpdateHostDb: checking new hosts");
+ checkNew = true;
+ }
+ if (args[i].equals("-checkKnown")) {
+ LOG.info("UpdateHostDb: checking known hosts");
+ checkKnown = true;
+ }
+ if (args[i].equals("-checkAll")) {
+ LOG.info("UpdateHostDb: checking all hosts");
+ checkFailed = true;
+ checkNew = true;
+ checkKnown = true;
+ }
+ if (args[i].equals("-force")) {
+ LOG.info("UpdateHostDb: forced check");
+ force = true;
+ }
+ if (args[i].equals("-filter")) {
+ LOG.info("UpdateHostDb: filtering enabled");
+ filter = true;
+ }
+ if (args[i].equals("-normalize")) {
+ LOG.info("UpdateHostDb: normalizing enabled");
+ normalize = true;
+ }
+ }
+
+ if (hostDb == null) {
+ System.err.println("hostDb is mandatory");
+ return -1;
+ }
+
+ try {
+ updateHostDb(hostDb, crawlDb, topHosts, checkFailed, checkNew,
+ checkKnown, force, filter, normalize);
+
+ return 0;
+ } catch (Exception e) {
+ LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
\ No newline at end of file
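
For illustration, a minimal sketch of driving the tool above programmatically,
mirroring its own main() method. The UpdateHostDbExample class name and the
crawl/hostdb and crawl/crawldb paths are hypothetical:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.hostdb.UpdateHostDb;
    import org.apache.nutch.util.NutchConfiguration;

    public class UpdateHostDbExample {
      public static void main(String[] args) throws Exception {
        // Run the tool against a hypothetical HostDB and CrawlDB location,
        // checking failed, new and known hosts (-checkAll).
        int res = ToolRunner.run(NutchConfiguration.create(), new UpdateHostDb(),
            new String[] { "-hostdb", "crawl/hostdb",
                           "-crawldb", "crawl/crawldb",
                           "-checkAll" });
        System.exit(res);
      }
    }
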
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
new file mode 100644
index 0000000..5844b04
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbMapper.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.util.URLUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper ingesting HostDB and CrawlDB entries. It can also read host score
+ * info from a plain text key/value file generated by the WebGraph's
+ * NodeDumper tool.
+ */
+public class UpdateHostDbMapper
+ implements Mapper<Text, Writable, Text, NutchWritable> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbMapper.class);
+ protected Text host = new Text();
+ protected HostDatum hostDatum = null;
+ protected CrawlDatum crawlDatum = null;
+ protected String reprUrl = null;
+ protected String buffer = null;
+ protected String[] args = null;
+ protected boolean filter = false;
+ protected boolean normalize = false;
+ protected boolean readingCrawlDb = false;
+ protected URLFilters filters = null;
+ protected URLNormalizers normalizers = null;
+
+ public void close() {}
+
+ /**
+ * Configures the mapper: reads the filtering and normalizing flags and
+ * instantiates URL filters and normalizers as needed.
+ *
+ * @param job the job configuration
+ */
+ public void configure(JobConf job) {
+ readingCrawlDb = job.getBoolean("hostdb.reading.crawldb", false);
+ filter = job.getBoolean(UpdateHostDb.HOSTDB_URL_FILTERING, false);
+ normalize = job.getBoolean(UpdateHostDb.HOSTDB_URL_NORMALIZING, false);
+
+ if (filter)
+ filters = new URLFilters(job);
+ if (normalize)
+ normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_DEFAULT);
+ }
+
+ /**
+ * Filters and/or normalizes the input host name by temporarily turning it
+ * into a URL.
+ *
+ * @param url the host name to process
+ * @return the filtered and normalized host name, or null if filtered out
+ */
+ protected String filterNormalize(String url) {
+ // We actually receive a hostname here so let's make a URL
+ // TODO: we force shop.fcgroningen to be https, how do we know that here?
+ // http://issues.openindex.io/browse/SPIDER-40
+ url = "http://" + url + "/";
+
+ try {
+ if (normalize)
+ url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
+ if (filter)
+ url = filters.filter(url);
+ if (url == null)
+ return null;
+ } catch (Exception e) {
+ return null;
+ }
+
+ // Turn back to host
+ return URLUtil.getHost(url);
+ }
+
+ /**
+ * Mapper ingesting records from the HostDB, CrawlDB and plaintext host
+ * scores file. Statistics and scores are passed on.
+ *
+ * @param key the record key, a URL or host name
+ * @param value a CrawlDatum, HostDatum or Text score record
+ * @param output the output collector
+ * @param reporter the progress reporter
+ */
+ public void map(Text key, Writable value,
+ OutputCollector<Text,NutchWritable> output, Reporter reporter)
+ throws IOException {
+
+ // Get the key!
+ String keyStr = key.toString();
+
+ // Check if we process records from the CrawlDB
+ if (key instanceof Text && value instanceof CrawlDatum) {
+ // Get the normalized and filtered host of this URL
+ buffer = filterNormalize(URLUtil.getHost(keyStr));
+
+ // Filtered out?
+ if (buffer == null) {
+ reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+ LOG.info("UpdateHostDb: " + URLUtil.getHost(keyStr) + " crawldatum has been filtered");
+ return;
+ }
+
+ // Set the host of this URL
+ host.set(buffer);
+ crawlDatum = (CrawlDatum)value;
+ hostDatum = new HostDatum();
+
+ /**
+ * TODO: fix multi redirects: host_a => host_b/page => host_c/page/whatever
+ * http://www.ferienwohnung-armbruster.de/
+ * http://www.ferienwohnung-armbruster.de/website/
+ * http://www.ferienwohnung-armbruster.de/website/willkommen.php
+ *
+ * We cannot reresolve redirects for host objects as CrawlDatum metadata is
+ * not available. We also cannot reliably use the reducer in all cases
+ * since redirects may be across hosts or even domains. The example
+ * above has redirects that will end up in the same reducer. During that
+ * phase, however, we do not know which URL redirects to the next URL.
+ */
+ // Do not resolve homepages when the root URL is unfetched
+ if (crawlDatum.getStatus() != CrawlDatum.STATUS_DB_UNFETCHED) {
+ // Get the protocol
+ String protocol = URLUtil.getProtocol(keyStr);
+
+ // Get the proposed homepage URL
+ String homepage = protocol + "://" + buffer + "/";
+
+ // Check if the current key equals the proposed homepage URL
+ if (keyStr.equals(homepage)) {
+ // Check if this is a redirect to the real home page
+ if (crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM ||
+ crawlDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+
+ // Obtain the repr url for this redirect via protocolstatus from the metadata
+ ProtocolStatus z = (ProtocolStatus)crawlDatum.getMetaData().
+ get(Nutch.WRITABLE_PROTO_STATUS_KEY);
+
+ // Get the protocol status' arguments
+ args = z.getArgs();
+
+ // ..and the possible redirect URL
+ reprUrl = args[0];
+
+ // Do we have a redirect target?
+ if (reprUrl != null) {
+ LOG.info("UpdateHostDb: homepage: " + keyStr + " redirects to: " + args[0]);
+ output.collect(host, new NutchWritable(hostDatum));
+ hostDatum.setHomepageUrl(reprUrl);
+ } else {
+ LOG.info("UpdateHostDb: homepage: " + keyStr +
+ " redirects to: " + args[0] + " but has been filtered out");
+ }
+ } else {
+ hostDatum.setHomepageUrl(homepage);
+ output.collect(host, new NutchWritable(hostDatum));
+ LOG.info("UpdateHostDb: homepage: " + homepage);
+ }
+ }
+ }
+
+ // Always emit crawl datum
+ output.collect(host, new NutchWritable(crawlDatum));
+ }
+
+ // Check if we got a record from the hostdb
+ if (key instanceof Text && value instanceof HostDatum) {
+ buffer = filterNormalize(keyStr);
+
+ // Filtered out?
+ if (buffer == null) {
+ reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+ LOG.info("UpdateHostDb: " + key.toString() + " hostdatum has been filtered");
+ return;
+ }
+
+ // Get a HostDatum
+ hostDatum = (HostDatum)value;
+ key.set(buffer);
+
+ // If we're also reading CrawlDb entries, reset db_* statistics because
+ // we're aggregating them from CrawlDB anyway
+ if (readingCrawlDb) {
+ hostDatum.resetStatistics();
+ }
+
+ output.collect(key, new NutchWritable(hostDatum));
+ }
+
+ // Check if we got a record with host scores
+ if (key instanceof Text && value instanceof Text) {
+ buffer = filterNormalize(keyStr);
+
+ // Filtered out?
+ if (buffer == null) {
+ reporter.incrCounter("UpdateHostDb", "filtered_records", 1);
+ LOG.info("UpdateHostDb: " + key.toString() + " score has been filtered");
+ return;
+ }
+
+ key.set(buffer);
+
+ output.collect(key,
+ new NutchWritable(new FloatWritable(Float.parseFloat(value.toString()))));
+ }
+ }
+}
\ No newline at end of file
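
The filterNormalize() method above receives bare host names, so it wraps them
in a URL before applying the URL-oriented normalizer and filter plugins, then
strips the result back down to a host. A small sketch of that round trip; the
class name and host name are hypothetical:

    import org.apache.nutch.util.URLUtil;

    public class FilterNormalizeSketch {
      public static void main(String[] args) {
        String hostname = "www.example.org";      // hypothetical mapper key
        // Wrap the host name so URL filters/normalizers can process it ...
        String url = "http://" + hostname + "/";
        // ... then reduce the (possibly rewritten) URL back to a host name.
        System.out.println(URLUtil.getHost(url)); // prints "www.example.org"
      }
    }
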
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
new file mode 100644
index 0000000..33dd18b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/UpdateHostDbReducer.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+
+import com.tdunning.math.stats.TDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reducer aggregating per-host statistics, scores and metadata emitted by
+ * {@link UpdateHostDbMapper}. Hosts that are due for a check are handed to
+ * a pool of resolver threads which write the resulting HostDatum themselves.
+ */
+public class UpdateHostDbReducer
+ implements Reducer<Text, NutchWritable, Text, HostDatum> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(UpdateHostDbReducer.class);
+ protected ResolverThread resolverThread = null;
+ protected Integer numResolverThreads = 10;
+ protected static Integer purgeFailedHostsThreshold = -1;
+ protected static Integer recheckInterval = 86400000;
+ protected static boolean checkFailed = false;
+ protected static boolean checkNew = false;
+ protected static boolean checkKnown = false;
+ protected static boolean force = false;
+ protected static long now = new Date().getTime();
+ protected static String[] numericFields;
+ protected static String[] stringFields;
+ protected static int[] percentiles;
+ protected static Text[] numericFieldWritables;
+ protected static Text[] stringFieldWritables;
+
+ protected BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
+ protected ThreadPoolExecutor executor = null;
+
+ /**
+ * Configures the thread pool and prestarts all resolver threads.
+ *
+ * @param job the job configuration
+ */
+ public void configure(JobConf job) {
+ purgeFailedHostsThreshold = job.getInt(UpdateHostDb.HOSTDB_PURGE_FAILED_HOSTS_THRESHOLD, -1);
+ numResolverThreads = job.getInt(UpdateHostDb.HOSTDB_NUM_RESOLVER_THREADS, 10);
+ recheckInterval = job.getInt(UpdateHostDb.HOSTDB_RECHECK_INTERVAL, 86400) * 1000;
+ checkFailed = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_FAILED, false);
+ checkNew = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_NEW, false);
+ checkKnown = job.getBoolean(UpdateHostDb.HOSTDB_CHECK_KNOWN, false);
+ force = job.getBoolean(UpdateHostDb.HOSTDB_FORCE_CHECK, false);
+ numericFields = job.getStrings(UpdateHostDb.HOSTDB_NUMERIC_FIELDS);
+ stringFields = job.getStrings(UpdateHostDb.HOSTDB_STRING_FIELDS);
+ percentiles = job.getInts(UpdateHostDb.HOSTDB_PERCENTILES);
+
+ // What fields do we need to collect metadata from
+ if (numericFields != null) {
+ numericFieldWritables = new Text[numericFields.length];
+ for (int i = 0; i < numericFields.length; i++) {
+ numericFieldWritables[i] = new Text(numericFields[i]);
+ }
+ }
+
+ if (stringFields != null) {
+ stringFieldWritables = new Text[stringFields.length];
+ for (int i = 0; i < stringFields.length; i++) {
+ stringFieldWritables[i] = new Text(stringFields[i]);
+ }
+ }
+
+ // Initialize the thread pool with our queue
+ executor = new ThreadPoolExecutor(numResolverThreads, numResolverThreads,
+ 5, TimeUnit.SECONDS, queue);
+
+ // Run all threads in the pool
+ executor.prestartAllCoreThreads();
+ }
+
+ /**
+ * Aggregates all values for a single host and emits the resulting HostDatum,
+ * either directly or via a resolver thread.
+ */
+ public void reduce(Text key, Iterator<NutchWritable> values,
+ OutputCollector<Text,HostDatum> output, Reporter reporter) throws IOException {
+
+ Map<String,Map<String,Integer>> stringCounts = new HashMap<String,Map<String, Integer>>();
+ Map<String,Float> maximums = new HashMap<String,Float>();
+ Map<String,Float> sums = new HashMap<String,Float>(); // used to calc averages
+ Map<String,Integer> counts = new HashMap<String,Integer>(); // used to calc averages
+ Map<String,Float> minimums = new HashMap<String,Float>();
+ Map<String,TDigest> tdigests = new HashMap<String,TDigest>();
+
+ HostDatum hostDatum = new HostDatum();
+ float score = 0;
+
+ if (stringFields != null) {
+ for (int i = 0; i < stringFields.length; i++) {
+ stringCounts.put(stringFields[i], new HashMap<String,Integer>());
+ }
+ }
+
+ // Loop through all values until we find a non-empty HostDatum or use
+ // an empty one if this is a new host for the HostDB
+ while (values.hasNext()) {
+ Writable value = values.next().get();
+
+ // Count crawl datum statuses and collect metadata from fields
+ if (value instanceof CrawlDatum) {
+ CrawlDatum buffer = (CrawlDatum)value;
+
+ // Set the correct status field
+ switch (buffer.getStatus()) {
+ case CrawlDatum.STATUS_DB_UNFETCHED:
+ hostDatum.setUnfetched(hostDatum.getUnfetched() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_FETCHED:
+ hostDatum.setFetched(hostDatum.getFetched() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_GONE:
+ hostDatum.setGone(hostDatum.getGone() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_REDIR_TEMP:
+ hostDatum.setRedirTemp(hostDatum.getRedirTemp() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_REDIR_PERM:
+ hostDatum.setRedirPerm(hostDatum.getRedirPerm() + 1);
+ break;
+
+ case CrawlDatum.STATUS_DB_NOTMODIFIED:
+ hostDatum.setNotModified(hostDatum.getNotModified() + 1);
+ break;
+ }
+
+ // Record connection failures
+ if (buffer.getRetriesSinceFetch() != 0) {
+ hostDatum.incConnectionFailures();
+ }
+
+ // Only gather metadata statistics for proper fetched pages
+ if (buffer.getStatus() == CrawlDatum.STATUS_DB_FETCHED || buffer.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+ // Deal with the string fields
+ if (stringFields != null) {
+ for (int i = 0; i < stringFields.length; i++) {
+ // Does this field exist?
+ if (buffer.getMetaData().get(stringFieldWritables[i]) != null) {
+ // Get it!
+ String metadataValue = null;
+ try {
+ metadataValue = buffer.getMetaData().get(stringFieldWritables[i]).toString();
+ } catch (Exception e) {
+ LOG.error("Metadata field " + stringFields[i] + " is probably not a numeric value");
+ }
+
+ // Does the value exist?
+ if (stringCounts.get(stringFields[i]).containsKey(metadataValue)) {
+ // Yes, increment it
+ stringCounts.get(stringFields[i]).put(metadataValue, stringCounts.get(stringFields[i]).get(metadataValue) + 1);
+ } else {
+ // Create it!
+ stringCounts.get(stringFields[i]).put(metadataValue, 1);
+ }
+ }
+ }
+ }
+
+ // Deal with the numeric fields
+ if (numericFields != null) {
+ for (int i = 0; i < numericFields.length; i++) {
+ // Does this field exist?
+ if (buffer.getMetaData().get(numericFieldWritables[i]) != null) {
+ try {
+ // Get it!
+ Float metadataValue = Float.parseFloat(buffer.getMetaData().get(numericFieldWritables[i]).toString());
+
+ // Does the t-digest for this field exist?
+ if (tdigests.containsKey(numericFields[i])) {
+ tdigests.get(numericFields[i]).add(metadataValue);
+ } else {
+ // Create it!
+ TDigest tdigest = TDigest.createDigest(100);
+ tdigest.add((double)metadataValue);
+ tdigests.put(numericFields[i], tdigest);
+ }
+
+ // Does the minimum value exist?
+ if (minimums.containsKey(numericFields[i])) {
+ // Write if this is lower than existing value
+ if (metadataValue < minimums.get(numericFields[i])) {
+ minimums.put(numericFields[i], metadataValue);
+ }
+ } else {
+ // Create it!
+ minimums.put(numericFields[i], metadataValue);
+ }
+
+ // Does the maximum value exist?
+ if (maximums.containsKey(numericFields[i])) {
+ // Write if this is higher than the existing value
+ if (metadataValue > maximums.get(numericFields[i])) {
+ maximums.put(numericFields[i], metadataValue);
+ }
+ } else {
+ // Create it!
+ maximums.put(numericFields[i], metadataValue);
+ }
+
+ // Sum it up!
+ if (sums.containsKey(numericFields[i])) {
+ // Increment
+ sums.put(numericFields[i], sums.get(numericFields[i]) + metadataValue);
+ counts.put(numericFields[i], counts.get(numericFields[i]) + 1);
+ } else {
+ // Create it!
+ sums.put(numericFields[i], metadataValue);
+ counts.put(numericFields[i], 1);
+ }
+ } catch (Exception e) {
+ LOG.error(e.getMessage() + " when processing values for " + key.toString());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // Aggregate data from an existing HostDatum record
+ if (value instanceof HostDatum) {
+ HostDatum buffer = (HostDatum)value;
+
+ // Check homepage URL
+ if (buffer.hasHomepageUrl()) {
+ hostDatum.setHomepageUrl(buffer.getHomepageUrl());
+ }
+
+ // Check lastCheck timestamp
+ if (!buffer.isEmpty()) {
+ hostDatum.setLastCheck(buffer.getLastCheck());
+ }
+
+ // Check and set DNS failures
+ if (buffer.getDnsFailures() > 0) {
+ hostDatum.setDnsFailures(buffer.getDnsFailures());
+ }
+
+ // Check and set connection failures
+ if (buffer.getConnectionFailures() > 0) {
+ hostDatum.setConnectionFailures(buffer.getConnectionFailures());
+ }
+
+ // Check metadata
+ if (!buffer.getMetaData().isEmpty()) {
+ hostDatum.setMetaData(buffer.getMetaData());
+ }
+
+ // Check and set score (score from Web Graph has precedence)
+ if (buffer.getScore() > 0) {
+ hostDatum.setScore(buffer.getScore());
+ }
+ }
+
+ // Check for the score
+ if (value instanceof FloatWritable) {
+ FloatWritable buffer = (FloatWritable)value;
+ score = buffer.get();
+ }
+ }
+
+ // Check if score was set from Web Graph
+ if (score > 0) {
+ hostDatum.setScore(score);
+ }
+
+ // Set metadata
+ for (Map.Entry<String, Map<String,Integer>> entry : stringCounts.entrySet()) {
+ for (Map.Entry<String,Integer> subEntry : entry.getValue().entrySet()) {
+ hostDatum.getMetaData().put(new Text(entry.getKey() + "." + subEntry.getKey()), new IntWritable(subEntry.getValue()));
+ }
+ }
+ for (Map.Entry<String, Float> entry : maximums.entrySet()) {
+ hostDatum.getMetaData().put(new Text("max." + entry.getKey()), new FloatWritable(entry.getValue()));
+ }
+ for (Map.Entry<String, Float> entry : sums.entrySet()) {
+ hostDatum.getMetaData().put(new Text("avg." + entry.getKey()), new FloatWritable(entry.getValue() / counts.get(entry.getKey())));
+ }
+ for (Map.Entry<String, TDigest> entry : tdigests.entrySet()) {
+ // Emit all percentiles
+ for (int i = 0; i < percentiles.length; i++) {
+ hostDatum.getMetaData().put(new Text("pct" + Integer.toString(percentiles[i]) + "." + entry.getKey()), new FloatWritable((float)entry.getValue().quantile(0.5)));
+ }
+ }
+ for (Map.Entry<String, Float> entry : minimums.entrySet()) {
+ hostDatum.getMetaData().put(new Text("min." + entry.getKey()), new FloatWritable(entry.getValue()));
+ }
+
+ reporter.incrCounter("UpdateHostDb", "total_hosts", 1);
+
+ // See if this record is to be checked
+ if (shouldCheck(hostDatum)) {
+ // Make an entry
+ resolverThread = new ResolverThread(key.toString(), hostDatum, output, reporter, purgeFailedHostsThreshold);
+
+ // Add the entry to the queue (blocking)
+ try {
+ queue.put(resolverThread);
+ } catch (InterruptedException e) {
+ LOG.error("UpdateHostDb: " + StringUtils.stringifyException(e));
+ }
+
+ // Do not progress, the datum will be written in the resolver thread
+ return;
+ } else {
+ reporter.incrCounter("UpdateHostDb", "skipped_not_eligible", 1);
+ LOG.info("UpdateHostDb: " + key.toString() + ": skipped_not_eligible");
+ }
+
+ // Write the host datum if it wasn't written by the resolver thread
+ output.collect(key, hostDatum);
+ }
+
+ /**
+ * Determines whether a record should be checked.
+ *
+ * @param datum the HostDatum to inspect
+ * @return true if the record should be checked
+ */
+ protected boolean shouldCheck(HostDatum datum) {
+ // Whether a new record is to be checked
+ if (checkNew && datum.isEmpty()) {
+ return true;
+ }
+
+ // Whether existing known hosts should be rechecked
+ if (checkKnown && !datum.isEmpty() && datum.getDnsFailures() == 0) {
+ return isEligibleForCheck(datum);
+ }
+
+ // Whether failed records are forced to be rechecked
+ if (checkFailed && datum.getDnsFailures() > 0) {
+ return isEligibleForCheck(datum);
+ }
+
+ // It seems this record is not to be checked
+ return false;
+ }
+
+ /**
+ * Determines whether a record is eligible for recheck.
+ *
+ * @param datum the HostDatum to inspect
+ * @return true if the record is eligible for a recheck
+ */
+ protected boolean isEligibleForCheck(HostDatum datum) {
+ // Whether an existing host, known or unknown, is forced or due to be rechecked
+ if (force || datum.getLastCheck().getTime() +
+ (recheckInterval * datum.getDnsFailures() + 1) > now) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Shut down all running threads and wait for completion.
+ */
+ public void close() {
+ LOG.info("UpdateHostDb: feeder finished, waiting for shutdown");
+
+ // If we're here all keys have been fed and we can issue a shut down
+ executor.shutdown();
+
+ boolean finished = false;
+
+ // Wait until all resolvers have finished
+ while (!finished) {
+ try {
+ // Wait for the executor to shut down completely
+ if (!executor.isTerminated()) {
+ LOG.info("UpdateHostDb: resolver threads waiting: " + Integer.toString(executor.getPoolSize()));
+ Thread.sleep(1000);
+ } else {
+ // All is well, get out
+ finished = true;
+ }
+ } catch (InterruptedException e) {
+ // Huh?
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ }
+ }
+}
\ No newline at end of file
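
The reducer above feeds each numeric metadata value into a per-field t-digest
and later reads percentiles from it. A minimal standalone sketch of that
library usage, with made-up values:

    import com.tdunning.math.stats.TDigest;

    public class TDigestSketch {
      public static void main(String[] args) {
        TDigest digest = TDigest.createDigest(100); // compression, as above
        for (double v : new double[] { 120, 340, 95, 480, 210 }) {
          digest.add(v);
        }
        // quantile() expects a fraction: the 75th percentile is quantile(0.75)
        System.out.println(digest.quantile(0.75));
      }
    }
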
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java b/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java
new file mode 100644
index 0000000..c16003a
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/CleaningJob.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The class scans CrawlDB looking for entries with status DB_GONE (404) or
+ * DB_DUPLICATE and sends delete requests to indexers for those documents.
+ */
+
+public class CleaningJob implements Tool {
+ public static final Logger LOG = LoggerFactory.getLogger(CleaningJob.class);
+ private Configuration conf;
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public static class DBFilter implements
+ Mapper<Text, CrawlDatum, ByteWritable, Text> {
+ private ByteWritable OUT = new ByteWritable(CrawlDatum.STATUS_DB_GONE);
+
+ @Override
+ public void configure(JobConf arg0) {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void map(Text key, CrawlDatum value,
+ OutputCollector<ByteWritable, Text> output, Reporter reporter)
+ throws IOException {
+
+ if (value.getStatus() == CrawlDatum.STATUS_DB_GONE
+ || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+ output.collect(OUT, key);
+ }
+ }
+ }
+
+ public static class DeleterReducer implements
+ Reducer<ByteWritable, Text, Text, ByteWritable> {
+ private static final int NUM_MAX_DELETE_REQUEST = 1000;
+ private int numDeletes = 0;
+ private int totalDeleted = 0;
+
+ private boolean noCommit = false;
+
+ IndexWriters writers = null;
+
+ @Override
+ public void configure(JobConf job) {
+ writers = new IndexWriters(job);
+ try {
+ writers.open(job, "Deletion");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ noCommit = job.getBoolean("noCommit", false);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // BUFFERING OF CALLS TO INDEXER SHOULD BE HANDLED AT INDEXER LEVEL
+ // if (numDeletes > 0) {
+ // LOG.info("CleaningJob: deleting " + numDeletes + " documents");
+ // // TODO updateRequest.process(solr);
+ // totalDeleted += numDeletes;
+ // }
+
+ writers.close();
+
+ if (totalDeleted > 0 && !noCommit) {
+ writers.commit();
+ }
+
+ LOG.info("CleaningJob: deleted a total of " + totalDeleted + " documents");
+ }
+
+ @Override
+ public void reduce(ByteWritable key, Iterator<Text> values,
+ OutputCollector<Text, ByteWritable> output, Reporter reporter)
+ throws IOException {
+ while (values.hasNext()) {
+ Text document = values.next();
+ writers.delete(document.toString());
+ totalDeleted++;
+ reporter.incrCounter("CleaningJobStatus", "Deleted documents", 1);
+ // if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
+ // LOG.info("CleaningJob: deleting " + numDeletes
+ // + " documents");
+ // // TODO updateRequest.process(solr);
+ // // TODO updateRequest = new UpdateRequest();
+ // writers.delete(key.toString());
+ // totalDeleted += numDeletes;
+ // numDeletes = 0;
+ // }
+ }
+ }
+ }
+
+ public void delete(String crawldb, boolean noCommit) throws IOException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("CleaningJob: starting at " + sdf.format(start));
+
+ JobConf job = new NutchJob(getConf());
+
+ FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
+ job.setBoolean("noCommit", noCommit);
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setOutputFormat(NullOutputFormat.class);
+ job.setMapOutputKeyClass(ByteWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setMapperClass(DBFilter.class);
+ job.setReducerClass(DeleterReducer.class);
+
+ job.setJobName("CleaningJob");
+
+ // need to explicitly allow deletions
+ job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
+
+ JobClient.runJob(job);
+
+ long end = System.currentTimeMillis();
+ LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public int run(String[] args) throws IOException {
+ if (args.length < 1) {
+ String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
+ LOG.error("Missing crawldb. " + usage);
+ System.err.println(usage);
+ IndexWriters writers = new IndexWriters(getConf());
+ System.err.println(writers.describe());
+ return 1;
+ }
+
+ boolean noCommit = false;
+ if (args.length == 2 && args[1].equals("-noCommit")) {
+ noCommit = true;
+ }
+
+ try {
+ delete(args[0], noCommit);
+ } catch (final Exception e) {
+ LOG.error("CleaningJob: " + StringUtils.stringifyException(e));
+ System.err.println("ERROR CleaningJob: "
+ + StringUtils.stringifyException(e));
+ return -1;
+ }
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int result = ToolRunner.run(NutchConfiguration.create(), new CleaningJob(),
+ args);
+ System.exit(result);
+ }
+}
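
For illustration, a minimal sketch of running the cleaning job
programmatically; the CleaningJobExample class name and the crawl/crawldb
path are hypothetical:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.indexer.CleaningJob;
    import org.apache.nutch.util.NutchConfiguration;

    public class CleaningJobExample {
      public static void main(String[] args) throws Exception {
        // Scan the CrawlDB and send delete requests, without committing.
        int res = ToolRunner.run(NutchConfiguration.create(), new CleaningJob(),
            new String[] { "crawl/crawldb", "-noCommit" });
        System.exit(res);
      }
    }
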
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java
new file mode 100644
index 0000000..fbbf2e8
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.plugin.Pluggable;
+
+public interface IndexWriter extends Pluggable, Configurable {
+ /** The name of the extension point. */
+ final static String X_POINT_ID = IndexWriter.class.getName();
+
+ public void open(JobConf job, String name) throws IOException;
+
+ public void write(NutchDocument doc) throws IOException;
+
+ public void delete(String key) throws IOException;
+
+ public void update(NutchDocument doc) throws IOException;
+
+ public void commit() throws IOException;
+
+ public void close() throws IOException;
+
+ /**
+ * Returns a String describing the IndexWriter instance and the specific
+ * parameters it can take
+ */
+ public String describe();
+}
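
For illustration, a minimal, hypothetical implementation of the interface
above that only prints the actions it would perform; real writers (the
indexer plugins) implement the same contract against an actual backend:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.indexer.IndexWriter;
    import org.apache.nutch.indexer.NutchDocument;

    public class LoggingIndexWriter implements IndexWriter {
      private Configuration conf;

      public void open(JobConf job, String name) throws IOException { }

      public void write(NutchDocument doc) throws IOException {
        System.out.println("add/update: " + doc.getFieldValue("id"));
      }

      public void delete(String key) throws IOException {
        System.out.println("delete: " + key);
      }

      public void update(NutchDocument doc) throws IOException {
        write(doc);
      }

      public void commit() throws IOException { }

      public void close() throws IOException { }

      public String describe() {
        return "LoggingIndexWriter - prints actions to stdout, no parameters";
      }

      public Configuration getConf() { return conf; }

      public void setConf(Configuration conf) { this.conf = conf; }
    }
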
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java
new file mode 100644
index 0000000..681812b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexWriters.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.plugin.Extension;
+import org.apache.nutch.plugin.ExtensionPoint;
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.plugin.PluginRuntimeException;
+import org.apache.nutch.util.ObjectCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Creates and caches {@link IndexWriter} implementing plugins. */
+public class IndexWriters {
+
+ public final static Logger LOG = LoggerFactory.getLogger(IndexWriters.class);
+
+ private IndexWriter[] indexWriters;
+
+ public IndexWriters(Configuration conf) {
+ ObjectCache objectCache = ObjectCache.get(conf);
+ synchronized (objectCache) {
+ this.indexWriters = (IndexWriter[]) objectCache
+ .getObject(IndexWriter.class.getName());
+ if (this.indexWriters == null) {
+ try {
+ ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+ IndexWriter.X_POINT_ID);
+ if (point == null)
+ throw new RuntimeException(IndexWriter.X_POINT_ID + " not found.");
+ Extension[] extensions = point.getExtensions();
+ HashMap<String, IndexWriter> indexerMap = new HashMap<String, IndexWriter>();
+ for (int i = 0; i < extensions.length; i++) {
+ Extension extension = extensions[i];
+ IndexWriter writer = (IndexWriter) extension.getExtensionInstance();
+ LOG.info("Adding " + writer.getClass().getName());
+ if (!indexerMap.containsKey(writer.getClass().getName())) {
+ indexerMap.put(writer.getClass().getName(), writer);
+ }
+ }
+ objectCache.setObject(IndexWriter.class.getName(), indexerMap
+ .values().toArray(new IndexWriter[0]));
+ } catch (PluginRuntimeException e) {
+ throw new RuntimeException(e);
+ }
+ this.indexWriters = (IndexWriter[]) objectCache
+ .getObject(IndexWriter.class.getName());
+ }
+ }
+ }
+
+ public void open(JobConf job, String name) throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].open(job, name);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void write(NutchDocument doc) throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].write(doc);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void update(NutchDocument doc) throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].update(doc);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void delete(String key) throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].delete(key);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].close();
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void commit() throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].commit();
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ // lists the active IndexWriters and their configuration
+ public String describe() throws IOException {
+ StringBuffer buffer = new StringBuffer();
+ if (this.indexWriters.length == 0)
+ buffer.append("No IndexWriters activated - check your configuration\n");
+ else
+ buffer.append("Active IndexWriters :\n");
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ buffer.append(this.indexWriters[i].describe()).append("\n");
+ }
+ return buffer.toString();
+ }
+
+}
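
A short usage sketch of the facade above: open all configured writers, push a
single document through them, then commit and close. The document id is a
hypothetical example:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.indexer.IndexWriters;
    import org.apache.nutch.indexer.NutchDocument;
    import org.apache.nutch.util.NutchConfiguration;

    public class IndexWritersExample {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(NutchConfiguration.create());
        IndexWriters writers = new IndexWriters(job);
        System.out.println(writers.describe()); // list the active writers

        writers.open(job, "Example");
        NutchDocument doc = new NutchDocument();
        doc.add("id", "http://www.example.org/"); // hypothetical document id
        writers.write(doc);
        writers.commit();
        writers.close();
      }
    }
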
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java
new file mode 100644
index 0000000..5025525
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerMapReduce.java
@@ -0,0 +1,422 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.codec.binary.StringUtils;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+
+public class IndexerMapReduce extends Configured implements
+ Mapper<Text, Writable, Text, NutchWritable>,
+ Reducer<Text, NutchWritable, Text, NutchIndexAction> {
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(IndexerMapReduce.class);
+
+ public static final String INDEXER_PARAMS = "indexer.additional.params";
+ public static final String INDEXER_DELETE = "indexer.delete";
+ public static final String INDEXER_DELETE_ROBOTS_NOINDEX = "indexer.delete.robots.noindex";
+ public static final String INDEXER_DELETE_SKIPPED = "indexer.delete.skipped.by.indexingfilter";
+ public static final String INDEXER_SKIP_NOTMODIFIED = "indexer.skip.notmodified";
+ public static final String URL_FILTERING = "indexer.url.filters";
+ public static final String URL_NORMALIZING = "indexer.url.normalizers";
+ public static final String INDEXER_BINARY_AS_BASE64 = "indexer.binary.base64";
+
+ private boolean skip = false;
+ private boolean delete = false;
+ private boolean deleteRobotsNoIndex = false;
+ private boolean deleteSkippedByIndexingFilter = false;
+ private boolean base64 = false;
+ private IndexingFilters filters;
+ private ScoringFilters scfilters;
+
+ // using normalizers and/or filters
+ private boolean normalize = false;
+ private boolean filter = false;
+
+ // url normalizers, filters and job configuration
+ private URLNormalizers urlNormalizers;
+ private URLFilters urlFilters;
+
+ /** Predefined action to delete documents from the index */
+ private static final NutchIndexAction DELETE_ACTION = new NutchIndexAction(
+ null, NutchIndexAction.DELETE);
+
+ public void configure(JobConf job) {
+ setConf(job);
+ this.filters = new IndexingFilters(getConf());
+ this.scfilters = new ScoringFilters(getConf());
+ this.delete = job.getBoolean(INDEXER_DELETE, false);
+ this.deleteRobotsNoIndex = job.getBoolean(INDEXER_DELETE_ROBOTS_NOINDEX,
+ false);
+ this.deleteSkippedByIndexingFilter = job.getBoolean(INDEXER_DELETE_SKIPPED,
+ false);
+ this.skip = job.getBoolean(INDEXER_SKIP_NOTMODIFIED, false);
+ this.base64 = job.getBoolean(INDEXER_BINARY_AS_BASE64, false);
+
+ normalize = job.getBoolean(URL_NORMALIZING, false);
+ filter = job.getBoolean(URL_FILTERING, false);
+
+ if (normalize) {
+ urlNormalizers = new URLNormalizers(getConf(),
+ URLNormalizers.SCOPE_INDEXER);
+ }
+
+ if (filter) {
+ urlFilters = new URLFilters(getConf());
+ }
+ }
+
+ /**
+ * Normalizes and trims extra whitespace from the given url.
+ *
+ * @param url
+ * The url to normalize.
+ *
+ * @return The normalized url.
+ */
+ private String normalizeUrl(String url) {
+ if (!normalize) {
+ return url;
+ }
+
+ String normalized = null;
+ if (urlNormalizers != null) {
+ try {
+
+ // normalize and trim the url
+ normalized = urlNormalizers
+ .normalize(url, URLNormalizers.SCOPE_INDEXER);
+ normalized = normalized.trim();
+ } catch (Exception e) {
+ LOG.warn("Skipping " + url + ":" + e);
+ normalized = null;
+ }
+ }
+
+ return normalized;
+ }
+
+ /**
+ * Filters the given url.
+ *
+ * @param url
+ * The url to filter.
+ *
+ * @return The filtered url or null.
+ */
+ private String filterUrl(String url) {
+ if (!filter) {
+ return url;
+ }
+
+ try {
+ url = urlFilters.filter(url);
+ } catch (Exception e) {
+ url = null;
+ }
+
+ return url;
+ }
+
+ public void map(Text key, Writable value,
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
+ throws IOException {
+
+ String urlString = filterUrl(normalizeUrl(key.toString()));
+ if (urlString == null) {
+ return;
+ } else {
+ key.set(urlString);
+ }
+
+ output.collect(key, new NutchWritable(value));
+ }
+
+ public void reduce(Text key, Iterator<NutchWritable> values,
+ OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
+ throws IOException {
+ Inlinks inlinks = null;
+ CrawlDatum dbDatum = null;
+ CrawlDatum fetchDatum = null;
+ Content content = null;
+ ParseData parseData = null;
+ ParseText parseText = null;
+
+ while (values.hasNext()) {
+ final Writable value = values.next().get(); // unwrap
+ if (value instanceof Inlinks) {
+ inlinks = (Inlinks) value;
+ } else if (value instanceof CrawlDatum) {
+ final CrawlDatum datum = (CrawlDatum) value;
+ if (CrawlDatum.hasDbStatus(datum)) {
+ dbDatum = datum;
+ } else if (CrawlDatum.hasFetchStatus(datum)) {
+ // don't index unmodified (empty) pages
+ if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+ fetchDatum = datum;
+ }
+ } else if (CrawlDatum.STATUS_LINKED == datum.getStatus()
+ || CrawlDatum.STATUS_SIGNATURE == datum.getStatus()
+ || CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
+ continue;
+ } else {
+ throw new RuntimeException("Unexpected status: " + datum.getStatus());
+ }
+ } else if (value instanceof ParseData) {
+ parseData = (ParseData) value;
+
+ // Handle robots meta? https://issues.apache.org/jira/browse/NUTCH-1434
+ if (deleteRobotsNoIndex) {
+ // Get the robots meta data
+ String robotsMeta = parseData.getMeta("robots");
+
+ // Does it have a noindex directive for this url?
+ if (robotsMeta != null
+ && robotsMeta.toLowerCase().indexOf("noindex") != -1) {
+ // Delete it!
+ output.collect(key, DELETE_ACTION);
+ reporter.incrCounter("IndexerStatus", "deleted (robots=noindex)", 1);
+ return;
+ }
+ }
+ } else if (value instanceof ParseText) {
+ parseText = (ParseText) value;
+ } else if (value instanceof Content) {
+ content = (Content)value;
+ } else if (LOG.isWarnEnabled()) {
+ LOG.warn("Unrecognized type: " + value.getClass());
+ }
+ }
+
+ // Whether to delete GONE or REDIRECTS
+ if (delete && fetchDatum != null && dbDatum != null) {
+ if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE
+ || dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE) {
+ reporter.incrCounter("IndexerStatus", "deleted (gone)", 1);
+ output.collect(key, DELETE_ACTION);
+ return;
+ }
+
+ if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
+ || fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
+ || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM
+ || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+ reporter.incrCounter("IndexerStatus", "deleted (redirects)", 1);
+ output.collect(key, DELETE_ACTION);
+ return;
+ }
+ }
+
+ if (fetchDatum == null || dbDatum == null || parseText == null
+ || parseData == null) {
+ return; // only have inlinks
+ }
+
+ // Whether to delete pages marked as duplicates
+ if (delete && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+ reporter.incrCounter("IndexerStatus", "deleted (duplicates)", 1);
+ output.collect(key, DELETE_ACTION);
+ return;
+ }
+
+ // Whether to skip DB_NOTMODIFIED pages
+ if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+ reporter.incrCounter("IndexerStatus", "skipped (not modified)", 1);
+ return;
+ }
+
+ if (!parseData.getStatus().isSuccess()
+ || fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
+ return;
+ }
+
+ NutchDocument doc = new NutchDocument();
+ doc.add("id", key.toString());
+
+ final Metadata metadata = parseData.getContentMeta();
+
+ // add segment, used to map from merged index back to segment files
+ doc.add("segment", metadata.get(Nutch.SEGMENT_NAME_KEY));
+
+ // add digest, used by dedup
+ doc.add("digest", metadata.get(Nutch.SIGNATURE_KEY));
+
+ final Parse parse = new ParseImpl(parseText, parseData);
+ float boost = 1.0f;
+ // run scoring filters
+ try {
+ boost = this.scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse,
+ inlinks, boost);
+ } catch (final ScoringFilterException e) {
+ reporter.incrCounter("IndexerStatus", "errors (ScoringFilter)", 1);
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Error calculating score {}: {}", key, e);
+ }
+ return;
+ }
+ // apply boost to all indexed fields.
+ doc.setWeight(boost);
+ // store boost for use by explain and dedup
+ doc.add("boost", Float.toString(boost));
+
+ try {
+ // Indexing filters may also be interested in the signature
+ fetchDatum.setSignature(dbDatum.getSignature());
+
+ // extract information from dbDatum and pass it to
+ // fetchDatum so that indexing filters can use it
+ final Text url = (Text) dbDatum.getMetaData().get(
+ Nutch.WRITABLE_REPR_URL_KEY);
+ if (url != null) {
+ // Representation URL also needs normalization and filtering.
+ // If repr URL is excluded by filters we still accept this document
+ // but represented by its primary URL ("key") which has passed URL
+ // filters.
+ String urlString = filterUrl(normalizeUrl(url.toString()));
+ if (urlString != null) {
+ url.set(urlString);
+ fetchDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY, url);
+ }
+ }
+ // run indexing filters
+ doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
+ } catch (final IndexingException e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Error indexing " + key + ": " + e);
+ }
+ reporter.incrCounter("IndexerStatus", "errors (IndexingFilter)", 1);
+ return;
+ }
+
+ // skip documents discarded by indexing filters
+ if (doc == null) {
+ // https://issues.apache.org/jira/browse/NUTCH-1449
+ if (deleteSkippedByIndexingFilter) {
+ NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+ output.collect(key, action);
+ reporter.incrCounter("IndexerStatus", "deleted (IndexingFilter)", 1);
+ } else {
+ reporter.incrCounter("IndexerStatus", "skipped (IndexingFilter)", 1);
+ }
+ return;
+ }
+
+ if (content != null) {
+ // Add the original binary content
+ String binary;
+ if (base64) {
+ // optionally encode as base64
+ binary = Base64.encodeBase64String(content.getContent());
+ } else {
+ binary = new String(content.getContent()); // note: platform default charset
+ }
+ doc.add("binaryContent", binary);
+ }
+
+ reporter.incrCounter("IndexerStatus", "indexed (add/update)", 1);
+
+ NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
+ output.collect(key, action);
+ }
+
+ public void close() throws IOException {
+ }
+
+ public static void initMRJob(Path crawlDb, Path linkDb,
+ Collection<Path> segments, JobConf job, boolean addBinaryContent) {
+
+ LOG.info("IndexerMapReduce: crawldb: {}", crawlDb);
+
+ if (linkDb != null)
+ LOG.info("IndexerMapReduce: linkdb: {}", linkDb);
+
+ for (final Path segment : segments) {
+ LOG.info("IndexerMapReduces: adding segment: {}", segment);
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.FETCH_DIR_NAME));
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.PARSE_DIR_NAME));
+ FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
+ FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
+
+ if (addBinaryContent) {
+ FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+ }
+ }
+
+ FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
+
+ if (linkDb != null) {
+ Path currentLinkDb = new Path(linkDb, LinkDb.CURRENT_NAME);
+ try {
+ if (FileSystem.get(job).exists(currentLinkDb)) {
+ FileInputFormat.addInputPath(job, currentLinkDb);
+ } else {
+ LOG.warn("Ignoring linkDb for indexing, no linkDb found in path: {}",
+ linkDb);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to use linkDb ({}) for indexing: {}", linkDb,
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ }
+
+ job.setInputFormat(SequenceFileInputFormat.class);
+
+ job.setMapperClass(IndexerMapReduce.class);
+ job.setReducerClass(IndexerMapReduce.class);
+
+ job.setOutputFormat(IndexerOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(NutchWritable.class);
+ job.setOutputValueClass(NutchWritable.class);
+ }
+}
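
initMRJob() above only wires the inputs, formats, and mapper/reducer; a
driver still has to submit the job. A minimal driver sketch, assuming the
illustrative paths below exist on the configured FileSystem (in Nutch
itself this wiring is done by IndexingJob, with additional options such
as URL normalization and filtering flags):

    import java.util.Collections;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.indexer.IndexerMapReduce;
    import org.apache.nutch.util.NutchConfiguration;
    import org.apache.nutch.util.NutchJob;

    public class IndexDriverSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new NutchJob(NutchConfiguration.create());
        job.setJobName("index-sketch");

        // Illustrative locations of the crawl data
        Path crawlDb = new Path("crawl/crawldb");
        Path linkDb = new Path("crawl/linkdb");
        Path segment = new Path("crawl/segments/20160716194901");

        // false = do not add the raw binary content to the documents
        IndexerMapReduce.initMRJob(crawlDb, linkDb,
            Collections.singletonList(segment), job, false);

        JobClient.runJob(job);
      }
    }
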
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
new file mode 100644
index 0000000..baa9ce6
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexerOutputFormat.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+public class IndexerOutputFormat extends
+ FileOutputFormat<Text, NutchIndexAction> {
+
+ @Override
+ public RecordWriter<Text, NutchIndexAction> getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress)
+ throws IOException {
+
+ final IndexWriters writers = new IndexWriters(job);
+
+ writers.open(job, name);
+
+ return new RecordWriter<Text, NutchIndexAction>() {
+
+ public void close(Reporter reporter) throws IOException {
+ writers.close();
+ }
+
+ public void write(Text key, NutchIndexAction indexAction)
+ throws IOException {
+ if (indexAction.action == NutchIndexAction.ADD) {
+ writers.write(indexAction.doc);
+ } else if (indexAction.action == NutchIndexAction.DELETE) {
+ writers.delete(key.toString());
+ }
+ }
+ };
+ }
+}
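
The record writer above fans each NutchIndexAction out to every configured
IndexWriter: write() for ADD actions, delete() for DELETE actions. Below is
a log-only writer, sketched against the Nutch 1.x IndexWriter interface
(open/write/delete/update/commit/close/describe; the open, write and delete
signatures match the calls above, the rest may vary between releases). A
real writer would still need the usual plugin registration:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.indexer.IndexWriter;
    import org.apache.nutch.indexer.NutchDocument;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingIndexWriter implements IndexWriter {

      private static final Logger LOG =
          LoggerFactory.getLogger(LoggingIndexWriter.class);

      private Configuration conf;

      @Override
      public void open(JobConf job, String name) throws IOException {
        LOG.info("open: {}", name);
      }

      @Override
      public void write(NutchDocument doc) throws IOException {
        // Called for NutchIndexAction.ADD
        LOG.info("ADD {}", doc.getField("id"));
      }

      @Override
      public void delete(String key) throws IOException {
        // Called for NutchIndexAction.DELETE
        LOG.info("DELETE {}", key);
      }

      @Override
      public void update(NutchDocument doc) throws IOException {
        write(doc);
      }

      @Override
      public void commit() throws IOException {
      }

      @Override
      public void close() throws IOException {
      }

      @Override
      public String describe() {
        return "LoggingIndexWriter: logs add/delete actions, writes nothing";
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }
    }
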
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
new file mode 100644
index 0000000..adfefeb
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+@SuppressWarnings("serial")
+public class IndexingException extends Exception {
+
+ public IndexingException() {
+ super();
+ }
+
+ public IndexingException(String message) {
+ super(message);
+ }
+
+ public IndexingException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public IndexingException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
new file mode 100644
index 0000000..f22a0e5
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilter.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+// Hadoop imports
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.io.Text;
+
+// Nutch imports
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.plugin.Pluggable;
+
+/**
+ * Extension point for indexing. Permits one to add metadata to the indexed
+ * fields. All plugins found which implement this extension point are run
+ * sequentially on the parse.
+ */
+public interface IndexingFilter extends Pluggable, Configurable {
+ /** The name of the extension point. */
+ final static String X_POINT_ID = IndexingFilter.class.getName();
+
+ /**
+ * Adds fields or otherwise modifies the document that will be indexed for a
+ * parse. Unwanted documents can be removed from indexing by returning a null
+ * value.
+ *
+ * @param doc
+ * document instance for collecting fields
+ * @param parse
+ * parse data instance
+ * @param url
+ * page url
+ * @param datum
+ * crawl datum for the page (fetch datum from segment containing
+ * fetch status and fetch time)
+ * @param inlinks
+ * page inlinks
+ * @return modified (or a new) document instance, or null (meaning the
+ * document should be discarded)
+ * @throws IndexingException if an error occurs while filtering
+ */
+ NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+ CrawlDatum datum, Inlinks inlinks) throws IndexingException;
+}
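
A minimal filter implementing this extension point, sketched with an
illustrative field name ("textLength"); a real plugin would additionally
need the usual plugin.xml registration and inclusion in plugin.includes:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.crawl.Inlinks;
    import org.apache.nutch.indexer.IndexingException;
    import org.apache.nutch.indexer.IndexingFilter;
    import org.apache.nutch.indexer.NutchDocument;
    import org.apache.nutch.parse.Parse;

    public class TextLengthIndexingFilter implements IndexingFilter {

      private Configuration conf;

      @Override
      public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
          CrawlDatum datum, Inlinks inlinks) throws IndexingException {
        // Index the length of the extracted text; returning null here
        // would discard the document from indexing entirely.
        doc.add("textLength", Integer.toString(parse.getText().length()));
        return doc;
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }
    }
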
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
new file mode 100644
index 0000000..334fcad
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/indexer/IndexingFilters.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.indexer;
+
+// SLF4J logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.nutch.plugin.PluginRepository;
+import org.apache.nutch.parse.Parse;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.hadoop.io.Text;
+
+/** Creates and caches {@link IndexingFilter} implementing plugins. */
+public class IndexingFilters {
+
+ public static final String INDEXINGFILTER_ORDER = "indexingfilter.order";
+
+ public final static Logger LOG = LoggerFactory
+ .getLogger(IndexingFilters.class);
+
+ private IndexingFilter[] indexingFilters;
+
+ public IndexingFilters(Configuration conf) {
+ indexingFilters = (IndexingFilter[]) PluginRepository.get(conf)
+ .getOrderedPlugins(IndexingFilter.class, IndexingFilter.X_POINT_ID,
+ INDEXINGFILTER_ORDER);
+ }
+
+ /** Run all defined filters. */
+ public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+ CrawlDatum datum, Inlinks inlinks) throws IndexingException {
+ for (IndexingFilter indexingFilter : this.indexingFilters) {
+ doc = indexingFilter.filter(doc, parse, url, datum, inlinks);
+ // break the loop if an indexing filter discards the doc
+ if (doc == null)
+ return null;
+ }
+
+ return doc;
+ }
+
+}
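
Typical use of the cached chain, sketched under the assumption that parse,
datum and inlinks have already been read from the segment data; the
indexingfilter.order value names the stock BasicIndexingFilter purely to
illustrate the space-separated ordering syntax:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.crawl.Inlinks;
    import org.apache.nutch.indexer.IndexingException;
    import org.apache.nutch.indexer.IndexingFilters;
    import org.apache.nutch.indexer.NutchDocument;
    import org.apache.nutch.parse.Parse;
    import org.apache.nutch.util.NutchConfiguration;

    public class FilterChainSketch {

      /** Returns the filtered document, or null if a filter discarded it. */
      static NutchDocument runFilters(Text url, Parse parse, CrawlDatum datum,
          Inlinks inlinks) throws IndexingException {
        Configuration conf = NutchConfiguration.create();
        // Optional: fix the filter order (space-separated class names)
        conf.set(IndexingFilters.INDEXINGFILTER_ORDER,
            "org.apache.nutch.indexer.basic.BasicIndexingFilter");

        IndexingFilters filters = new IndexingFilters(conf);

        NutchDocument doc = new NutchDocument();
        doc.add("id", url.toString());
        return filters.filter(doc, parse, url, datum, inlinks);
      }
    }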