Posted to commits@nutch.apache.org by sn...@apache.org on 2016/02/25 22:49:50 UTC
[3/5] nutch git commit: NUTCH-1712 applied to current trunk; run first simple tests (inject + merge)
NUTCH-1712 applied to current trunk; run first simple tests (inject + merge)
Project: http://git-wip-us.apache.org/repos/asf/nutch/repo
Commit: http://git-wip-us.apache.org/repos/asf/nutch/commit/3c691eb2
Tree: http://git-wip-us.apache.org/repos/asf/nutch/tree/3c691eb2
Diff: http://git-wip-us.apache.org/repos/asf/nutch/diff/3c691eb2
Branch: refs/heads/master
Commit: 3c691eb2823cb85c9ffe95e9212ce7ac0e564709
Parents: 25e879a
Author: Sebastian Nagel <sn...@apache.org>
Authored: Mon Oct 19 21:48:05 2015 +0200
Committer: Sebastian Nagel <sn...@apache.org>
Committed: Thu Feb 25 21:26:30 2016 +0100
----------------------------------------------------------------------
src/java/org/apache/nutch/crawl/CrawlDb.java | 19 +
src/java/org/apache/nutch/crawl/Injector.java | 599 ++++++++++++---------
2 files changed, 360 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nutch/blob/3c691eb2/src/java/org/apache/nutch/crawl/CrawlDb.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/crawl/CrawlDb.java b/src/java/org/apache/nutch/crawl/CrawlDb.java
index 053e8fb..1537cdc 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDb.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDb.java
@@ -28,8 +28,10 @@ import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.*;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.util.FSUtils;
import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
@@ -173,6 +175,23 @@ public class CrawlDb extends NutchTool implements Tool {
LockUtil.removeLockFile(fs, lock);
}
+ public static void install(Job job, Path crawlDb) throws IOException {
+ Configuration conf = job.getConfiguration();
+ boolean preserveBackup = conf.getBoolean("db.preserve.backup", true);
+ FileSystem fs = FileSystem.get(conf);
+ Path old = new Path(crawlDb, "old");
+ Path current = new Path(crawlDb, CURRENT_NAME);
+ Path tempCrawlDb = org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
+ .getOutputPath(job);
+ FSUtils.replace(fs, old, current, true);
+ FSUtils.replace(fs, current, tempCrawlDb, true);
+ Path lock = new Path(crawlDb, LOCK_NAME);
+ LockUtil.removeLockFile(fs, lock);
+ if (!preserveBackup && fs.exists(old)) {
+ fs.delete(old, true);
+ }
+ }
+
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new CrawlDb(), args);
System.exit(res);
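For context, a minimal sketch of the calling pattern the new CrawlDb.install(Job, Path) helper targets: the caller points the job's output at a temporary crawldb directory, runs the job, and then lets install() promote the result. The class name and paths below are placeholders for illustration and are not part of this commit.
----------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.CrawlDb;
import org.apache.nutch.util.NutchConfiguration;

/** Hypothetical driver illustrating the new CrawlDb.install(Job, Path). */
public class CrawlDbInstallSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    Path crawlDb = new Path("crawl/crawldb");               // placeholder crawldb location
    Path tempCrawlDb = new Path(crawlDb, "crawldb-12345");  // placeholder temporary output

    Job job = Job.getInstance(conf, "update crawldb");
    job.setOutputFormatClass(MapFileOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(CrawlDatum.class);
    // install() later reads this output path back from the job
    FileOutputFormat.setOutputPath(job, tempCrawlDb);

    // ... set mapper, reducer and input paths, then run the job ...
    if (job.waitForCompletion(true)) {
      // rotates current -> old, promotes the temporary output to current,
      // removes the crawldb lock file, and deletes the "old" backup
      // only when db.preserve.backup is set to false
      CrawlDb.install(job, crawlDb);
    }
  }
}
----------------------------------------------------------------------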
http://git-wip-us.apache.org/repos/asf/nutch/blob/3c691eb2/src/java/org/apache/nutch/crawl/Injector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/nutch/crawl/Injector.java b/src/java/org/apache/nutch/crawl/Injector.java
index dc1f1cf..0d01dc8 100644
--- a/src/java/org/apache/nutch/crawl/Injector.java
+++ b/src/java/org/apache/nutch/crawl/Injector.java
@@ -17,211 +17,267 @@
package org.apache.nutch.crawl;
-import java.io.*;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
-// Commons Logging imports
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.util.*;
-import org.apache.nutch.net.*;
import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.LockUtil;
import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TimingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
/**
- * This class takes a flat file of URLs and adds them to the of pages to be
- * crawled. Useful for bootstrapping the system. The URL files contain one URL
- * per line, optionally followed by custom metadata separated by tabs with the
- * metadata key separated from the corresponding value by '='. <br>
- * Note that some metadata keys are reserved : <br>
- * - <i>nutch.score</i> : allows to set a custom score for a specific URL <br>
- * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a
- * specific URL <br>
- * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval
- * for a specific URL that is not changed by AdaptiveFetchSchedule <br>
- * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000
- * \t userType=open_source
+ * Injector takes a flat file of URLs and merges ("injects") these URLs into the
+ * CrawlDb. Useful for bootstrapping a Nutch crawl. The URL files contain one
+ * URL per line, optionally followed by custom metadata separated by tabs with
+ * the metadata key separated from the corresponding value by '='.
+ * </p>
+ * <p>
+ * Note, that some metadata keys are reserved:
+ * <dl>
+ * <dt>nutch.score</dt>
+ * <dd>allows to set a custom score for a specific URL</dd>
+ * <dt>nutch.fetchInterval</dt>
+ * <dd>allows to set a custom fetch interval for a specific URL</dd>
+ * <dt>nutch.fetchInterval.fixed</dt>
+ * <dd>allows to set a custom fetch interval for a specific URL that is not
+ * changed by AdaptiveFetchSchedule</dd>
+ * </dl>
+ * </p>
+ * <p>
+ * Example:
+ *
+ * <pre>
+ * http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
+ * </pre>
+ * </p>
**/
public class Injector extends NutchTool implements Tool {
public static final Logger LOG = LoggerFactory.getLogger(Injector.class);
/** metadata key reserved for setting a custom score for a specific URL */
public static String nutchScoreMDName = "nutch.score";
+
/**
* metadata key reserved for setting a custom fetchInterval for a specific URL
*/
public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
+
/**
* metadata key reserved for setting a fixed custom fetchInterval for a
* specific URL
*/
public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed";
- /** Normalize and filter injected urls. */
- public static class InjectMapper implements
- Mapper<WritableComparable<?>, Text, Text, CrawlDatum> {
+ public static class InjectMapper
+ extends Mapper<Text, Writable, Text, CrawlDatum> {
+ public static final String URL_NORMALIZING_SCOPE = "crawldb.url.normalizers.scope";
+ public static final String TAB_CHARACTER = "\t";
+ public static final String EQUAL_CHARACTER = "=";
+
private URLNormalizers urlNormalizers;
private int interval;
private float scoreInjected;
- private JobConf jobConf;
private URLFilters filters;
private ScoringFilters scfilters;
private long curTime;
-
- public void configure(JobConf job) {
- this.jobConf = job;
- urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
- interval = jobConf.getInt("db.fetch.interval.default", 2592000);
- filters = new URLFilters(jobConf);
- scfilters = new ScoringFilters(jobConf);
- scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
- curTime = job
- .getLong("injector.current.time", System.currentTimeMillis());
+ private boolean url404Purging;
+ private String scope;
+
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ scope = conf.get(URL_NORMALIZING_SCOPE, URLNormalizers.SCOPE_INJECT);
+ urlNormalizers = new URLNormalizers(conf, scope);
+ interval = conf.getInt("db.fetch.interval.default", 2592000);
+ filters = new URLFilters(conf);
+ scfilters = new ScoringFilters(conf);
+ scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+ curTime = conf.getLong("injector.current.time",
+ System.currentTimeMillis());
+ url404Purging = conf.getBoolean(CrawlDb.CRAWLDB_PURGE_404, false);
}
- public void close() {
+ /* Filter and normalize the input url */
+ private String filterNormalize(String url) {
+ if (url != null) {
+ try {
+ url = urlNormalizers.normalize(url, scope); // normalize the url
+ url = filters.filter(url); // filter the url
+ } catch (Exception e) {
+ LOG.warn("Skipping " + url + ":" + e);
+ url = null;
+ }
+ }
+ return url;
}
- public void map(WritableComparable<?> key, Text value,
- OutputCollector<Text, CrawlDatum> output, Reporter reporter)
- throws IOException {
- String url = value.toString().trim(); // value is line of text
+ /**
+ * Extract metadata that could be passed along with url in a seeds file.
+ * Metadata must be key-value pair(s) and separated by a TAB_CHARACTER
+ */
+ private void processMetaData(String metadata, CrawlDatum datum,
+ String url) {
+ String[] splits = metadata.split(TAB_CHARACTER);
- if (url != null && (url.length() == 0 || url.startsWith("#"))) {
- /* Ignore line that start with # */
- return;
- }
+ for (String split : splits) {
+ // find separation between name and value
+ int indexEquals = split.indexOf(EQUAL_CHARACTER);
+ if (indexEquals == -1) // skip anything without a EQUAL_CHARACTER
+ continue;
- // if tabs : metadata that could be stored
- // must be name=value and separated by \t
- float customScore = -1f;
- int customInterval = interval;
- int fixedInterval = -1;
- Map<String, String> metadata = new TreeMap<String, String>();
- if (url.indexOf("\t") != -1) {
- String[] splits = url.split("\t");
- url = splits[0];
- for (int s = 1; s < splits.length; s++) {
- // find separation between name and value
- int indexEquals = splits[s].indexOf("=");
- if (indexEquals == -1) {
- // skip anything without a =
- continue;
- }
- String metaname = splits[s].substring(0, indexEquals);
- String metavalue = splits[s].substring(indexEquals + 1);
+ String metaname = split.substring(0, indexEquals);
+ String metavalue = split.substring(indexEquals + 1);
+
+ try {
if (metaname.equals(nutchScoreMDName)) {
- try {
- customScore = Float.parseFloat(metavalue);
- } catch (NumberFormatException nfe) {
- }
+ datum.setScore(Float.parseFloat(metavalue));
} else if (metaname.equals(nutchFetchIntervalMDName)) {
- try {
- customInterval = Integer.parseInt(metavalue);
- } catch (NumberFormatException nfe) {
- }
+ datum.setFetchInterval(Integer.parseInt(metavalue));
} else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
- try {
- fixedInterval = Integer.parseInt(metavalue);
- } catch (NumberFormatException nfe) {
+ int fixedInterval = Integer.parseInt(metavalue);
+ if (fixedInterval > -1) {
+ // Set writable using float. Float is used by
+ // AdaptiveFetchSchedule
+ datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
+ new FloatWritable(fixedInterval));
+ datum.setFetchInterval(fixedInterval);
}
- } else
- metadata.put(metaname, metavalue);
- }
- }
- try {
- url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
- url = filters.filter(url); // filter the url
- } catch (Exception e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Skipping " + url + ":" + e);
+ } else {
+ datum.getMetaData().put(new Text(metaname), new Text(metavalue));
+ }
+ } catch (NumberFormatException nfe) {
+ LOG.error("Invalid number '" + metavalue + "' in metadata '"
+ + metaname + "' for url " + url);
}
- url = null;
}
- if (url == null) {
- reporter.getCounter("injector", "urls_filtered").increment(1);
- } else { // if it passes
- value.set(url); // collect it
- CrawlDatum datum = new CrawlDatum();
- datum.setStatus(CrawlDatum.STATUS_INJECTED);
-
- // Is interval custom? Then set as meta data
- if (fixedInterval > -1) {
- // Set writable using float. Flaot is used by
- // AdaptiveFetchSchedule
- datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
- new FloatWritable(fixedInterval));
- datum.setFetchInterval(fixedInterval);
- } else {
- datum.setFetchInterval(customInterval);
- }
+ }
- datum.setFetchTime(curTime);
- // now add the metadata
- Iterator<String> keysIter = metadata.keySet().iterator();
- while (keysIter.hasNext()) {
- String keymd = keysIter.next();
- String valuemd = metadata.get(keymd);
- datum.getMetaData().put(new Text(keymd), new Text(valuemd));
- }
- if (customScore != -1)
- datum.setScore(customScore);
- else
+ public void map(Text key, Writable value, Context context)
+ throws IOException, InterruptedException {
+ if (value instanceof Text) {
+ // if its a url from the seed list
+ String url = key.toString().trim();
+
+ // remove empty string or string starting with '#'
+ if (url.length() == 0 || url.startsWith("#"))
+ return;
+
+ url = filterNormalize(url);
+ if (url == null) {
+ context.getCounter("injector", "urls_filtered").increment(1);
+ } else {
+ CrawlDatum datum = new CrawlDatum();
+ datum.setStatus(CrawlDatum.STATUS_INJECTED);
+ datum.setFetchTime(curTime);
datum.setScore(scoreInjected);
- try {
- scfilters.injectedScore(value, datum);
- } catch (ScoringFilterException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Cannot filter injected score for url " + url
- + ", using default (" + e.getMessage() + ")");
+ datum.setFetchInterval(interval);
+
+ String metadata = value.toString().trim();
+ if (metadata.length() > 0)
+ processMetaData(metadata, datum, url);
+
+ try {
+ key.set(url);
+ scfilters.injectedScore(key, datum);
+ } catch (ScoringFilterException e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Cannot filter injected score for url " + url
+ + ", using default (" + e.getMessage() + ")");
+ }
}
+ context.getCounter("injector", "urls_injected").increment(1);
+ context.write(key, datum);
+ }
+ } else if (value instanceof CrawlDatum) {
+ // if its a crawlDatum from the input crawldb, emulate CrawlDbFilter's
+ // map()
+ CrawlDatum datum = (CrawlDatum) value;
+
+ // remove 404 urls
+ if (url404Purging && CrawlDatum.STATUS_DB_GONE == datum.getStatus())
+ return;
+
+ String url = filterNormalize(key.toString());
+ if (url != null) {
+ key.set(url);
+ context.write(key, datum);
}
- reporter.getCounter("injector", "urls_injected").increment(1);
- output.collect(value, datum);
}
}
}
/** Combine multiple new entries for a url. */
- public static class InjectReducer implements
- Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+ public static class InjectReducer
+ extends Reducer<Text, CrawlDatum, Text, CrawlDatum> {
private int interval;
private float scoreInjected;
private boolean overwrite = false;
private boolean update = false;
+ private CrawlDatum old = new CrawlDatum();
+ private CrawlDatum injected = new CrawlDatum();
- public void configure(JobConf job) {
- interval = job.getInt("db.fetch.interval.default", 2592000);
- scoreInjected = job.getFloat("db.score.injected", 1.0f);
- overwrite = job.getBoolean("db.injector.overwrite", false);
- update = job.getBoolean("db.injector.update", false);
+ public void setup(Context context) {
+ Configuration conf = context.getConfiguration();
+ interval = conf.getInt("db.fetch.interval.default", 2592000);
+ scoreInjected = conf.getFloat("db.score.injected", 1.0f);
+ overwrite = conf.getBoolean("db.injector.overwrite", false);
+ update = conf.getBoolean("db.injector.update", false);
LOG.info("Injector: overwrite: " + overwrite);
LOG.info("Injector: update: " + update);
}
- public void close() {
- }
-
- private CrawlDatum old = new CrawlDatum();
- private CrawlDatum injected = new CrawlDatum();
+ /**
+ * Merge the input records as per rules below :
+ *
+ * <pre>
+ * 1. If there is ONLY new injected record ==> emit injected record
+ * 2. If there is ONLY old record ==> emit existing record
+ * 3. If BOTH new and old records are present:
+ * (a) If 'overwrite' is true ==> emit injected record
+ * (b) If 'overwrite' is false :
+ * (i) If 'update' is false ==> emit existing record
+ * (ii) If 'update' is true ==> update existing record and emit it
+ * </pre>
+ *
+ * For more details @see NUTCH-1405
+ */
+ public void reduce(Text key, Iterable<CrawlDatum> values, Context context)
+ throws IOException, InterruptedException {
- public void reduce(Text key, Iterator<CrawlDatum> values,
- OutputCollector<Text, CrawlDatum> output, Reporter reporter)
- throws IOException {
boolean oldSet = false;
boolean injectedSet = false;
- while (values.hasNext()) {
- CrawlDatum val = values.next();
+
+ // If we encounter a datum with status as STATUS_INJECTED, then its a
+ // newly injected record. All other statuses correspond to an old record.
+ for (CrawlDatum val : values) {
if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
injected.set(val);
injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
@@ -230,41 +286,29 @@ public class Injector extends NutchTool implements Tool {
old.set(val);
oldSet = true;
}
-
}
- CrawlDatum res = null;
-
- // Old default behaviour
- if (injectedSet && !oldSet) {
- res = injected;
+ CrawlDatum result;
+ if (injectedSet && (!oldSet || overwrite)) {
+ // corresponds to rules (1) and (3.a) in the method description
+ result = injected;
} else {
- res = old;
+ // corresponds to rules (2) and (3.b) in the method description
+ result = old;
+
+ if (injectedSet && update) {
+ // corresponds to rule (3.b.ii) in the method description
+ old.putAllMetaData(injected);
+ old.setScore(injected.getScore() != scoreInjected
+ ? injected.getScore() : old.getScore());
+ old.setFetchInterval(injected.getFetchInterval() != interval
+ ? injected.getFetchInterval() : old.getFetchInterval());
+ }
}
if (injectedSet && oldSet) {
- reporter.getCounter("injector", "urls_merged").increment(1);
- }
- /**
- * Whether to overwrite, ignore or update existing records
- *
- * @see https://issues.apache.org/jira/browse/NUTCH-1405
- */
- // Injected record already exists and update but not overwrite
- if (injectedSet && oldSet && update && !overwrite) {
- res = old;
- old.putAllMetaData(injected);
- old.setScore(injected.getScore() != scoreInjected ? injected.getScore()
- : old.getScore());
- old.setFetchInterval(injected.getFetchInterval() != interval ? injected
- .getFetchInterval() : old.getFetchInterval());
+ context.getCounter("injector", "urls_merged").increment(1);
}
-
- // Injected record already exists and overwrite
- if (injectedSet && oldSet && overwrite) {
- res = injected;
- }
-
- output.collect(key, res);
+ context.write(key, result);
}
}
@@ -275,94 +319,121 @@ public class Injector extends NutchTool implements Tool {
setConf(conf);
}
- public void inject(Path crawlDb, Path urlDir) throws IOException {
+ public void inject(Path crawlDb, Path urlDir) throws Exception {
+ inject(crawlDb, urlDir, false, false);
+ }
+
+ public void inject(Path crawlDb, Path urlDir, boolean overwrite,
+ boolean update) throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
+
if (LOG.isInfoEnabled()) {
LOG.info("Injector: starting at " + sdf.format(start));
LOG.info("Injector: crawlDb: " + crawlDb);
LOG.info("Injector: urlDir: " + urlDir);
- }
-
- Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
- + "/inject-temp-"
- + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
-
- // map text input file to a <url,CrawlDatum> file
- if (LOG.isInfoEnabled()) {
LOG.info("Injector: Converting injected urls to crawl db entries.");
}
- FileSystem fs = FileSystem.get(getConf());
- // determine if the crawldb already exists
- boolean dbExists = fs.exists(crawlDb);
-
- JobConf sortJob = new NutchJob(getConf());
- sortJob.setJobName("inject " + urlDir);
- FileInputFormat.addInputPath(sortJob, urlDir);
- sortJob.setMapperClass(InjectMapper.class);
-
- FileOutputFormat.setOutputPath(sortJob, tempDir);
- if (dbExists) {
- // Don't run merge injected urls, wait for merge with
- // existing DB
- sortJob.setOutputFormat(SequenceFileOutputFormat.class);
- sortJob.setNumReduceTasks(0);
- } else {
- sortJob.setOutputFormat(MapFileOutputFormat.class);
- sortJob.setReducerClass(InjectReducer.class);
- sortJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
- false);
- }
- sortJob.setOutputKeyClass(Text.class);
- sortJob.setOutputValueClass(CrawlDatum.class);
- sortJob.setLong("injector.current.time", System.currentTimeMillis());
+ // set configuration
+ Configuration conf = getConf();
+ conf.setLong("injector.current.time", System.currentTimeMillis());
+ conf.setBoolean("db.injector.overwrite", overwrite);
+ conf.setBoolean("db.injector.update", update);
+ conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+ // create all the required paths
+ FileSystem fs = FileSystem.get(conf);
+ Path current = new Path(crawlDb, CrawlDb.CURRENT_NAME);
+ if (!fs.exists(current))
+ fs.mkdirs(current);
+
+ Path tempCrawlDb = new Path(crawlDb,
+ "crawldb-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ // lock an existing crawldb to prevent multiple simultaneous updates
+ Path lock = new Path(crawlDb, CrawlDb.LOCK_NAME);
+ LockUtil.createLockFile(fs, lock, false);
+
+ // configure job
+ Job job = Job.getInstance(conf, "inject " + urlDir);
+ job.setJarByClass(Injector.class);
+ job.setMapperClass(InjectMapper.class);
+ job.setReducerClass(InjectReducer.class);
+ job.setOutputFormatClass(MapFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(CrawlDatum.class);
+ job.setSpeculativeExecution(false);
+
+ // set input and output paths of the job
+ MultipleInputs.addInputPath(job, current, SequenceFileInputFormat.class);
+ MultipleInputs.addInputPath(job, urlDir, KeyValueTextInputFormat.class);
+ FileOutputFormat.setOutputPath(job, tempCrawlDb);
- RunningJob mapJob = null;
try {
- mapJob = JobClient.runJob(sortJob);
- } catch (IOException e) {
- fs.delete(tempDir, true);
- throw e;
- }
- long urlsInjected = mapJob.getCounters()
- .findCounter("injector", "urls_injected").getValue();
- long urlsFiltered = mapJob.getCounters()
- .findCounter("injector", "urls_filtered").getValue();
- LOG.info("Injector: Total number of urls rejected by filters: "
- + urlsFiltered);
- LOG.info("Injector: Total number of urls after normalization: "
- + urlsInjected);
- long urlsMerged = 0;
- if (dbExists) {
- // merge with existing crawl db
+ // run the job
+ job.waitForCompletion(true);
+
+ // save output and perform cleanup
+ CrawlDb.install(job, crawlDb);
+
if (LOG.isInfoEnabled()) {
- LOG.info("Injector: Merging injected urls into crawl db.");
+ long urlsInjected = job.getCounters()
+ .findCounter("injector", "urls_injected").getValue();
+ long urlsFiltered = job.getCounters()
+ .findCounter("injector", "urls_filtered").getValue();
+ long urlsMerged = job.getCounters()
+ .findCounter("injector", "urls_merged").getValue();
+ LOG.info("Injector: Total urls rejected by filters: " + urlsFiltered);
+ LOG.info(
+ "Injector: Total urls injected after normalization and filtering: "
+ + urlsInjected);
+ LOG.info("Injector: Total urls injected but already in CrawlDb: "
+ + urlsMerged);
+ LOG.info("Injector: Total new urls injected: "
+ + (urlsInjected - urlsMerged));
+
+ long end = System.currentTimeMillis();
+ LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
}
- JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
- FileInputFormat.addInputPath(mergeJob, tempDir);
- mergeJob.setReducerClass(InjectReducer.class);
- try {
- RunningJob merge = JobClient.runJob(mergeJob);
- urlsMerged = merge.getCounters().findCounter("injector", "urls_merged")
- .getValue();
- LOG.info("Injector: URLs merged: " + urlsMerged);
- } catch (IOException e) {
- fs.delete(tempDir, true);
- throw e;
+ } catch (Exception e) {
+ if (fs.exists(tempCrawlDb)) {
+ fs.delete(tempCrawlDb, true);
}
- CrawlDb.install(mergeJob, crawlDb);
- } else {
- CrawlDb.install(sortJob, crawlDb);
+ LockUtil.removeLockFile(fs, lock);
+ throw e;
}
+ }
- // clean up
- fs.delete(tempDir, true);
- LOG.info("Injector: Total new urls injected: "
- + (urlsInjected - urlsMerged));
- long end = System.currentTimeMillis();
- LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
- + TimingUtil.elapsedTime(start, end));
+ public void usage() {
+ System.err.println(
+ "Usage: Injector <crawldb> <url_dir> [-overwrite] [-update]\n");
+ System.err.println(
+ " <crawldb>\tPath to a crawldb directory. If not present, a new one would be created.");
+ System.err.println(
+ " <url_dir>\tPath to directory with URL file(s) containing urls to be injected. A URL file");
+ System.err.println(
+ " \tshould have one URL per line, optionally followed by custom metadata.");
+ System.err.println(
+ " \tBlank lines or lines starting with a '#' would be ignored. Custom metadata must");
+ System.err
+ .println(" \tbe of form 'key=value' and separated by tabs.");
+ System.err.println(" \tBelow are reserved metadata keys:\n");
+ System.err.println(" \t\tnutch.score: A custom score for a url");
+ System.err.println(
+ " \t\tnutch.fetchInterval: A custom fetch interval for a url");
+ System.err.println(
+ " \t\tnutch.fetchInterval.fixed: A custom fetch interval for a url that is not "
+ + "changed by AdaptiveFetchSchedule\n");
+ System.err.println(" \tExample:");
+ System.err.println(" \t http://www.apache.org/");
+ System.err.println(
+ " \t http://www.nutch.org/ \\t nutch.score=10 \\t nutch.fetchInterval=2592000 \\t userType=open_source\n");
+ System.err.println(
+ " -overwrite\tOverwite existing crawldb records by the injected records. Has precedence over 'update'");
+ System.err.println(
+ " -update \tUpdate existing crawldb records with the injected records. Old metadata is preserved");
}
public static void main(String[] args) throws Exception {
@@ -372,11 +443,27 @@ public class Injector extends NutchTool implements Tool {
public int run(String[] args) throws Exception {
if (args.length < 2) {
- System.err.println("Usage: Injector <crawldb> <url_dir>");
+ usage();
return -1;
}
+
+ boolean overwrite = false;
+ boolean update = false;
+
+ for (int i = 2; i < args.length; i++) {
+ if (args[i].equals("-overwrite")) {
+ overwrite = true;
+ } else if (args[i].equals("-update")) {
+ update = true;
+ } else {
+ LOG.info("Injector: Found invalid argument \"" + args[i] + "\"\n");
+ usage();
+ return -1;
+ }
+ }
+
try {
- inject(new Path(args[0]), new Path(args[1]));
+ inject(new Path(args[0]), new Path(args[1]), overwrite, update);
return 0;
} catch (Exception e) {
LOG.error("Injector: " + StringUtils.stringifyException(e));
@@ -384,43 +471,39 @@ public class Injector extends NutchTool implements Tool {
}
}
- @Override
/**
* Used by the Nutch REST service
*/
- public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
- if(args.size()<1){
+ public Map<String, Object> run(Map<String, Object> args, String crawlId)
+ throws Exception {
+ if (args.size() < 1) {
throw new IllegalArgumentException("Required arguments <url_dir>");
}
Map<String, Object> results = new HashMap<String, Object>();
Path crawlDb;
- if(args.containsKey(Nutch.ARG_CRAWLDB)) {
+ if (args.containsKey(Nutch.ARG_CRAWLDB)) {
Object crawldbPath = args.get(Nutch.ARG_CRAWLDB);
- if(crawldbPath instanceof Path) {
+ if (crawldbPath instanceof Path) {
crawlDb = (Path) crawldbPath;
- }
- else {
+ } else {
crawlDb = new Path(crawldbPath.toString());
}
- }
- else {
- crawlDb = new Path(crawlId+"/crawldb");
+ } else {
+ crawlDb = new Path(crawlId + "/crawldb");
}
Path input;
Object path = args.get(Nutch.ARG_SEEDDIR);
- if(path instanceof Path) {
+ if (path instanceof Path) {
input = (Path) path;
- }
- else {
+ } else {
input = new Path(path.toString());
}
inject(crawlDb, input);
results.put(Nutch.VAL_RESULT, Integer.toString(0));
return results;
-
}
}
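To round out the usage notes added above, a minimal sketch of driving the reworked Injector, either through ToolRunner with the new -overwrite/-update flags or programmatically via the new inject(Path, Path, boolean, boolean) overload. The crawldb and seed-directory paths are placeholders and not part of this commit.
----------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.Injector;
import org.apache.nutch.util.NutchConfiguration;

/** Hypothetical driver showing the two entry points of the reworked Injector. */
public class InjectorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();

    // Command-line style: equivalent to "bin/nutch inject crawl/crawldb urls -update"
    int res = ToolRunner.run(conf, new Injector(),
        new String[] { "crawl/crawldb", "urls", "-update" });
    System.out.println("Injector exit code: " + res);

    // Programmatic style: the new overload takes the overwrite/update flags
    // directly (this would inject the same seeds a second time)
    Injector injector = new Injector(conf);
    injector.inject(new Path("crawl/crawldb"), new Path("urls"),
        false /* overwrite */, true /* update */);
  }
}
----------------------------------------------------------------------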