Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:49:03 UTC
[47/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build
for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java
new file mode 100644
index 0000000..39923ac
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbMerger.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * This tool merges several LinkDb-s into one, optionally filtering URLs through
+ * the current URLFilters, to skip prohibited URLs and links.
+ *
+ * <p>
+ * It's possible to use this tool just for filtering - in that case only one
+ * LinkDb should be specified in arguments.
+ * </p>
+ * <p>
+ * If more than one LinkDb contains information about the same URL, all inlinks
+ * are accumulated, but only at most <code>linkdb.max.inlinks</code> inlinks will
+ * ever be added.
+ * </p>
+ * <p>
+ * If activated, URLFilters will be applied to both the target URLs and to any
+ * incoming link URL. If a target URL is prohibited, all inlinks to that target
+ * will be removed, including the target URL. If some of the incoming links are
+ * prohibited, only those links will be removed, and they won't count against the
+ * above-mentioned maximum limit.
+ *
+ * @author Andrzej Bialecki
+ */
+public class LinkDbMerger extends Configured implements Tool,
+ Reducer<Text, Inlinks, Text, Inlinks> {
+ private static final Logger LOG = LoggerFactory.getLogger(LinkDbMerger.class);
+
+ private int maxInlinks;
+
+ public LinkDbMerger() {
+
+ }
+
+ public LinkDbMerger(Configuration conf) {
+ setConf(conf);
+ }
+
+ public void reduce(Text key, Iterator<Inlinks> values,
+ OutputCollector<Text, Inlinks> output, Reporter reporter)
+ throws IOException {
+
+ Inlinks result = new Inlinks();
+
+ while (values.hasNext()) {
+ Inlinks inlinks = values.next();
+
+ int end = Math.min(maxInlinks - result.size(), inlinks.size());
+ Iterator<Inlink> it = inlinks.iterator();
+ int i = 0;
+ while (it.hasNext() && i++ < end) {
+ result.add(it.next());
+ }
+ }
+ if (result.size() == 0)
+ return;
+ output.collect(key, result);
+
+ }
+
+ public void configure(JobConf job) {
+ maxInlinks = job.getInt("linkdb.max.inlinks", 10000);
+ }
+
+ public void close() throws IOException {
+ }
+
+ public void merge(Path output, Path[] dbs, boolean normalize, boolean filter)
+ throws Exception {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("LinkDb merge: starting at " + sdf.format(start));
+
+ JobConf job = createMergeJob(getConf(), output, normalize, filter);
+ for (int i = 0; i < dbs.length; i++) {
+ FileInputFormat.addInputPath(job, new Path(dbs[i], LinkDb.CURRENT_NAME));
+ }
+ JobClient.runJob(job);
+ FileSystem fs = FileSystem.get(getConf());
+ fs.mkdirs(output);
+ fs.rename(FileOutputFormat.getOutputPath(job), new Path(output,
+ LinkDb.CURRENT_NAME));
+
+ long end = System.currentTimeMillis();
+ LOG.info("LinkDb merge: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static JobConf createMergeJob(Configuration config, Path linkDb,
+ boolean normalize, boolean filter) {
+ Path newLinkDb = new Path("linkdb-merge-"
+ + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+
+ JobConf job = new NutchJob(config);
+ job.setJobName("linkdb merge " + linkDb);
+
+ job.setInputFormat(SequenceFileInputFormat.class);
+
+ job.setMapperClass(LinkDbFilter.class);
+ job.setBoolean(LinkDbFilter.URL_NORMALIZING, normalize);
+ job.setBoolean(LinkDbFilter.URL_FILTERING, filter);
+ job.setReducerClass(LinkDbMerger.class);
+
+ FileOutputFormat.setOutputPath(job, newLinkDb);
+ job.setOutputFormat(MapFileOutputFormat.class);
+ job.setBoolean("mapred.output.compress", true);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Inlinks.class);
+
+ // https://issues.apache.org/jira/browse/NUTCH-1069
+ job.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+ return job;
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbMerger(),
+ args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err
+ .println("Usage: LinkDbMerger <output_linkdb> <linkdb1> [<linkdb2> <linkdb3> ...] [-normalize] [-filter]");
+ System.err.println("\toutput_linkdb\toutput LinkDb");
+ System.err
+ .println("\tlinkdb1 ...\tinput LinkDb-s (single input LinkDb is ok)");
+ System.err
+ .println("\t-normalize\tuse URLNormalizer on both fromUrls and toUrls in linkdb(s) (usually not needed)");
+ System.err
+ .println("\t-filter\tuse URLFilters on both fromUrls and toUrls in linkdb(s)");
+ return -1;
+ }
+ Path output = new Path(args[0]);
+ ArrayList<Path> dbs = new ArrayList<Path>();
+ boolean normalize = false;
+ boolean filter = false;
+ for (int i = 1; i < args.length; i++) {
+ if (args[i].equals("-filter")) {
+ filter = true;
+ } else if (args[i].equals("-normalize")) {
+ normalize = true;
+ } else
+ dbs.add(new Path(args[i]));
+ }
+ try {
+ merge(output, dbs.toArray(new Path[dbs.size()]), normalize, filter);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("LinkDbMerger: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+
+}
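
For reference, a minimal sketch (assuming the imports used in the file above) of driving the merger programmatically; the paths are illustrative, not part of the commit:

    Configuration conf = NutchConfiguration.create();
    LinkDbMerger merger = new LinkDbMerger(conf);
    // Merge two LinkDbs into one, applying URLFilters but no normalization
    merger.merge(new Path("crawl/linkdb_merged"),
        new Path[] { new Path("crawl/linkdb1"), new Path("crawl/linkdb2") },
        false, true);

The run() method above accepts the same options from the command line, e.g. LinkDbMerger crawl/linkdb_merged crawl/linkdb1 crawl/linkdb2 -filter.
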
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java
new file mode 100644
index 0000000..2e50e9a
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/LinkDbReader.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.IOException;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+// SLF4J logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.lib.HashPartitioner;
+import org.apache.hadoop.util.*;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+import java.text.SimpleDateFormat;
+import java.util.Iterator;
+import java.io.Closeable;
+
+/** Tool to dump the contents of a LinkDb to text, or to look up the inlinks of a single URL. */
+public class LinkDbReader extends Configured implements Tool, Closeable {
+ public static final Logger LOG = LoggerFactory.getLogger(LinkDbReader.class);
+
+ private static final Partitioner<WritableComparable, Writable> PARTITIONER = new HashPartitioner<WritableComparable, Writable>();
+
+ private FileSystem fs;
+ private Path directory;
+ private MapFile.Reader[] readers;
+
+ public LinkDbReader() {
+
+ }
+
+ public LinkDbReader(Configuration conf, Path directory) throws Exception {
+ setConf(conf);
+ init(directory);
+ }
+
+ public void init(Path directory) throws Exception {
+ this.fs = FileSystem.get(getConf());
+ this.directory = directory;
+ }
+
+ public String[] getAnchors(Text url) throws IOException {
+ Inlinks inlinks = getInlinks(url);
+ if (inlinks == null)
+ return null;
+ return inlinks.getAnchors();
+ }
+
+ public Inlinks getInlinks(Text url) throws IOException {
+
+ if (readers == null) {
+ synchronized (this) {
+ readers = MapFileOutputFormat.getReaders(fs, new Path(directory,
+ LinkDb.CURRENT_NAME), getConf());
+ }
+ }
+
+ return (Inlinks) MapFileOutputFormat.getEntry(readers, PARTITIONER, url,
+ new Inlinks());
+ }
+
+ public void close() throws IOException {
+ if (readers != null) {
+ for (int i = 0; i < readers.length; i++) {
+ readers[i].close();
+ }
+ }
+ }
+
+ public static class LinkDBDumpMapper implements Mapper<Text, Inlinks, Text, Inlinks> {
+ Pattern pattern = null;
+ Matcher matcher = null;
+
+ public void configure(JobConf job) {
+ if (job.get("linkdb.regex", null) != null) {
+ pattern = Pattern.compile(job.get("linkdb.regex"));
+ }
+ }
+
+ public void close() {}
+ public void map(Text key, Inlinks value, OutputCollector<Text, Inlinks> output, Reporter reporter)
+ throws IOException {
+
+ if (pattern != null) {
+ matcher = pattern.matcher(key.toString());
+ if (!matcher.matches()) {
+ return;
+ }
+ }
+
+ output.collect(key, value);
+ }
+ }
+
+ public void processDumpJob(String linkdb, String output, String regex) throws IOException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("LinkDb dump: starting at " + sdf.format(start));
+ LOG.info("LinkDb dump: db: " + linkdb);
+ }
+ Path outFolder = new Path(output);
+
+ JobConf job = new NutchJob(getConf());
+ job.setJobName("read " + linkdb);
+
+ if (regex != null) {
+ job.set("linkdb.regex", regex);
+ job.setMapperClass(LinkDBDumpMapper.class);
+ }
+
+ FileInputFormat.addInputPath(job, new Path(linkdb, LinkDb.CURRENT_NAME));
+ job.setInputFormat(SequenceFileInputFormat.class);
+
+ FileOutputFormat.setOutputPath(job, outFolder);
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Inlinks.class);
+
+ JobClient.runJob(job);
+
+ long end = System.currentTimeMillis();
+ LOG.info("LinkDb dump: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new LinkDbReader(),
+ args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err
+ .println("Usage: LinkDbReader <linkdb> (-dump <out_dir> [-regex <regex>]) | -url <url>");
+ System.err
+ .println("\t-dump <out_dir>\tdump whole link db to a text file in <out_dir>");
+ System.err
+ .println("\t\t-regex <regex>\trestrict to url's matching expression");
+ System.err
+ .println("\t-url <url>\tprint information about <url> to System.out");
+ return -1;
+ }
+ try {
+ if (args[1].equals("-dump")) {
+ String regex = null;
+ for (int i = 2; i < args.length; i++) {
+ if (args[i].equals("-regex")) {
+ regex = args[++i];
+ }
+ }
+ processDumpJob(args[0], args[2], regex);
+ return 0;
+ } else if (args[1].equals("-url")) {
+ init(new Path(args[0]));
+ Inlinks links = getInlinks(new Text(args[2]));
+ if (links == null) {
+ System.out.println(" - no link information.");
+ } else {
+ Iterator<Inlink> it = links.iterator();
+ while (it.hasNext()) {
+ System.out.println(it.next().toString());
+ }
+ }
+ return 0;
+ } else {
+ System.err.println("Error: wrong argument " + args[1]);
+ return -1;
+ }
+ } catch (Exception e) {
+ LOG.error("LinkDbReader: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
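
A usage sketch for the reader API (imports as in the file above; the path and URL are illustrative):

    Configuration conf = NutchConfiguration.create();
    LinkDbReader reader = new LinkDbReader(conf, new Path("crawl/linkdb"));
    Inlinks inlinks = reader.getInlinks(new Text("http://www.example.com/"));
    if (inlinks == null) {
      System.out.println(" - no link information.");
    } else {
      for (Iterator<Inlink> it = inlinks.iterator(); it.hasNext();) {
        System.out.println(it.next());
      }
    }
    reader.close();
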
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java
new file mode 100644
index 0000000..f6ec8dd
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/MD5Signature.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Default implementation of a page signature. It calculates an MD5 hash of the
+ * raw binary content of a page. In case there is no content, it calculates a
+ * hash from the page's URL.
+ *
+ * @author Andrzej Bialecki <ab@getopt.org>
+ */
+public class MD5Signature extends Signature {
+
+ public byte[] calculate(Content content, Parse parse) {
+ byte[] data = content.getContent();
+ if (data == null)
+ data = content.getUrl().getBytes();
+ return MD5Hash.digest(data).getDigest();
+ }
+}
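
A sketch of computing a signature directly; the Content and Parse objects would normally come from the fetcher and parser and are elided here:

    Signature sig = new MD5Signature();
    sig.setConf(NutchConfiguration.create());
    // Hashes the raw content bytes, or the URL bytes when there is no content
    byte[] digest = sig.calculate(content, parse);
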
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java b/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java
new file mode 100644
index 0000000..4fe5cef
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/MimeAdaptiveFetchSchedule.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.Reader;
+import java.util.HashMap;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.metadata.HttpHeaders;
+import org.apache.nutch.util.MimeUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extension of {@link AdaptiveFetchSchedule} that allows for more flexible
+ * configuration of DEC and INC factors for various MIME-types.
+ *
+ * This class is typically used in cases where a recrawl consists of many
+ * different MIME-types. It's not very common for MIME-types other than
+ * text/html to change frequently. Using this class you can configure different
+ * factors per MIME-type so as to prefer frequently changing MIME-types over
+ * others.
+ *
+ * For it to work this class relies on the Content-Type MetaData key being
+ * present in the CrawlDB. This can either be done when injecting new URLs or
+ * by adding "Content-Type" to the db.parsemeta.to.crawldb configuration setting
+ * to force MIME-types of newly discovered URLs to be added to the CrawlDB.
+ *
+ * @author markus
+ */
+public class MimeAdaptiveFetchSchedule extends AdaptiveFetchSchedule {
+ // Logger
+ public static final Logger LOG = LoggerFactory
+ .getLogger(MimeAdaptiveFetchSchedule.class);
+
+ // Conf directives
+ public static final String SCHEDULE_INC_RATE = "db.fetch.schedule.adaptive.inc_rate";
+ public static final String SCHEDULE_DEC_RATE = "db.fetch.schedule.adaptive.dec_rate";
+ public static final String SCHEDULE_MIME_FILE = "db.fetch.schedule.mime.file";
+
+ // Default values for DEC and INC rate
+ private float defaultIncRate;
+ private float defaultDecRate;
+
+ // Structure to store inc and dec rates per MIME-type
+ private class AdaptiveRate {
+ public float inc;
+ public float dec;
+
+ public AdaptiveRate(Float inc, Float dec) {
+ this.inc = inc;
+ this.dec = dec;
+ }
+ }
+
+ // Here we store the MIME-types and their deltas
+ private HashMap<String, AdaptiveRate> mimeMap;
+
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf == null)
+ return;
+
+ // Read and set the default INC and DEC rates in case we cannot set values
+ // based on MIME-type
+ defaultIncRate = conf.getFloat(SCHEDULE_INC_RATE, 0.2f);
+ defaultDecRate = conf.getFloat(SCHEDULE_DEC_RATE, 0.2f);
+
+ // Where's the mime/factor file?
+ Reader mimeFile = conf.getConfResourceAsReader(conf.get(SCHEDULE_MIME_FILE,
+ "adaptive-mimetypes.txt"));
+
+ try {
+ readMimeFile(mimeFile);
+ } catch (IOException e) {
+ LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ }
+
+ @Override
+ public CrawlDatum setFetchSchedule(Text url, CrawlDatum datum,
+ long prevFetchTime, long prevModifiedTime, long fetchTime,
+ long modifiedTime, int state) {
+
+ // Set defaults
+ INC_RATE = defaultIncRate;
+ DEC_RATE = defaultDecRate;
+
+ // Check if the Content-Type field is available in the CrawlDatum
+ if (datum.getMetaData().containsKey(HttpHeaders.WRITABLE_CONTENT_TYPE)) {
+ // Get the MIME-type of the current URL
+ String currentMime = MimeUtil.cleanMimeType(datum.getMetaData()
+ .get(HttpHeaders.WRITABLE_CONTENT_TYPE).toString());
+
+ // Check if this MIME-type exists in our map
+ if (mimeMap.containsKey(currentMime)) {
+ // Yes, set the INC and DEC rates for this MIME-type
+ INC_RATE = mimeMap.get(currentMime).inc;
+ DEC_RATE = mimeMap.get(currentMime).dec;
+ }
+ }
+
+ return super.setFetchSchedule(url, datum, prevFetchTime, prevModifiedTime,
+ fetchTime, modifiedTime, state);
+ }
+
+ /**
+ * Reads the MIME types and their associated INC/DEC factors into a
+ * HashMap.
+ *
+ * @param mimeFile
+ *          Reader over the TAB-separated mime/factor configuration file
+ */
+ private void readMimeFile(Reader mimeFile) throws IOException {
+ // Instance of our mime/factor map
+ mimeMap = new HashMap<String, AdaptiveRate>();
+
+ // Open a reader
+ BufferedReader reader = new BufferedReader(mimeFile);
+
+ String line = null;
+ String[] splits = null;
+
+ // Read all lines
+ while ((line = reader.readLine()) != null) {
+ // Skip blank lines and comments
+ if (StringUtils.isNotBlank(line) && !line.startsWith("#")) {
+ // Split the line by TAB
+ splits = line.split("\t");
+
+ // Sanity check: we need exactly three fields
+ if (splits.length == 3) {
+ // Add a lower cased MIME-type and the factor to the map
+ mimeMap.put(StringUtils.lowerCase(splits[0]), new AdaptiveRate(
+ new Float(splits[1]), new Float(splits[2])));
+ } else {
+ LOG.warn("Invalid configuration line in: " + line);
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ FetchSchedule fs = new MimeAdaptiveFetchSchedule();
+ fs.setConf(NutchConfiguration.create());
+ // we start the time at 0, for simplicity
+ long curTime = 0;
+ long delta = 1000L * 3600L * 24L; // 1 day
+ // we trigger the update of the page every 30 days
+ long update = 1000L * 3600L * 24L * 30L; // 30 days
+ boolean changed = true;
+ long lastModified = 0;
+ int miss = 0;
+ int totalMiss = 0;
+ int maxMiss = 0;
+ int fetchCnt = 0;
+ int changeCnt = 0;
+
+ // initial fetchInterval is 30 days
+ CrawlDatum p = new CrawlDatum(1, 3600 * 24 * 30, 1.0f);
+
+ // Set a default MIME-type to test with
+ org.apache.hadoop.io.MapWritable x = new org.apache.hadoop.io.MapWritable();
+ x.put(HttpHeaders.WRITABLE_CONTENT_TYPE, new Text(
+ "text/html; charset=utf-8"));
+ p.setMetaData(x);
+
+ p.setFetchTime(0);
+ LOG.info(p.toString());
+
+ // let's move the timeline a couple of deltas
+ for (int i = 0; i < 10000; i++) {
+ if (lastModified + update < curTime) {
+ // System.out.println("i=" + i + ", lastModified=" + lastModified +
+ // ", update=" + update + ", curTime=" + curTime);
+ changed = true;
+ changeCnt++;
+ lastModified = curTime;
+ }
+
+ LOG.info(i + ". " + changed + "\twill fetch at "
+ + (p.getFetchTime() / delta) + "\tinterval "
+ + (p.getFetchInterval() / SECONDS_PER_DAY) + " days" + "\t missed "
+ + miss);
+
+ if (p.getFetchTime() <= curTime) {
+ fetchCnt++;
+ fs.setFetchSchedule(new Text("http://www.example.com"), p, p
+ .getFetchTime(), p.getModifiedTime(), curTime, lastModified,
+ changed ? FetchSchedule.STATUS_MODIFIED
+ : FetchSchedule.STATUS_NOTMODIFIED);
+
+ LOG.info("\tfetched & adjusted: " + "\twill fetch at "
+ + (p.getFetchTime() / delta) + "\tinterval "
+ + (p.getFetchInterval() / SECONDS_PER_DAY) + " days");
+
+ if (!changed)
+ miss++;
+ if (miss > maxMiss)
+ maxMiss = miss;
+ changed = false;
+ totalMiss += miss;
+ miss = 0;
+ }
+
+ if (changed)
+ miss++;
+ curTime += delta;
+ }
+ LOG.info("Total missed: " + totalMiss + ", max miss: " + maxMiss);
+ LOG.info("Page changed " + changeCnt + " times, fetched " + fetchCnt
+ + " times.");
+ }
+
+}
\ No newline at end of file
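
The mime/factor file read by readMimeFile() above is expected to contain three TAB-separated fields per line (MIME-type, INC rate, DEC rate); lines starting with # are comments. A hypothetical adaptive-mimetypes.txt with illustrative rates, fields separated by single TAB characters:

    # mime-type	inc_rate	dec_rate
    text/html	0.4	0.2
    application/pdf	0.1	0.4
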
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java b/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java
new file mode 100644
index 0000000..589b8b9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/NutchWritable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.nutch.util.GenericWritableConfigurable;
+
+@SuppressWarnings("unchecked")
+public class NutchWritable extends GenericWritableConfigurable {
+
+ private static Class<? extends Writable>[] CLASSES = null;
+
+ static {
+ CLASSES = (Class<? extends Writable>[]) new Class<?>[] {
+ org.apache.hadoop.io.NullWritable.class,
+ org.apache.hadoop.io.BooleanWritable.class,
+ org.apache.hadoop.io.LongWritable.class,
+ org.apache.hadoop.io.ByteWritable.class,
+ org.apache.hadoop.io.BytesWritable.class,
+ org.apache.hadoop.io.FloatWritable.class,
+ org.apache.hadoop.io.IntWritable.class,
+ org.apache.hadoop.io.MapWritable.class,
+ org.apache.hadoop.io.Text.class, org.apache.hadoop.io.MD5Hash.class,
+ org.apache.nutch.crawl.CrawlDatum.class,
+ org.apache.nutch.crawl.Inlink.class,
+ org.apache.nutch.crawl.Inlinks.class,
+ org.apache.nutch.indexer.NutchIndexAction.class,
+ org.apache.nutch.metadata.Metadata.class,
+ org.apache.nutch.parse.Outlink.class,
+ org.apache.nutch.parse.ParseText.class,
+ org.apache.nutch.parse.ParseData.class,
+ org.apache.nutch.parse.ParseImpl.class,
+ org.apache.nutch.parse.ParseStatus.class,
+ org.apache.nutch.protocol.Content.class,
+ org.apache.nutch.protocol.ProtocolStatus.class,
+ org.apache.nutch.scoring.webgraph.LinkDatum.class,
+ org.apache.nutch.hostdb.HostDatum.class };
+ }
+
+ public NutchWritable() {
+ }
+
+ public NutchWritable(Writable instance) {
+ set(instance);
+ }
+
+ @Override
+ protected Class<? extends Writable>[] getTypes() {
+ return CLASSES;
+ }
+
+}
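
NutchWritable lets heterogeneous values share a single job output type by wrapping them in a GenericWritable. A small sketch:

    // Wrap on the map side...
    NutchWritable wrapped = new NutchWritable(new CrawlDatum());
    // ...and unwrap on the reduce side
    Writable inner = wrapped.get();
    if (inner instanceof CrawlDatum) {
      CrawlDatum datum = (CrawlDatum) inner;
    }
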
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java
new file mode 100644
index 0000000..21dfe07
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/Signature.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.protocol.Content;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configurable;
+
+public abstract class Signature implements Configurable {
+ protected Configuration conf;
+
+ public abstract byte[] calculate(Content content, Parse parse);
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java
new file mode 100644
index 0000000..d217d93
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureComparator.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.util.Comparator;
+
+public class SignatureComparator implements Comparator<Object> {
+ public int compare(Object o1, Object o2) {
+ return _compare(o1, o2);
+ }
+
+ public static int _compare(Object o1, Object o2) {
+ if (o1 == null && o2 == null)
+ return 0;
+ if (o1 == null)
+ return -1;
+ if (o2 == null)
+ return 1;
+ if (!(o1 instanceof byte[]))
+ return -1;
+ if (!(o2 instanceof byte[]))
+ return 1;
+ byte[] data1 = (byte[]) o1;
+ byte[] data2 = (byte[]) o2;
+ return _compare(data1, 0, data1.length, data2, 0, data2.length);
+ }
+
+ public static int _compare(byte[] data1, int s1, int l1, byte[] data2,
+ int s2, int l2) {
+ if (l2 > l1)
+ return -1;
+ if (l2 < l1)
+ return 1;
+ int res = 0;
+ for (int i = 0; i < l1; i++) {
+ res = (data1[s1 + i] - data2[s2 + i]);
+ if (res != 0)
+ return res;
+ }
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java
new file mode 100644
index 0000000..16d8cc0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/SignatureFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+// SLF4J logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Hadoop imports
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.ObjectCache;
+
+/**
+ * Factory class, which instantiates a Signature implementation according to
+ * the current Configuration. The newly created instance is cached in the
+ * per-Configuration ObjectCache, so that it can be retrieved later.
+ *
+ * @author Andrzej Bialecki <ab@getopt.org>
+ */
+public class SignatureFactory {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SignatureFactory.class);
+
+ private SignatureFactory() {
+ } // no public ctor
+
+ /** Return the default Signature implementation. */
+ public synchronized static Signature getSignature(Configuration conf) {
+ String clazz = conf.get("db.signature.class", MD5Signature.class.getName());
+ ObjectCache objectCache = ObjectCache.get(conf);
+ Signature impl = (Signature) objectCache.getObject(clazz);
+ if (impl == null) {
+ try {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Using Signature impl: " + clazz);
+ }
+ Class<?> implClass = Class.forName(clazz);
+ impl = (Signature) implClass.newInstance();
+ impl.setConf(conf);
+ objectCache.setObject(clazz, impl);
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't create " + clazz, e);
+ }
+ }
+ return impl;
+ }
+}
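
A sketch of selecting a non-default implementation through the db.signature.class property read above:

    Configuration conf = NutchConfiguration.create();
    conf.set("db.signature.class",
        "org.apache.nutch.crawl.TextProfileSignature");
    // Instantiated once, then cached in the per-Configuration ObjectCache
    Signature sig = SignatureFactory.getSignature(conf);
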
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java
new file mode 100644
index 0000000..b88cfa6
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/TextMD5Signature.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Implementation of a page signature. It calculates an MD5 hash of the textual
+ * content of a page. In case there is no content, it calculates a hash from the
+ * page's URL.
+ */
+public class TextMD5Signature extends Signature {
+
+ Signature fallback = new MD5Signature();
+
+ public byte[] calculate(Content content, Parse parse) {
+ String text = parse.getText();
+
+ if (text == null || text.length() == 0) {
+ return fallback.calculate(content, parse);
+ }
+
+ return MD5Hash.digest(text).getDigest();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java b/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java
new file mode 100644
index 0000000..5d930f9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/TextProfileSignature.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.NutchConfiguration;
+
+/**
+ * <p>
+ * An implementation of a page signature. It calculates an MD5 hash of a plain
+ * text "profile" of a page. In case there is no text, it calculates a hash
+ * using the {@link MD5Signature}.
+ * </p>
+ * <p>
+ * The algorithm to calculate a page "profile" takes the plain text version of a
+ * page and performs the following steps:
+ * <ul>
+ * <li>remove all characters except letters and digits, and bring all characters
+ * to lower case,</li>
+ * <li>split the text into tokens (all consecutive non-whitespace characters),</li>
+ * <li>discard tokens equal to or shorter than MIN_TOKEN_LEN (default 2
+ * characters),</li>
+ * <li>sort the list of tokens by decreasing frequency,</li>
+ * <li>round down the counts of tokens to the nearest multiple of QUANT (
+ * <code>QUANT = QUANT_RATE * maxFreq</code>, where <code>QUANT_RATE</code> is
+ * 0.01f by default, and <code>maxFreq</code> is the maximum token frequency).
+ * If <code>maxFreq</code> is higher than 1, then QUANT is always at least 2
+ * (which means that tokens with frequency 1 are always discarded).</li>
+ * <li>tokens whose frequency after quantization falls below QUANT are
+ * discarded.</li>
+ * <li>create a list of tokens and their quantized frequency, separated by
+ * spaces, in the order of decreasing frequency.</li>
+ * </ul>
+ * This list is then submitted to an MD5 hash calculation.
+ *
+ * @author Andrzej Bialecki <ab@getopt.org>
+ */
+public class TextProfileSignature extends Signature {
+
+ Signature fallback = new MD5Signature();
+
+ public byte[] calculate(Content content, Parse parse) {
+ int MIN_TOKEN_LEN = getConf().getInt(
+ "db.signature.text_profile.min_token_len", 2);
+ float QUANT_RATE = getConf().getFloat(
+ "db.signature.text_profile.quant_rate", 0.01f);
+ HashMap<String, Token> tokens = new HashMap<String, Token>();
+ String text = null;
+ if (parse != null)
+ text = parse.getText();
+ if (text == null || text.length() == 0)
+ return fallback.calculate(content, parse);
+ StringBuffer curToken = new StringBuffer();
+ int maxFreq = 0;
+ for (int i = 0; i < text.length(); i++) {
+ char c = text.charAt(i);
+ if (Character.isLetterOrDigit(c)) {
+ curToken.append(Character.toLowerCase(c));
+ } else {
+ if (curToken.length() > 0) {
+ if (curToken.length() > MIN_TOKEN_LEN) {
+ // add it
+ String s = curToken.toString();
+ Token tok = tokens.get(s);
+ if (tok == null) {
+ tok = new Token(0, s);
+ tokens.put(s, tok);
+ }
+ tok.cnt++;
+ if (tok.cnt > maxFreq)
+ maxFreq = tok.cnt;
+ }
+ curToken.setLength(0);
+ }
+ }
+ }
+ // check the last token
+ if (curToken.length() > MIN_TOKEN_LEN) {
+ // add it
+ String s = curToken.toString();
+ Token tok = tokens.get(s);
+ if (tok == null) {
+ tok = new Token(0, s);
+ tokens.put(s, tok);
+ }
+ tok.cnt++;
+ if (tok.cnt > maxFreq)
+ maxFreq = tok.cnt;
+ }
+ Iterator<Token> it = tokens.values().iterator();
+ ArrayList<Token> profile = new ArrayList<Token>();
+ // calculate the QUANT value
+ int QUANT = Math.round(maxFreq * QUANT_RATE);
+ if (QUANT < 2) {
+ if (maxFreq > 1)
+ QUANT = 2;
+ else
+ QUANT = 1;
+ }
+ while (it.hasNext()) {
+ Token t = it.next();
+ // round down to the nearest QUANT
+ t.cnt = (t.cnt / QUANT) * QUANT;
+ // discard the frequencies below the QUANT
+ if (t.cnt < QUANT) {
+ continue;
+ }
+ profile.add(t);
+ }
+ Collections.sort(profile, new TokenComparator());
+ StringBuffer newText = new StringBuffer();
+ it = profile.iterator();
+ while (it.hasNext()) {
+ Token t = it.next();
+ if (newText.length() > 0)
+ newText.append("\n");
+ newText.append(t.toString());
+ }
+ return MD5Hash.digest(newText.toString()).getDigest();
+ }
+
+ private static class Token {
+ public int cnt;
+ public String val;
+
+ public Token(int cnt, String val) {
+ this.cnt = cnt;
+ this.val = val;
+ }
+
+ public String toString() {
+ return val + " " + cnt;
+ }
+ }
+
+ private static class TokenComparator implements Comparator<Token> {
+ public int compare(Token t1, Token t2) {
+ return t2.cnt - t1.cnt;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TextProfileSignature sig = new TextProfileSignature();
+ sig.setConf(NutchConfiguration.create());
+ HashMap<String, byte[]> res = new HashMap<String, byte[]>();
+ File[] files = new File(args[0]).listFiles();
+ for (int i = 0; i < files.length; i++) {
+ FileInputStream fis = new FileInputStream(files[i]);
+ BufferedReader br = new BufferedReader(
+ new InputStreamReader(fis, "UTF-8"));
+ StringBuffer text = new StringBuffer();
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ if (text.length() > 0)
+ text.append("\n");
+ text.append(line);
+ }
+ br.close();
+ byte[] signature = sig.calculate(null, new ParseImpl(text.toString(),
+ null));
+ res.put(files[i].toString(), signature);
+ }
+ Iterator<String> it = res.keySet().iterator();
+ while (it.hasNext()) {
+ String name = it.next();
+ byte[] signature = res.get(name);
+ System.out.println(name + "\t" + StringUtil.toHexString(signature));
+ }
+ }
+}
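
To make the quantization concrete: for a page whose most frequent token occurs maxFreq = 300 times with the default QUANT_RATE of 0.01f, QUANT = Math.round(300 * 0.01f) = 3; a token counted 7 times is then rounded down to 6, while a token counted 2 times falls below QUANT and is discarded.
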
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java b/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java
new file mode 100644
index 0000000..4675f83
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/URLPartitioner.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.crawl;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.MalformedURLException;
+import java.net.UnknownHostException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * Partitions URLs by host, domain name or IP address, depending on the value of
+ * the parameter 'partition.url.mode', which can be 'byHost', 'byDomain' or 'byIP'.
+ */
+public class URLPartitioner implements Partitioner<Text, Writable> {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(URLPartitioner.class);
+
+ public static final String PARTITION_MODE_KEY = "partition.url.mode";
+
+ public static final String PARTITION_MODE_HOST = "byHost";
+ public static final String PARTITION_MODE_DOMAIN = "byDomain";
+ public static final String PARTITION_MODE_IP = "byIP";
+
+ private int seed;
+ private URLNormalizers normalizers;
+ private String mode = PARTITION_MODE_HOST;
+
+ public void configure(JobConf job) {
+ seed = job.getInt("partition.url.seed", 0);
+ mode = job.get(PARTITION_MODE_KEY, PARTITION_MODE_HOST);
+ // check that the mode is known
+ if (!mode.equals(PARTITION_MODE_IP) && !mode.equals(PARTITION_MODE_DOMAIN)
+ && !mode.equals(PARTITION_MODE_HOST)) {
+ LOG.error("Unknown partition mode : " + mode + " - forcing to byHost");
+ mode = PARTITION_MODE_HOST;
+ }
+ normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_PARTITION);
+ }
+
+ public void close() {
+ }
+
+ /** Hash by host, domain name or IP address, depending on the configured mode. */
+ public int getPartition(Text key, Writable value, int numReduceTasks) {
+ String urlString = key.toString();
+ URL url = null;
+ int hashCode = urlString.hashCode();
+ try {
+ urlString = normalizers.normalize(urlString,
+ URLNormalizers.SCOPE_PARTITION);
+ url = new URL(urlString);
+ hashCode = url.getHost().hashCode();
+ } catch (MalformedURLException e) {
+ LOG.warn("Malformed URL: '" + urlString + "'");
+ }
+
+ if (mode.equals(PARTITION_MODE_DOMAIN) && url != null)
+ hashCode = URLUtil.getDomainName(url).hashCode();
+ else if (mode.equals(PARTITION_MODE_IP) && url != null) {
+ try {
+ InetAddress address = InetAddress.getByName(url.getHost());
+ hashCode = address.getHostAddress().hashCode();
+ } catch (UnknownHostException e) {
+ Generator.LOG.info("Couldn't find IP for host: " + url.getHost());
+ }
+ }
+
+ // make hosts wind up in different partitions on different runs
+ hashCode ^= seed;
+
+ return (hashCode & Integer.MAX_VALUE) % numReduceTasks;
+ }
+
+}
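
A sketch of enabling domain-based partitioning in a job setup, using the constants defined above:

    JobConf job = new NutchJob(NutchConfiguration.create());
    job.set(URLPartitioner.PARTITION_MODE_KEY,
        URLPartitioner.PARTITION_MODE_DOMAIN);
    job.setPartitionerClass(URLPartitioner.class);
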
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/crawl/package.html
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/crawl/package.html b/nutch-core/src/main/java/org/apache/nutch/crawl/package.html
new file mode 100644
index 0000000..05eeb50
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/crawl/package.html
@@ -0,0 +1,5 @@
+<html>
+<body>
+Crawl control code and tools to run the crawler.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java
new file mode 100644
index 0000000..3ad4970
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItem.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+/**
+ * This class describes the item to be fetched.
+ */
+public class FetchItem {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetchItem.class);
+
+ int outlinkDepth = 0;
+ String queueID;
+ Text url;
+ URL u;
+ CrawlDatum datum;
+
+ public FetchItem(Text url, URL u, CrawlDatum datum, String queueID) {
+ this(url, u, datum, queueID, 0);
+ }
+
+ public FetchItem(Text url, URL u, CrawlDatum datum, String queueID,
+ int outlinkDepth) {
+ this.url = url;
+ this.u = u;
+ this.datum = datum;
+ this.queueID = queueID;
+ this.outlinkDepth = outlinkDepth;
+ }
+
+ /**
+ * Create an item. The queue ID will be created based on the
+ * <code>queueMode</code> argument, either as a protocol + hostname pair, a
+ * protocol + IP address pair, or a protocol + domain pair.
+ */
+ public static FetchItem create(Text url, CrawlDatum datum, String queueMode) {
+ return create(url, datum, queueMode, 0);
+ }
+
+ public static FetchItem create(Text url, CrawlDatum datum,
+ String queueMode, int outlinkDepth) {
+ String queueID;
+ URL u = null;
+ try {
+ u = new URL(url.toString());
+ } catch (Exception e) {
+ LOG.warn("Cannot parse url: " + url, e);
+ return null;
+ }
+ final String proto = u.getProtocol().toLowerCase();
+ String key;
+ if (FetchItemQueues.QUEUE_MODE_IP.equalsIgnoreCase(queueMode)) {
+ try {
+ final InetAddress addr = InetAddress.getByName(u.getHost());
+ key = addr.getHostAddress();
+ } catch (final UnknownHostException e) {
+ // unable to resolve it, so don't fall back to host name
+ LOG.warn("Unable to resolve: " + u.getHost() + ", skipping.");
+ return null;
+ }
+ } else if (FetchItemQueues.QUEUE_MODE_DOMAIN.equalsIgnoreCase(queueMode)) {
+ key = URLUtil.getDomainName(u);
+ if (key == null) {
+ LOG.warn("Unknown domain for url: " + url
+ + ", using URL string as key");
+ key = u.toExternalForm();
+ }
+ } else {
+ key = u.getHost();
+ if (key == null) {
+ LOG.warn("Unknown host for url: " + url + ", using URL string as key");
+ key = u.toExternalForm();
+ }
+ }
+ queueID = proto + "://" + key.toLowerCase();
+ return new FetchItem(url, u, datum, queueID, outlinkDepth);
+ }
+
+ public CrawlDatum getDatum() {
+ return datum;
+ }
+
+ public String getQueueID() {
+ return queueID;
+ }
+
+ public Text getUrl() {
+ return url;
+ }
+
+ public URL getURL2() {
+ return u;
+ }
+}
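
A sketch of queue ID derivation in the default byHost mode (the datum contents do not affect the ID):

    FetchItem it = FetchItem.create(new Text("http://www.Example.com/page"),
        new CrawlDatum(), FetchItemQueues.QUEUE_MODE_HOST);
    // it.getQueueID() -> "http://www.example.com" (host lower-cased)
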
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java
new file mode 100644
index 0000000..182c063
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueue.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class handles FetchItems which come from the same host ID (be it a
+ * proto/hostname or proto/IP pair). It also keeps track of requests in
+ * progress and elapsed time between requests.
+ */
+public class FetchItemQueue {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class);
+
+ List<FetchItem> queue = Collections
+ .synchronizedList(new LinkedList<FetchItem>());
+ AtomicInteger inProgress = new AtomicInteger();
+ AtomicLong nextFetchTime = new AtomicLong();
+ AtomicInteger exceptionCounter = new AtomicInteger();
+ long crawlDelay;
+ long minCrawlDelay;
+ int maxThreads;
+ Configuration conf;
+
+ public FetchItemQueue(Configuration conf, int maxThreads, long crawlDelay,
+ long minCrawlDelay) {
+ this.conf = conf;
+ this.maxThreads = maxThreads;
+ this.crawlDelay = crawlDelay;
+ this.minCrawlDelay = minCrawlDelay;
+ // ready to start
+ setEndTime(System.currentTimeMillis() - crawlDelay);
+ }
+
+ public synchronized int emptyQueue() {
+ int presize = queue.size();
+ queue.clear();
+ return presize;
+ }
+
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ public int getInProgressSize() {
+ return inProgress.get();
+ }
+
+ public int incrementExceptionCounter() {
+ return exceptionCounter.incrementAndGet();
+ }
+
+ public void finishFetchItem(FetchItem it, boolean asap) {
+ if (it != null) {
+ inProgress.decrementAndGet();
+ setEndTime(System.currentTimeMillis(), asap);
+ }
+ }
+
+ public void addFetchItem(FetchItem it) {
+ if (it == null)
+ return;
+ queue.add(it);
+ }
+
+ public void addInProgressFetchItem(FetchItem it) {
+ if (it == null)
+ return;
+ inProgress.incrementAndGet();
+ }
+
+ public FetchItem getFetchItem() {
+ if (inProgress.get() >= maxThreads)
+ return null;
+ long now = System.currentTimeMillis();
+ if (nextFetchTime.get() > now)
+ return null;
+ FetchItem it = null;
+ if (queue.size() == 0)
+ return null;
+ try {
+ it = queue.remove(0);
+ inProgress.incrementAndGet();
+ } catch (Exception e) {
+ LOG.error(
+ "Cannot remove FetchItem from queue or cannot add it to inProgress queue",
+ e);
+ }
+ return it;
+ }
+
+ public synchronized void dump() {
+ LOG.info(" maxThreads = " + maxThreads);
+ LOG.info(" inProgress = " + inProgress.get());
+ LOG.info(" crawlDelay = " + crawlDelay);
+ LOG.info(" minCrawlDelay = " + minCrawlDelay);
+ LOG.info(" nextFetchTime = " + nextFetchTime.get());
+ LOG.info(" now = " + System.currentTimeMillis());
+ for (int i = 0; i < queue.size(); i++) {
+ FetchItem it = queue.get(i);
+ LOG.info(" " + i + ". " + it.url);
+ }
+ }
+
+ private void setEndTime(long endTime) {
+ setEndTime(endTime, false);
+ }
+
+ private void setEndTime(long endTime, boolean asap) {
+ if (!asap)
+ nextFetchTime.set(endTime
+ + (maxThreads > 1 ? minCrawlDelay : crawlDelay));
+ else
+ nextFetchTime.set(endTime);
+ }
+}
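
A sketch of the politeness behavior, assuming a single fetcher thread and a 5-second crawl delay:

    Configuration conf = NutchConfiguration.create();
    FetchItemQueue fiq = new FetchItemQueue(conf, 1, 5000L, 0L);
    fiq.addFetchItem(FetchItem.create(new Text("http://www.example.com/"),
        new CrawlDatum(), FetchItemQueues.QUEUE_MODE_HOST));
    FetchItem next = fiq.getFetchItem(); // available immediately on a fresh queue
    fiq.finishFetchItem(next, false);    // pushes nextFetchTime crawlDelay ahead
    // getFetchItem() now returns null until the 5 seconds have elapsed
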
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java
new file mode 100644
index 0000000..4473ff0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchItemQueues.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Convenience class - a collection of queues that keeps track of the total
+ * number of items, and provides items eligible for fetching from any queue.
+ */
+public class FetchItemQueues {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetchItemQueues.class);
+
+ public static final String DEFAULT_ID = "default";
+ Map<String, FetchItemQueue> queues = new HashMap<String, FetchItemQueue>();
+ AtomicInteger totalSize = new AtomicInteger(0);
+ int maxThreads;
+ long crawlDelay;
+ long minCrawlDelay;
+ long timelimit = -1;
+ int maxExceptionsPerQueue = -1;
+ Configuration conf;
+
+ public static final String QUEUE_MODE_HOST = "byHost";
+ public static final String QUEUE_MODE_DOMAIN = "byDomain";
+ public static final String QUEUE_MODE_IP = "byIP";
+
+ String queueMode;
+
+ public FetchItemQueues(Configuration conf) {
+ this.conf = conf;
+ this.maxThreads = conf.getInt("fetcher.threads.per.queue", 1);
+ queueMode = conf.get("fetcher.queue.mode", QUEUE_MODE_HOST);
+ // check that the mode is known
+ if (!queueMode.equals(QUEUE_MODE_IP)
+ && !queueMode.equals(QUEUE_MODE_DOMAIN)
+ && !queueMode.equals(QUEUE_MODE_HOST)) {
+ LOG.error("Unknown partition mode : " + queueMode
+ + " - forcing to byHost");
+ queueMode = QUEUE_MODE_HOST;
+ }
+ LOG.info("Using queue mode : " + queueMode);
+
+ this.crawlDelay = (long) (conf.getFloat("fetcher.server.delay", 1.0f) * 1000);
+ this.minCrawlDelay = (long) (conf.getFloat("fetcher.server.min.delay",
+ 0.0f) * 1000);
+ this.timelimit = conf.getLong("fetcher.timelimit", -1);
+ this.maxExceptionsPerQueue = conf.getInt(
+ "fetcher.max.exceptions.per.queue", -1);
+ }
+
+ public int getTotalSize() {
+ return totalSize.get();
+ }
+
+ public int getQueueCount() {
+ return queues.size();
+ }
+
+ public void addFetchItem(Text url, CrawlDatum datum) {
+ FetchItem it = FetchItem.create(url, datum, queueMode);
+ if (it != null)
+ addFetchItem(it);
+ }
+
+ public synchronized void addFetchItem(FetchItem it) {
+ FetchItemQueue fiq = getFetchItemQueue(it.queueID);
+ fiq.addFetchItem(it);
+ totalSize.incrementAndGet();
+ }
+
+ public void finishFetchItem(FetchItem it) {
+ finishFetchItem(it, false);
+ }
+
+ public void finishFetchItem(FetchItem it, boolean asap) {
+ FetchItemQueue fiq = queues.get(it.queueID);
+ if (fiq == null) {
+ LOG.warn("Attempting to finish item from unknown queue: " + it);
+ return;
+ }
+ fiq.finishFetchItem(it, asap);
+ }
+
+ public synchronized FetchItemQueue getFetchItemQueue(String id) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq == null) {
+ // initialize queue
+ fiq = new FetchItemQueue(conf, maxThreads, crawlDelay, minCrawlDelay);
+ queues.put(id, fiq);
+ }
+ return fiq;
+ }
+
+ public synchronized FetchItem getFetchItem() {
+ Iterator<Map.Entry<String, FetchItemQueue>> it = queues.entrySet()
+ .iterator();
+ while (it.hasNext()) {
+ FetchItemQueue fiq = it.next().getValue();
+ // reap empty queues
+ if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
+ it.remove();
+ continue;
+ }
+ FetchItem fit = fiq.getFetchItem();
+ if (fit != null) {
+ totalSize.decrementAndGet();
+ return fit;
+ }
+ }
+ return null;
+ }
+
+ // called only once the feeder has stopped
+ public synchronized int checkTimelimit() {
+ int count = 0;
+
+ if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
+ // emptying the queues
+ count = emptyQueues();
+
+ // there might also be a case where totalSize != 0 but the number of
+ // queues == 0, in which case we simply force totalSize to 0 to avoid
+ // blocking
+ if (totalSize.get() != 0 && queues.size() == 0)
+ totalSize.set(0);
+ }
+ return count;
+ }
+
+ // empties the queues (used by timebomb and throughput threshold)
+ public synchronized int emptyQueues() {
+ int count = 0;
+
+ for (String id : queues.keySet()) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq.getQueueSize() == 0)
+ continue;
+ LOG.info("* queue: " + id + " >> dropping! ");
+ int deleted = fiq.emptyQueue();
+ for (int i = 0; i < deleted; i++) {
+ totalSize.decrementAndGet();
+ }
+ count += deleted;
+ }
+
+ return count;
+ }
+
+ /**
+ * Increment the exception counter of a queue in case of an exception, e.g.
+ * a timeout; once the counter reaches the configured threshold
+ * (fetcher.max.exceptions.per.queue), the queue is emptied.
+ *
+ * @param queueid identifier of the queue whose counter is incremented
+ * @return number of purged items, or 0 if the threshold was not reached
+ */
+ public synchronized int checkExceptionThreshold(String queueid) {
+ FetchItemQueue fiq = queues.get(queueid);
+ if (fiq == null) {
+ return 0;
+ }
+ if (fiq.getQueueSize() == 0) {
+ return 0;
+ }
+ int excCount = fiq.incrementExceptionCounter();
+ if (maxExceptionsPerQueue != -1 && excCount >= maxExceptionsPerQueue) {
+ // too many exceptions for items in this queue - purge it
+ int deleted = fiq.emptyQueue();
+ LOG.info("* queue: " + queueid + " >> removed " + deleted
+ + " URLs from queue because " + excCount + " exceptions occurred");
+ for (int i = 0; i < deleted; i++) {
+ totalSize.decrementAndGet();
+ }
+ return deleted;
+ }
+ return 0;
+ }
+
+ public synchronized void dump() {
+ for (String id : queues.keySet()) {
+ FetchItemQueue fiq = queues.get(id);
+ if (fiq.getQueueSize() == 0)
+ continue;
+ LOG.info("* queue: " + id);
+ fiq.dump();
+ }
+ }
+}
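
A hedged usage sketch (again, not part of this commit) showing the
configuration keys read by the FetchItemQueues constructor and the byHost
partitioning, under which URLs of the same host share one FetchItemQueue.
The property names come from the constructor above; the driver class itself
is hypothetical.

package org.apache.nutch.fetcher;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.util.NutchConfiguration;

public class FetchItemQueuesSketch {
  public static void main(String[] args) {
    Configuration conf = NutchConfiguration.create();
    conf.set("fetcher.queue.mode", FetchItemQueues.QUEUE_MODE_HOST);
    conf.setInt("fetcher.threads.per.queue", 2);  // maxThreads per queue
    conf.setFloat("fetcher.server.delay", 2.0f);  // 2000 ms crawlDelay
    conf.setInt("fetcher.max.exceptions.per.queue", 3);

    FetchItemQueues queues = new FetchItemQueues(conf);
    queues.addFetchItem(new Text("http://example.com/a"), new CrawlDatum());
    queues.addFetchItem(new Text("http://example.com/b"), new CrawlDatum());
    queues.addFetchItem(new Text("http://example.org/"), new CrawlDatum());

    // two hosts -> two queues, three items in total
    System.out.println(queues.getQueueCount() + " queues, "
        + queues.getTotalSize() + " items");
  }
}

Note that queue creation is lazy: getFetchItemQueue() instantiates a
FetchItemQueue on first use, and getFetchItem() reaps queues that are empty
with nothing in progress.
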
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
new file mode 100644
index 0000000..892c90f
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNode.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import org.apache.hadoop.io.Text;
+import org.apache.nutch.parse.Outlink;
+
+public class FetchNode {
+ private Text url = null;
+ private Outlink[] outlinks;
+ private int status = 0;
+ private String title = null;
+ private long fetchTime = 0;
+
+ public Text getUrl() {
+ return url;
+ }
+ public void setUrl(Text url) {
+ this.url = url;
+ }
+ public Outlink[] getOutlinks() {
+ return outlinks;
+ }
+ public void setOutlinks(Outlink[] links) {
+ this.outlinks = links;
+ }
+ public int getStatus() {
+ return status;
+ }
+ public void setStatus(int status) {
+ this.status = status;
+ }
+ public String getTitle() {
+ return title;
+ }
+ public void setTitle(String title) {
+ this.title = title;
+ }
+ public long getFetchTime() {
+ return fetchTime;
+ }
+ public void setFetchTime(long fetchTime) {
+ this.fetchTime = fetchTime;
+ }
+}
\ No newline at end of file
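
FetchNode is a plain bean carrying the per-URL result of a fetch. A short
hypothetical example of populating one follows; the status value is a
placeholder, since real callers would supply a protocol status code.

package org.apache.nutch.fetcher;

import org.apache.hadoop.io.Text;
import org.apache.nutch.parse.Outlink;

public class FetchNodeSketch {
  public static void main(String[] args) throws Exception {
    FetchNode node = new FetchNode();
    node.setUrl(new Text("http://example.com/"));
    node.setTitle("Example Domain");
    node.setStatus(1); // placeholder status code
    node.setFetchTime(System.currentTimeMillis());
    // Outlink(String, String) may throw MalformedURLException
    node.setOutlinks(new Outlink[] {
        new Outlink("http://example.org/", "anchor text") });
    System.out.println(node.getTitle() + " @ " + node.getUrl());
  }
}
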
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
new file mode 100644
index 0000000..2e69f31
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetchNodeDb.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+ import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class FetchNodeDb {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetchNodeDb.class);
+
+ private Map<Integer, FetchNode> fetchNodeDbMap;
+ private int index;
+ private static FetchNodeDb fetchNodeDbInstance = null;
+
+ public FetchNodeDb(){
+ fetchNodeDbMap = new ConcurrentHashMap<Integer, FetchNode>();
+ // indices handed out by put() start at 1
+ index = 1;
+ }
+
+ // synchronized: concurrent fetcher threads may otherwise race to create
+ // (and leak) separate instances
+ public static synchronized FetchNodeDb getInstance(){
+ if(fetchNodeDbInstance == null){
+ fetchNodeDbInstance = new FetchNodeDb();
+ }
+ return fetchNodeDbInstance;
+ }
+
+ // synchronized so that concurrent callers receive distinct indices;
+ // logs via SLF4J instead of writing to stdout
+ public synchronized void put(String url, FetchNode fetchNode){
+ LOG.debug("FetchNodeDb: putting node for " + url);
+ fetchNodeDbMap.put(index++, fetchNode);
+ }
+ public Map<Integer, FetchNode> getFetchNodeDb(){
+ return fetchNodeDbMap;
+ }
+}
\ No newline at end of file
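
To close, a hypothetical usage sketch for the FetchNodeDb singleton above:
nodes are keyed by a monotonically increasing insertion index starting at 1,
and the url argument to put() is only used for logging.

package org.apache.nutch.fetcher;

import java.util.Map;

import org.apache.hadoop.io.Text;

public class FetchNodeDbSketch {
  public static void main(String[] args) {
    FetchNode node = new FetchNode();
    node.setUrl(new Text("http://example.com/"));

    // all callers share the same lazily created instance
    FetchNodeDb db = FetchNodeDb.getInstance();
    db.put(node.getUrl().toString(), node);

    // entries are keyed by insertion order, starting at index 1
    for (Map.Entry<Integer, FetchNode> e : db.getFetchNodeDb().entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue().getUrl());
    }
  }
}
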