You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by jn...@apache.org on 2014/05/16 09:59:05 UTC
svn commit: r1595137 - in /nutch/trunk: CHANGES.txt
src/java/org/apache/nutch/crawl/Injector.java
Author: jnioche
Date: Fri May 16 07:59:05 2014
New Revision: 1595137
URL: http://svn.apache.org/r1595137
Log:
NUTCH-1772 Injector does not need merging if no pre-existing crawldb
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1595137&r1=1595136&r2=1595137&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Fri May 16 07:59:05 2014
@@ -2,6 +2,8 @@ Nutch Change Log
Nutch Current Development
+* NUTCH-1772 Injector does not need merging if no pre-existing crawldb (jnioche)
+
* NUTCH-1752 Cache robots.txt rules per protocol:host:port (snagel)
* NUTCH-1613 Timeouts in protocol-httpclient when crawling same host with >2 threads (brian44 via jnioche)
Modified: nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java?rev=1595137&r1=1595136&r2=1595137&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/crawl/Injector.java Fri May 16 07:59:05 2014
@@ -39,28 +39,38 @@ import org.apache.nutch.util.NutchConfig
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.TimingUtil;
-/** This class takes a flat file of URLs and adds them to the of pages to be
- * crawled. Useful for bootstrapping the system.
- * The URL files contain one URL per line, optionally followed by custom metadata
- * separated by tabs with the metadata key separated from the corresponding value by '='. <br>
+/**
+ * This class takes a flat file of URLs and adds them to the list of pages to be
+ * crawled. Useful for bootstrapping the system. The URL files contain one URL
+ * per line, optionally followed by custom metadata separated by tabs with the
+ * metadata key separated from the corresponding value by '='. <br>
* Note that some metadata keys are reserved : <br>
* - <i>nutch.score</i> : allows to set a custom score for a specific URL <br>
- * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a specific URL <br>
- * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval for a specific URL that is not changed by AdaptiveFetchSchedule <br>
- * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000 \t userType=open_source
+ * - <i>nutch.fetchInterval</i> : allows to set a custom fetch interval for a
+ * specific URL <br>
+ * - <i>nutch.fetchInterval.fixed</i> : allows to set a custom fetch interval
+ * for a specific URL that is not changed by AdaptiveFetchSchedule <br>
+ * e.g. http://www.nutch.org/ \t nutch.score=10 \t nutch.fetchInterval=2592000
+ * \t userType=open_source
**/
public class Injector extends Configured implements Tool {
public static final Logger LOG = LoggerFactory.getLogger(Injector.class);
-
+
/** metadata key reserved for setting a custom score for a specific URL */
public static String nutchScoreMDName = "nutch.score";
- /** metadata key reserved for setting a custom fetchInterval for a specific URL */
+ /**
+ * metadata key reserved for setting a custom fetchInterval for a specific URL
+ */
public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
- /** metadata key reserved for setting a fixed custom fetchInterval for a specific URL */
+ /**
+ * metadata key reserved for setting a fixed custom fetchInterval for a
+ * specific URL
+ */
public static String nutchFixedFetchIntervalMDName = "nutch.fetchInterval.fixed";
/** Normalize and filter injected urls. */
- public static class InjectMapper implements Mapper<WritableComparable<?>, Text, Text, CrawlDatum> {
+ public static class InjectMapper implements
+ Mapper<WritableComparable<?>, Text, Text, CrawlDatum> {
private URLNormalizers urlNormalizers;
private int interval;
private float scoreInjected;
@@ -76,19 +86,21 @@ public class Injector extends Configured
filters = new URLFilters(jobConf);
scfilters = new ScoringFilters(jobConf);
scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
- curTime = job.getLong("injector.current.time", System.currentTimeMillis());
+ curTime = job
+ .getLong("injector.current.time", System.currentTimeMillis());
}
- public void close() {}
+ public void close() {
+ }
public void map(WritableComparable<?> key, Text value,
- OutputCollector<Text, CrawlDatum> output, Reporter reporter)
- throws IOException {
- String url = value.toString().trim(); // value is line of text
-
- if (url != null && ( url.length() == 0 || url.startsWith("#") ) ) {
- /* Ignore line that start with # */
- return;
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+ String url = value.toString().trim(); // value is line of text
+
+ if (url != null && (url.length() == 0 || url.startsWith("#"))) {
+ /* Ignore empty lines and lines that start with # */
+ return;
}
// if tabs : metadata that could be stored
@@ -96,55 +108,60 @@ public class Injector extends Configured
float customScore = -1f;
int customInterval = interval;
int fixedInterval = -1;
- Map<String,String> metadata = new TreeMap<String,String>();
- if (url.indexOf("\t")!=-1){
- String[] splits = url.split("\t");
- url = splits[0];
- for (int s=1;s<splits.length;s++){
- // find separation between name and value
- int indexEquals = splits[s].indexOf("=");
- if (indexEquals==-1) {
- // skip anything without a =
- continue;
- }
- String metaname = splits[s].substring(0, indexEquals);
- String metavalue = splits[s].substring(indexEquals+1);
- if (metaname.equals(nutchScoreMDName)) {
- try {
- customScore = Float.parseFloat(metavalue);}
- catch (NumberFormatException nfe){}
- }
- else if (metaname.equals(nutchFetchIntervalMDName)) {
- try {
- customInterval = Integer.parseInt(metavalue);}
- catch (NumberFormatException nfe){}
- }
- else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
- try {
- fixedInterval = Integer.parseInt(metavalue);}
- catch (NumberFormatException nfe){}
- }
- else metadata.put(metaname,metavalue);
- }
+ Map<String, String> metadata = new TreeMap<String, String>();
+ if (url.indexOf("\t") != -1) {
+ String[] splits = url.split("\t");
+ url = splits[0];
+ for (int s = 1; s < splits.length; s++) {
+ // find separation between name and value
+ int indexEquals = splits[s].indexOf("=");
+ if (indexEquals == -1) {
+ // skip anything without a =
+ continue;
+ }
+ String metaname = splits[s].substring(0, indexEquals);
+ String metavalue = splits[s].substring(indexEquals + 1);
+ if (metaname.equals(nutchScoreMDName)) {
+ try {
+ customScore = Float.parseFloat(metavalue);
+ } catch (NumberFormatException nfe) {
+ }
+ } else if (metaname.equals(nutchFetchIntervalMDName)) {
+ try {
+ customInterval = Integer.parseInt(metavalue);
+ } catch (NumberFormatException nfe) {
+ }
+ } else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
+ try {
+ fixedInterval = Integer.parseInt(metavalue);
+ } catch (NumberFormatException nfe) {
+ }
+ } else
+ metadata.put(metaname, metavalue);
+ }
}
try {
url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
- url = filters.filter(url); // filter the url
+ url = filters.filter(url); // filter the url
} catch (Exception e) {
- if (LOG.isWarnEnabled()) { LOG.warn("Skipping " +url+":"+e); }
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Skipping " + url + ":" + e);
+ }
url = null;
}
if (url == null) {
reporter.getCounter("injector", "urls_filtered").increment(1);
- } else { // if it passes
- value.set(url); // collect it
+ } else { // if it passes
+ value.set(url); // collect it
CrawlDatum datum = new CrawlDatum();
datum.setStatus(CrawlDatum.STATUS_INJECTED);
// Is interval custom? Then set as meta data
if (fixedInterval > -1) {
- // Set writable using float. Flaot is used by AdaptiveFetchSchedule
- datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY, new FloatWritable(fixedInterval));
+ // Set writable using float. Float is used by
+ // AdaptiveFetchSchedule
+ datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
+ new FloatWritable(fixedInterval));
datum.setFetchInterval(fixedInterval);
} else {
datum.setFetchInterval(customInterval);
@@ -153,20 +170,22 @@ public class Injector extends Configured
datum.setFetchTime(curTime);
// now add the metadata
Iterator<String> keysIter = metadata.keySet().iterator();
- while (keysIter.hasNext()){
- String keymd = keysIter.next();
- String valuemd = metadata.get(keymd);
- datum.getMetaData().put(new Text(keymd), new Text(valuemd));
+ while (keysIter.hasNext()) {
+ String keymd = keysIter.next();
+ String valuemd = metadata.get(keymd);
+ datum.getMetaData().put(new Text(keymd), new Text(valuemd));
}
- if (customScore != -1) datum.setScore(customScore);
- else datum.setScore(scoreInjected);
+ if (customScore != -1)
+ datum.setScore(customScore);
+ else
+ datum.setScore(scoreInjected);
try {
- scfilters.injectedScore(value, datum);
+ scfilters.injectedScore(value, datum);
} catch (ScoringFilterException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Cannot filter injected score for url " + url
- + ", using default (" + e.getMessage() + ")");
- }
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Cannot filter injected score for url " + url
+ + ", using default (" + e.getMessage() + ")");
+ }
}
reporter.getCounter("injector", "urls_injected").increment(1);
output.collect(value, datum);
@@ -175,7 +194,8 @@ public class Injector extends Configured
}
/** Combine multiple new entries for a url. */
- public static class InjectReducer implements Reducer<Text, CrawlDatum, Text, CrawlDatum> {
+ public static class InjectReducer implements
+ Reducer<Text, CrawlDatum, Text, CrawlDatum> {
private int interval;
private float scoreInjected;
private boolean overwrite = false;
@@ -189,15 +209,16 @@ public class Injector extends Configured
LOG.info("Injector: overwrite: " + overwrite);
LOG.info("Injector: update: " + update);
}
-
- public void close() {}
+
+ public void close() {
+ }
private CrawlDatum old = new CrawlDatum();
private CrawlDatum injected = new CrawlDatum();
-
+
public void reduce(Text key, Iterator<CrawlDatum> values,
- OutputCollector<Text, CrawlDatum> output, Reporter reporter)
- throws IOException {
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
boolean oldSet = false;
boolean injectedSet = false;
while (values.hasNext()) {
@@ -210,29 +231,35 @@ public class Injector extends Configured
old.set(val);
oldSet = true;
}
+
}
CrawlDatum res = null;
-
+
// Old default behaviour
if (injectedSet && !oldSet) {
res = injected;
} else {
res = old;
}
-
+ if (injectedSet && oldSet) {
+ reporter.getCounter("injector", "urls_merged").increment(1);
+ }
/**
* Whether to overwrite, ignore or update existing records
+ *
* @see https://issues.apache.org/jira/browse/NUTCH-1405
*/
// Injected record already exists and update but not overwrite
if (injectedSet && oldSet && update && !overwrite) {
res = old;
old.putAllMetaData(injected);
- old.setScore(injected.getScore() != scoreInjected ? injected.getScore() : old.getScore());
- old.setFetchInterval(injected.getFetchInterval() != interval ? injected.getFetchInterval() : old.getFetchInterval());
+ old.setScore(injected.getScore() != scoreInjected ? injected.getScore()
+ : old.getScore());
+ old.setFetchInterval(injected.getFetchInterval() != interval ? injected
+ .getFetchInterval() : old.getFetchInterval());
}
-
+
// Injected record already exists and overwrite
if (injectedSet && oldSet && overwrite) {
res = injected;
@@ -242,12 +269,13 @@ public class Injector extends Configured
}
}
- public Injector() {}
-
+ public Injector() {
+ }
+
public Injector(Configuration conf) {
setConf(conf);
}
-
+
public void inject(Path crawlDb, Path urlDir) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
@@ -257,56 +285,92 @@ public class Injector extends Configured
LOG.info("Injector: urlDir: " + urlDir);
}
- Path tempDir =
- new Path(getConf().get("mapred.temp.dir", ".") +
- "/inject-temp-"+
- Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+ Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
+ + "/inject-temp-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
// map text input file to a <url,CrawlDatum> file
if (LOG.isInfoEnabled()) {
LOG.info("Injector: Converting injected urls to crawl db entries.");
}
+
+ FileSystem fs = FileSystem.get(getConf());
+ // determine if the crawldb already exists
+ boolean dbExists = fs.exists(crawlDb);
+
JobConf sortJob = new NutchJob(getConf());
sortJob.setJobName("inject " + urlDir);
FileInputFormat.addInputPath(sortJob, urlDir);
sortJob.setMapperClass(InjectMapper.class);
FileOutputFormat.setOutputPath(sortJob, tempDir);
- sortJob.setOutputFormat(SequenceFileOutputFormat.class);
+ if (dbExists) {
+ // Don't run merge injected urls, wait for merge with
+ // existing DB
+ sortJob.setOutputFormat(SequenceFileOutputFormat.class);
+ sortJob.setNumReduceTasks(0);
+ } else {
+ sortJob.setOutputFormat(MapFileOutputFormat.class);
+ sortJob.setReducerClass(InjectReducer.class);
+ sortJob.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+ false);
+ }
sortJob.setOutputKeyClass(Text.class);
sortJob.setOutputValueClass(CrawlDatum.class);
sortJob.setLong("injector.current.time", System.currentTimeMillis());
- RunningJob mapJob = JobClient.runJob(sortJob);
- long urlsInjected = mapJob.getCounters().findCounter("injector", "urls_injected").getValue();
- long urlsFiltered = mapJob.getCounters().findCounter("injector", "urls_filtered").getValue();
- LOG.info("Injector: total number of urls rejected by filters: " + urlsFiltered);
- LOG.info("Injector: total number of urls injected after normalization and filtering: "
+ RunningJob mapJob = null;
+ try {
+ mapJob = JobClient.runJob(sortJob);
+ } catch (IOException e) {
+ fs.delete(tempDir, true);
+ throw e;
+ }
+ long urlsInjected = mapJob.getCounters()
+ .findCounter("injector", "urls_injected").getValue();
+ long urlsFiltered = mapJob.getCounters()
+ .findCounter("injector", "urls_filtered").getValue();
+ LOG.info("Injector: Total number of urls rejected by filters: "
+ + urlsFiltered);
+ LOG.info("Injector: Total number of urls after normalization: "
+ urlsInjected);
-
- // merge with existing crawl db
- if (LOG.isInfoEnabled()) {
- LOG.info("Injector: Merging injected urls into crawl db.");
+ long urlsMerged = 0;
+ if (dbExists) {
+ // merge with existing crawl db
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Injector: Merging injected urls into crawl db.");
+ }
+ JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
+ FileInputFormat.addInputPath(mergeJob, tempDir);
+ mergeJob.setReducerClass(InjectReducer.class);
+ try {
+ RunningJob merge = JobClient.runJob(mergeJob);
+ urlsMerged = merge.getCounters().findCounter("injector", "urls_merged")
+ .getValue();
+ LOG.info("Injector: URLs merged: " + urlsMerged);
+ } catch (IOException e) {
+ fs.delete(tempDir, true);
+ throw e;
+ }
+ CrawlDb.install(mergeJob, crawlDb);
+ } else {
+ CrawlDb.install(sortJob, crawlDb);
}
- JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
- FileInputFormat.addInputPath(mergeJob, tempDir);
- mergeJob.setReducerClass(InjectReducer.class);
- JobClient.runJob(mergeJob);
- CrawlDb.install(mergeJob, crawlDb);
// clean up
- FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);
-
+ LOG.info("Injector: Total new urls injected: "
+ + (urlsInjected - urlsMerged));
long end = System.currentTimeMillis();
- LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+ LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(NutchConfiguration.create(), new Injector(), args);
System.exit(res);
}
-
+
public int run(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: Injector <crawldb> <url_dir>");