Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:49:02 UTC
[46/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java
new file mode 100644
index 0000000..aad9ee9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/Fetcher.java
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.protocol.*;
+import org.apache.nutch.util.*;
+
+/**
+ * A queue-based fetcher.
+ *
+ * <p>
+ * This fetcher uses a well-known model of one producer (a QueueFeeder) and many
+ * consumers (FetcherThread-s).
+ *
+ * <p>
+ * QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s,
+ * which hold FetchItem-s that describe the items to be fetched. There are as
+ * many queues as there are unique hosts, but at any given time the total number
+ * of fetch items in all queues is less than a fixed number (currently set to a
+ * multiple of the number of threads).
+ *
+ * <p>
+ * As items are consumed from the queues, the QueueFeeder continues to add new
+ * input items, so that their total count stays fixed (FetcherThread-s may also
+ * add new items to the queues, e.g. as a result of redirection) - until all
+ * input items are exhausted, at which point the number of items in the queues
+ * begins to decrease. When this number reaches 0, the fetcher finishes.
+ *
+ * <p>
+ * This fetcher implementation handles per-host blocking itself, instead of
+ * delegating this work to protocol-specific plugins. Each per-host queue
+ * handles its own "politeness" settings, such as the maximum number of
+ * concurrent requests and crawl delay between consecutive requests - and also a
+ * list of requests in progress, and the time the last request was finished. As
+ * FetcherThread-s ask for new items to be fetched, queues may return eligible
+ * items or null if for "politeness" reasons this host's queue is not yet ready.
+ *
+ * <p>
+ * If there are still unfetched items in the queues, but none of the items are
+ * ready, FetcherThread-s will spin-wait until either some items become
+ * available, or a timeout is reached (at which point the Fetcher will abort,
+ * assuming the task is hung).
+ *
+ * @author Andrzej Bialecki
+ */
+public class Fetcher extends NutchTool implements Tool,
+MapRunnable<Text, CrawlDatum, Text, NutchWritable> {
+
+ public static final int PERM_REFRESH_TIME = 5;
+
+ public static final String CONTENT_REDIR = "content";
+
+ public static final String PROTOCOL_REDIR = "protocol";
+
+ public static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
+
+ public static class InputFormat extends
+ SequenceFileInputFormat<Text, CrawlDatum> {
+ /** Don't split inputs, to keep things polite. */
+ public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException {
+ FileStatus[] files = listStatus(job);
+ FileSplit[] splits = new FileSplit[files.length];
+ for (int i = 0; i < files.length; i++) {
+ FileStatus cur = files[i];
+ splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(),
+ (String[]) null);
+ }
+ return splits;
+ }
+ }
+
+ @SuppressWarnings("unused")
+ private OutputCollector<Text, NutchWritable> output;
+ private Reporter reporter;
+
+ private String segmentName;
+ private AtomicInteger activeThreads = new AtomicInteger(0);
+ private AtomicInteger spinWaiting = new AtomicInteger(0);
+
+ private long start = System.currentTimeMillis(); // start time of fetcher run
+ private AtomicLong lastRequestStart = new AtomicLong(start);
+
+ private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
+ private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
+ private AtomicInteger errors = new AtomicInteger(0); // total pages errored
+
+ private boolean storingContent;
+ private boolean parsing;
+ FetchItemQueues fetchQueues;
+ QueueFeeder feeder;
+
+ LinkedList<FetcherThread> fetcherThreads = new LinkedList<FetcherThread>();
+
+ public Fetcher() {
+ super(null);
+ }
+
+ public Fetcher(Configuration conf) {
+ super(conf);
+ }
+
+ private void reportStatus(int pagesLastSec, int bytesLastSec)
+ throws IOException {
+ StringBuilder status = new StringBuilder();
+ long elapsed = (System.currentTimeMillis() - start) / 1000;
+
+ float avgPagesSec = (float) pages.get() / elapsed;
+ long avgBytesSec = (bytes.get() / 128l) / elapsed;
+
+ status.append(activeThreads).append(" threads (").append(spinWaiting.get())
+ .append(" waiting), ");
+ status.append(fetchQueues.getQueueCount()).append(" queues, ");
+ status.append(fetchQueues.getTotalSize()).append(" URLs queued, ");
+ status.append(pages).append(" pages, ").append(errors).append(" errors, ");
+ status.append(String.format("%.2f", avgPagesSec)).append(" pages/s (");
+ status.append(pagesLastSec).append(" last sec), ");
+ status.append(avgBytesSec).append(" kbits/s (")
+ .append((bytesLastSec / 128)).append(" last sec)");
+
+ reporter.setStatus(status.toString());
+ }
+
+ public void configure(JobConf job) {
+ setConf(job);
+
+ this.segmentName = job.get(Nutch.SEGMENT_NAME_KEY);
+ this.storingContent = isStoringContent(job);
+ this.parsing = isParsing(job);
+
+ // if (job.getBoolean("fetcher.verbose", false)) {
+ // LOG.setLevel(Level.FINE);
+ // }
+ }
+
+ public void close() {
+ }
+
+ public static boolean isParsing(Configuration conf) {
+ return conf.getBoolean("fetcher.parse", true);
+ }
+
+ public static boolean isStoringContent(Configuration conf) {
+ return conf.getBoolean("fetcher.store.content", true);
+ }
+
+ public void run(RecordReader<Text, CrawlDatum> input,
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
+ throws IOException {
+
+ this.output = output;
+ this.reporter = reporter;
+ this.fetchQueues = new FetchItemQueues(getConf());
+
+ int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: threads: {}", threadCount);
+ }
+
+ int timeoutDivisor = getConf().getInt("fetcher.threads.timeout.divisor", 2);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: time-out divisor: {}", timeoutDivisor);
+ }
+
+ int queueDepthMultiplier = getConf().getInt(
+ "fetcher.queue.depth.multiplier", 50);
+
+ feeder = new QueueFeeder(input, fetchQueues, threadCount
+ * queueDepthMultiplier);
+ // feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
+
+ // the value of the time limit is either -1 or the time where it should
+ // finish
+ long timelimit = getConf().getLong("fetcher.timelimit", -1);
+ if (timelimit != -1)
+ feeder.setTimeLimit(timelimit);
+ feeder.start();
+
+ // set non-blocking & no-robots mode for HTTP protocol plugins.
+ getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
+ getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
+
+ for (int i = 0; i < threadCount; i++) { // spawn threads
+ FetcherThread t = new FetcherThread(getConf(), getActiveThreads(), fetchQueues,
+ feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName,
+ parsing, output, storingContent, pages, bytes);
+ fetcherThreads.add(t);
+ t.start();
+ }
+
+ // select a timeout that avoids a task timeout
+ long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000)
+ / timeoutDivisor;
+
+ // Used for threshold check, holds pages and bytes processed in the last
+ // second
+ int pagesLastSec;
+ int bytesLastSec;
+
+ int throughputThresholdNumRetries = 0;
+
+ int throughputThresholdPages = getConf().getInt(
+ "fetcher.throughput.threshold.pages", -1);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: throughput threshold: {}", throughputThresholdPages);
+ }
+ int throughputThresholdMaxRetries = getConf().getInt(
+ "fetcher.throughput.threshold.retries", 5);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: throughput threshold retries: {}",
+ throughputThresholdMaxRetries);
+ }
+ long throughputThresholdTimeLimit = getConf().getLong(
+ "fetcher.throughput.threshold.check.after", -1);
+
+ int targetBandwidth = getConf().getInt("fetcher.bandwidth.target", -1) * 1000;
+ int maxNumThreads = getConf().getInt("fetcher.maxNum.threads", threadCount);
+ if (maxNumThreads < threadCount) {
+ LOG.info("fetcher.maxNum.threads can't be < than {} : using {} instead",
+ threadCount, threadCount);
+ maxNumThreads = threadCount;
+ }
+ int bandwidthTargetCheckEveryNSecs = getConf().getInt(
+ "fetcher.bandwidth.target.check.everyNSecs", 30);
+ if (bandwidthTargetCheckEveryNSecs < 1) {
+ LOG.info("fetcher.bandwidth.target.check.everyNSecs can't be < to 1 : using 1 instead");
+ bandwidthTargetCheckEveryNSecs = 1;
+ }
+
+ int maxThreadsPerQueue = getConf().getInt("fetcher.threads.per.queue", 1);
+
+ int bandwidthTargetCheckCounter = 0;
+ long bytesAtLastBWTCheck = 0l;
+
+ do { // wait for threads to exit
+ pagesLastSec = pages.get();
+ bytesLastSec = (int) bytes.get();
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+
+ pagesLastSec = pages.get() - pagesLastSec;
+ bytesLastSec = (int) bytes.get() - bytesLastSec;
+
+ reporter.incrCounter("FetcherStatus", "bytes_downloaded", bytesLastSec);
+
+ reportStatus(pagesLastSec, bytesLastSec);
+
+ LOG.info("-activeThreads=" + activeThreads + ", spinWaiting="
+ + spinWaiting.get() + ", fetchQueues.totalSize="
+ + fetchQueues.getTotalSize() + ", fetchQueues.getQueueCount="
+ + fetchQueues.getQueueCount());
+
+ if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
+ fetchQueues.dump();
+ }
+
+ // if throughput threshold is enabled
+ if (throughputThresholdTimeLimit < System.currentTimeMillis()
+ && throughputThresholdPages != -1) {
+ // Check if we're dropping below the threshold
+ if (pagesLastSec < throughputThresholdPages) {
+ throughputThresholdNumRetries++;
+ LOG.warn("{}: dropping below configured threshold of {} pages per second",
+ Integer.toString(throughputThresholdNumRetries), Integer.toString(throughputThresholdPages));
+
+ // Quit if we dropped below threshold too many times
+ if (throughputThresholdNumRetries == throughputThresholdMaxRetries) {
+ LOG.warn("Dropped below threshold too many times, killing!");
+
+ // Disable the threshold checker
+ throughputThresholdPages = -1;
+
+ // Empty the queues cleanly and get number of items that were
+ // dropped
+ int hitByThroughputThreshold = fetchQueues.emptyQueues();
+
+ if (hitByThroughputThreshold != 0)
+ reporter.incrCounter("FetcherStatus", "hitByThroughputThreshold",
+ hitByThroughputThreshold);
+ }
+ }
+ }
+
+ // adjust the number of threads if a target bandwidth has been set
+ if (targetBandwidth > 0) {
+ if (bandwidthTargetCheckCounter < bandwidthTargetCheckEveryNSecs)
+ bandwidthTargetCheckCounter++;
+ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
+ long bpsSinceLastCheck = ((bytes.get() - bytesAtLastBWTCheck) * 8)
+ / bandwidthTargetCheckEveryNSecs;
+
+ bytesAtLastBWTCheck = bytes.get();
+ bandwidthTargetCheckCounter = 0;
+
+ int averageBdwPerThread = 0;
+ if (activeThreads.get() > 0)
+ averageBdwPerThread = Math.round(bpsSinceLastCheck
+ / activeThreads.get());
+
+ LOG.info("averageBdwPerThread : {} kbps", (averageBdwPerThread / 1000));
+
+ if (bpsSinceLastCheck < targetBandwidth && averageBdwPerThread > 0) {
+ // check whether it is worth doing e.g. more queues than threads
+
+ if ((fetchQueues.getQueueCount() * maxThreadsPerQueue) > activeThreads
+ .get()) {
+
+ long remainingBdw = targetBandwidth - bpsSinceLastCheck;
+ int additionalThreads = Math.round(remainingBdw
+ / averageBdwPerThread);
+ int availableThreads = maxNumThreads - activeThreads.get();
+
+ // determine the number of available threads (min between
+ // availableThreads and additionalThreads)
+ additionalThreads = (availableThreads < additionalThreads ? availableThreads
+ : additionalThreads);
+ LOG.info("Has space for more threads ({} vs {} kbps) \t=> adding {} new threads",
+ (bpsSinceLastCheck / 1000), (targetBandwidth / 1000), additionalThreads);
+ // activate new threads
+ for (int i = 0; i < additionalThreads; i++) {
+ FetcherThread thread = new FetcherThread(getConf(), getActiveThreads(), fetchQueues,
+ feeder, spinWaiting, lastRequestStart, reporter, errors, segmentName, parsing,
+ output, storingContent, pages, bytes);
+ fetcherThreads.add(thread);
+ thread.start();
+ }
+ }
+ } else if (bpsSinceLastCheck > targetBandwidth
+ && averageBdwPerThread > 0) {
+ // if the bandwidth we're using is greater than the expected
+ // bandwidth, we have to stop some threads
+ long excessBdw = bpsSinceLastCheck - targetBandwidth;
+ int excessThreads = Math.round(excessBdw / averageBdwPerThread);
+ LOG.info("Exceeding target bandwidth ({} vs {} kbps). \t=> excessThreads = {}",
+ bpsSinceLastCheck / 1000, (targetBandwidth / 1000), excessThreads);
+ // keep at least one
+ if (excessThreads >= fetcherThreads.size())
+ excessThreads = 0;
+ // deactivate excess threads
+ for (int i = 0; i < excessThreads; i++) {
+ FetcherThread thread = fetcherThreads.removeLast();
+ thread.setHalted(true);
+ }
+ }
+ }
+ }
+
+ // check timelimit
+ if (!feeder.isAlive()) {
+ int hitByTimeLimit = fetchQueues.checkTimelimit();
+ if (hitByTimeLimit != 0)
+ reporter.incrCounter("FetcherStatus", "hitByTimeLimit",
+ hitByTimeLimit);
+ }
+
+ // some requests seem to hang, despite all intentions
+ if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Aborting with {} hung threads.", activeThreads);
+ for (int i = 0; i < fetcherThreads.size(); i++) {
+ FetcherThread thread = fetcherThreads.get(i);
+ if (thread.isAlive()) {
+ LOG.warn("Thread #{} hung while processing {}", i, thread.getReprUrl());
+ if (LOG.isDebugEnabled()) {
+ StackTraceElement[] stack = thread.getStackTrace();
+ StringBuilder sb = new StringBuilder();
+ sb.append("Stack of thread #").append(i).append(":\n");
+ for (StackTraceElement s : stack) {
+ sb.append(s.toString()).append('\n');
+ }
+ LOG.debug(sb.toString());
+ }
+ }
+ }
+ }
+ return;
+ }
+
+ } while (activeThreads.get() > 0);
+ LOG.info("-activeThreads={}", activeThreads);
+
+ }
+
+ public void fetch(Path segment, int threads) throws IOException {
+
+ checkConfiguration();
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Fetcher: starting at {}", sdf.format(start));
+ LOG.info("Fetcher: segment: {}", segment);
+ }
+
+ // set the actual time for the timelimit relative
+ // to the beginning of the whole job and not of a specific task
+ // otherwise it keeps trying again if a task fails
+ long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
+ if (timelimit != -1) {
+ timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+ LOG.info("Fetcher Timelimit set for : {}", timelimit);
+ getConf().setLong("fetcher.timelimit", timelimit);
+ }
+
+ // Set the time limit after which the throughput threshold feature is
+ // enabled
+ timelimit = getConf().getLong("fetcher.throughput.threshold.check.after",
+ 10);
+ timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
+ getConf().setLong("fetcher.throughput.threshold.check.after", timelimit);
+
+ int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
+ if (maxOutlinkDepth > 0) {
+ LOG.info("Fetcher: following outlinks up to depth: {}",
+ Integer.toString(maxOutlinkDepth));
+
+ int maxOutlinkDepthNumLinks = getConf().getInt(
+ "fetcher.follow.outlinks.num.links", 4);
+ int outlinksDepthDivisor = getConf().getInt(
+ "fetcher.follow.outlinks.depth.divisor", 2);
+
+ int totalOutlinksToFollow = 0;
+ for (int i = 0; i < maxOutlinkDepth; i++) {
+ totalOutlinksToFollow += (int) Math.floor(outlinksDepthDivisor
+ / (i + 1) * maxOutlinkDepthNumLinks);
+ }
+
+ LOG.info("Fetcher: maximum outlinks to follow: {}",
+ Integer.toString(totalOutlinksToFollow));
+ }
+
+ JobConf job = new NutchJob(getConf());
+ job.setJobName("fetch " + segment);
+
+ job.setInt("fetcher.threads.fetch", threads);
+ job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());
+
+ // for politeness, don't permit parallel execution of a single task
+ job.setSpeculativeExecution(false);
+
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.GENERATE_DIR_NAME));
+ job.setInputFormat(InputFormat.class);
+
+ job.setMapRunnerClass(Fetcher.class);
+
+ FileOutputFormat.setOutputPath(job, segment);
+ job.setOutputFormat(FetcherOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NutchWritable.class);
+
+ JobClient.runJob(job);
+
+ long end = System.currentTimeMillis();
+ LOG.info("Fetcher: finished at {}, elapsed: {}", sdf.format(end),
+ TimingUtil.elapsedTime(start, end));
+ }
+
+ /** Run the fetcher. */
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new Fetcher(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+
+ String usage = "Usage: Fetcher <segment> [-threads n]";
+
+ if (args.length < 1) {
+ System.err.println(usage);
+ return -1;
+ }
+
+ Path segment = new Path(args[0]);
+
+ int threads = getConf().getInt("fetcher.threads.fetch", 10);
+
+ for (int i = 1; i < args.length; i++) { // parse command line
+ if (args[i].equals("-threads")) { // found -threads option
+ threads = Integer.parseInt(args[++i]);
+ }
+ }
+
+ getConf().setInt("fetcher.threads.fetch", threads);
+
+ try {
+ fetch(segment, threads);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("Fetcher: {}", StringUtils.stringifyException(e));
+ return -1;
+ }
+
+ }
+
+ private void checkConfiguration() {
+ // ensure that a value has been set for the agent name
+ String agentName = getConf().get("http.agent.name");
+ if (agentName == null || agentName.trim().length() == 0) {
+ String message = "Fetcher: No agents listed in 'http.agent.name'"
+ + " property.";
+ if (LOG.isErrorEnabled()) {
+ LOG.error(message);
+ }
+ throw new IllegalArgumentException(message);
+ }
+ }
+
+ private AtomicInteger getActiveThreads() {
+ return activeThreads;
+ }
+
+ @Override
+ public Map<String, Object> run(Map<String, Object> args, String crawlId) throws Exception {
+
+ Map<String, Object> results = new HashMap<String, Object>();
+
+ Path segment;
+ if(args.containsKey(Nutch.ARG_SEGMENT)) {
+ Object seg = args.get(Nutch.ARG_SEGMENT);
+ if(seg instanceof Path) {
+ segment = (Path) seg;
+ }
+ else {
+ segment = new Path(seg.toString());
+ }
+ }
+ else {
+ String segment_dir = crawlId+"/segments";
+ File segmentsDir = new File(segment_dir);
+ File[] segmentsList = segmentsDir.listFiles();
+ Arrays.sort(segmentsList, new Comparator<File>(){
+ @Override
+ public int compare(File f1, File f2) {
+ // sort newest segment first; comparing in one direction only
+ // would violate the Comparator contract
+ return Long.compare(f2.lastModified(), f1.lastModified());
+ }
+ });
+ segment = new Path(segmentsList[0].getPath());
+ }
+
+
+ int threads = getConf().getInt("fetcher.threads.fetch", 10);
+
+ // parse command line
+ if (args.containsKey("threads")) { // found -threads option
+ threads = Integer.parseInt((String)args.get("threads"));
+ }
+ getConf().setInt("fetcher.threads.fetch", threads);
+
+ try {
+ fetch(segment, threads);
+ results.put(Nutch.VAL_RESULT, Integer.toString(0));
+ return results;
+ } catch (Exception e) {
+ LOG.error("Fetcher: {}", StringUtils.stringifyException(e));
+ results.put(Nutch.VAL_RESULT, Integer.toString(-1));
+ return results;
+ }
+ }
+
+}
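
The per-host "politeness" gate described in the class javadoc above lives in
FetchItemQueue/FetchItemQueues, which are not part of this hunk. In essence it
is a small amount of per-host bookkeeping; a minimal standalone sketch, with
illustrative names (HostQueue, nextFetchTime) that do not exist in Nutch:

import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of a per-host politeness gate; all names here are
// illustrative, not Nutch API.
class HostQueue {
  private final Deque<String> urls = new ArrayDeque<>();
  private long crawlDelayMs;        // delay between requests to this host
  private long nextFetchTime = 0;   // earliest start time for the next request
  private int inProgress = 0;
  private final int maxConcurrent;  // cf. fetcher.threads.per.queue

  HostQueue(long crawlDelayMs, int maxConcurrent) {
    this.crawlDelayMs = crawlDelayMs;
    this.maxConcurrent = maxConcurrent;
  }

  synchronized void add(String url) {
    urls.addLast(url);
  }

  // Returns an eligible URL, or null if this host is not yet ready -
  // the caller spin-waits, as FetcherThread does.
  synchronized String poll(long now) {
    if (urls.isEmpty() || inProgress >= maxConcurrent || now < nextFetchTime) {
      return null;
    }
    inProgress++;
    return urls.pollFirst();
  }

  // Called when a request finishes; starts the crawl-delay clock.
  synchronized void finish(long now) {
    inProgress--;
    nextFetchTime = now + crawlDelayMs;
  }
}

The bandwidth-based thread scaling in run() is plain arithmetic on the shared
counters: e.g. with fetcher.bandwidth.target=100 (a 100,000 bps target) and
40,000 bps measured over the check interval across 8 active threads,
averageBdwPerThread is 5,000, remainingBdw is 60,000, and round(60000 / 5000)
= 12 extra threads are started, capped by fetcher.maxNum.threads and by the
number of queues, provided there are enough queues to keep them busy.
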
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
new file mode 100644
index 0000000..d526a07
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherOutputFormat.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.MapFile.Writer.Option;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.InvalidJobConfException;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseOutputFormat;
+import org.apache.nutch.protocol.Content;
+
+/** Splits FetcherOutput entries into multiple map files. */
+public class FetcherOutputFormat implements OutputFormat<Text, NutchWritable> {
+
+ public void checkOutputSpecs(FileSystem fs, JobConf job) throws IOException {
+ Path out = FileOutputFormat.getOutputPath(job);
+ if ((out == null) && (job.getNumReduceTasks() != 0)) {
+ throw new InvalidJobConfException("Output directory not set in JobConf.");
+ }
+ if (fs == null) {
+ fs = out.getFileSystem(job);
+ }
+ if (fs.exists(new Path(out, CrawlDatum.FETCH_DIR_NAME)))
+ throw new IOException("Segment already fetched!");
+ }
+
+ public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
+ final JobConf job, final String name, final Progressable progress)
+ throws IOException {
+
+ Path out = FileOutputFormat.getOutputPath(job);
+ final Path fetch = new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
+ final Path content = new Path(new Path(out, Content.DIR_NAME), name);
+
+ final CompressionType compType = SequenceFileOutputFormat
+ .getOutputCompressionType(job);
+
+ Option fKeyClassOpt = MapFile.Writer.keyClass(Text.class);
+ org.apache.hadoop.io.SequenceFile.Writer.Option fValClassOpt = SequenceFile.Writer.valueClass(CrawlDatum.class);
+ org.apache.hadoop.io.SequenceFile.Writer.Option fProgressOpt = SequenceFile.Writer.progressable(progress);
+ org.apache.hadoop.io.SequenceFile.Writer.Option fCompOpt = SequenceFile.Writer.compression(compType);
+
+ final MapFile.Writer fetchOut = new MapFile.Writer(job,
+ fetch, fKeyClassOpt, fValClassOpt, fCompOpt, fProgressOpt);
+
+ return new RecordWriter<Text, NutchWritable>() {
+ private MapFile.Writer contentOut;
+ private RecordWriter<Text, Parse> parseOut;
+
+ {
+ if (Fetcher.isStoringContent(job)) {
+ Option cKeyClassOpt = MapFile.Writer.keyClass(Text.class);
+ org.apache.hadoop.io.SequenceFile.Writer.Option cValClassOpt = SequenceFile.Writer.valueClass(Content.class);
+ org.apache.hadoop.io.SequenceFile.Writer.Option cProgressOpt = SequenceFile.Writer.progressable(progress);
+ org.apache.hadoop.io.SequenceFile.Writer.Option cCompOpt = SequenceFile.Writer.compression(compType);
+ contentOut = new MapFile.Writer(job, content,
+ cKeyClassOpt, cValClassOpt, cCompOpt, cProgressOpt);
+ }
+
+ if (Fetcher.isParsing(job)) {
+ parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name,
+ progress);
+ }
+ }
+
+ public void write(Text key, NutchWritable value) throws IOException {
+
+ Writable w = value.get();
+
+ if (w instanceof CrawlDatum)
+ fetchOut.append(key, w);
+ else if (w instanceof Content && contentOut != null)
+ contentOut.append(key, w);
+ else if (w instanceof Parse && parseOut != null)
+ parseOut.write(key, (Parse) w);
+ }
+
+ public void close(Reporter reporter) throws IOException {
+ fetchOut.close();
+ if (contentOut != null) {
+ contentOut.close();
+ }
+ if (parseOut != null) {
+ parseOut.close(reporter);
+ }
+ }
+
+ };
+
+ }
+}
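
For orientation, a segment written through this output format ends up with the
following on-disk layout when both content storing and parsing are enabled
(crawl_fetch and content come from CrawlDatum.FETCH_DIR_NAME and
Content.DIR_NAME; the remaining directories are produced by the delegated
ParseOutputFormat):

  <segment>/
    crawl_fetch/   - CrawlDatum fetch status (the fetchOut MapFile)
    content/       - raw fetched Content (the contentOut MapFile)
    crawl_parse/   - outlink CrawlDatum-s, via ParseOutputFormat
    parse_data/    - ParseData, via ParseOutputFormat
    parse_text/    - ParseText, via ParseOutputFormat

write() dispatches each NutchWritable on the runtime type of the wrapped
value, which is how a single map output stream feeds all of these files.
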
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java
new file mode 100644
index 0000000..e57e735
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/FetcherThread.java
@@ -0,0 +1,768 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLExemptionFilters;
+import org.apache.nutch.net.URLFilterException;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Outlink;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseOutputFormat;
+import org.apache.nutch.parse.ParseResult;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.Protocol;
+import org.apache.nutch.protocol.ProtocolFactory;
+import org.apache.nutch.protocol.ProtocolOutput;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.scoring.ScoringFilterException;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.service.NutchServer;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import crawlercommons.robots.BaseRobotRules;
+
+/**
+ * This class picks items from queues and fetches the pages.
+ */
+public class FetcherThread extends Thread {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FetcherThread.class);
+
+ private Configuration conf;
+ private URLFilters urlFilters;
+ private URLExemptionFilters urlExemptionFilters;
+ private ScoringFilters scfilters;
+ private ParseUtil parseUtil;
+ private URLNormalizers normalizers;
+ private ProtocolFactory protocolFactory;
+ private long maxCrawlDelay;
+ private String queueMode;
+ private int maxRedirect;
+ private String reprUrl;
+ private boolean redirecting;
+ private int redirectCount;
+ private boolean ignoreInternalLinks;
+ private boolean ignoreExternalLinks;
+ private String ignoreExternalLinksMode;
+
+ // Used by fetcher.follow.outlinks.depth in parse
+ private int maxOutlinksPerPage;
+ private final int maxOutlinks;
+ private final int interval;
+ private int maxOutlinkDepth;
+ private int maxOutlinkDepthNumLinks;
+ private boolean outlinksIgnoreExternal;
+
+ private int outlinksDepthDivisor;
+ private boolean skipTruncated;
+
+ private boolean halted = false;
+
+ private AtomicInteger activeThreads;
+
+ private FetchItemQueues fetchQueues;
+
+ private QueueFeeder feeder;
+
+ private AtomicInteger spinWaiting;
+
+ private AtomicLong lastRequestStart;
+
+ private Reporter reporter;
+
+ private AtomicInteger errors;
+
+ private String segmentName;
+
+ private boolean parsing;
+
+ private OutputCollector<Text, NutchWritable> output;
+
+ private boolean storingContent;
+
+ private AtomicInteger pages;
+
+ private AtomicLong bytes;
+
+ //Used by the REST service
+ private FetchNode fetchNode;
+ private boolean reportToNutchServer;
+
+ public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQueues fetchQueues,
+ QueueFeeder feeder, AtomicInteger spinWaiting, AtomicLong lastRequestStart, Reporter reporter,
+ AtomicInteger errors, String segmentName, boolean parsing, OutputCollector<Text, NutchWritable> output,
+ boolean storingContent, AtomicInteger pages, AtomicLong bytes) {
+ this.setDaemon(true); // don't hang JVM on exit
+ this.setName("FetcherThread"); // use an informative name
+ this.conf = conf;
+ this.urlFilters = new URLFilters(conf);
+ this.urlExemptionFilters = new URLExemptionFilters(conf);
+ this.scfilters = new ScoringFilters(conf);
+ this.parseUtil = new ParseUtil(conf);
+ this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true);
+ this.protocolFactory = new ProtocolFactory(conf);
+ this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER);
+ this.maxCrawlDelay = conf.getInt("fetcher.max.crawl.delay", 30) * 1000;
+ this.activeThreads = activeThreads;
+ this.fetchQueues = fetchQueues;
+ this.feeder = feeder;
+ this.spinWaiting = spinWaiting;
+ this.lastRequestStart = lastRequestStart;
+ this.reporter = reporter;
+ this.errors = errors;
+ this.segmentName = segmentName;
+ this.parsing = parsing;
+ this.output = output;
+ this.storingContent = storingContent;
+ this.pages = pages;
+ this.bytes = bytes;
+ queueMode = conf.get("fetcher.queue.mode",
+ FetchItemQueues.QUEUE_MODE_HOST);
+ // check that the mode is known
+ if (!queueMode.equals(FetchItemQueues.QUEUE_MODE_IP)
+ && !queueMode.equals(FetchItemQueues.QUEUE_MODE_DOMAIN)
+ && !queueMode.equals(FetchItemQueues.QUEUE_MODE_HOST)) {
+ LOG.error("Unknown partition mode : " + queueMode
+ + " - forcing to byHost");
+ queueMode = FetchItemQueues.QUEUE_MODE_HOST;
+ }
+ LOG.info("Using queue mode : " + queueMode);
+ this.maxRedirect = conf.getInt("http.redirect.max", 3);
+
+ maxOutlinksPerPage = conf.getInt("db.max.outlinks.per.page", 100);
+ maxOutlinks = (maxOutlinksPerPage < 0) ? Integer.MAX_VALUE
+ : maxOutlinksPerPage;
+ interval = conf.getInt("db.fetch.interval.default", 2592000);
+ ignoreInternalLinks = conf.getBoolean("db.ignore.internal.links", false);
+ ignoreExternalLinks = conf.getBoolean("db.ignore.external.links", false);
+ ignoreExternalLinksMode = conf.get("db.ignore.external.links.mode", "byHost");
+ maxOutlinkDepth = conf.getInt("fetcher.follow.outlinks.depth", -1);
+ outlinksIgnoreExternal = conf.getBoolean(
+ "fetcher.follow.outlinks.ignore.external", false);
+ maxOutlinkDepthNumLinks = conf.getInt(
+ "fetcher.follow.outlinks.num.links", 4);
+ outlinksDepthDivisor = conf.getInt(
+ "fetcher.follow.outlinks.depth.divisor", 2);
+ }
+
+ @SuppressWarnings("fallthrough")
+ public void run() {
+ activeThreads.incrementAndGet(); // count threads
+
+ FetchItem fit = null;
+ try {
+ // check whether the Nutch server is running and fetcher.parse is true
+ if (parsing && NutchServer.getInstance().isRunning())
+ reportToNutchServer = true;
+
+ while (true) {
+ // creating FetchNode for storing in FetchNodeDb
+ if (reportToNutchServer)
+ this.fetchNode = new FetchNode();
+ else
+ this.fetchNode = null;
+
+ // check whether this thread must be stopped
+ if (isHalted()) {
+ LOG.debug(getName() + " set to halted");
+ fit = null;
+ return;
+ }
+
+ fit = ((FetchItemQueues) fetchQueues).getFetchItem();
+ if (fit == null) {
+ if (feeder.isAlive() || ((FetchItemQueues) fetchQueues).getTotalSize() > 0) {
+ LOG.debug(getName() + " spin-waiting ...");
+ // spin-wait.
+ ((AtomicInteger) spinWaiting).incrementAndGet();
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
+ }
+ ((AtomicInteger) spinWaiting).decrementAndGet();
+ continue;
+ } else {
+ // all done, finish this thread
+ LOG.info("Thread " + getName() + " has no more work available");
+ return;
+ }
+ }
+ lastRequestStart.set(System.currentTimeMillis());
+ Text reprUrlWritable = (Text) fit.datum.getMetaData().get(
+ Nutch.WRITABLE_REPR_URL_KEY);
+ if (reprUrlWritable == null) {
+ setReprUrl(fit.url.toString());
+ } else {
+ setReprUrl(reprUrlWritable.toString());
+ }
+ try {
+ // fetch the page
+ redirecting = false;
+ redirectCount = 0;
+ do {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("fetching " + fit.url + " (queue crawl delay="
+ + ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID).crawlDelay
+ + "ms)");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("redirectCount=" + redirectCount);
+ }
+ redirecting = false;
+ Protocol protocol = this.protocolFactory.getProtocol(fit.url
+ .toString());
+ BaseRobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
+ if (!rules.isAllowed(fit.u.toString())) {
+ // unblock
+ ((FetchItemQueues) fetchQueues).finishFetchItem(fit, true);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Denied by robots.txt: " + fit.url);
+ }
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_ROBOTS_DENIED,
+ CrawlDatum.STATUS_FETCH_GONE);
+ reporter.incrCounter("FetcherStatus", "robots_denied", 1);
+ continue;
+ }
+ if (rules.getCrawlDelay() > 0) {
+ if (rules.getCrawlDelay() > maxCrawlDelay && maxCrawlDelay >= 0) {
+ // unblock
+ ((FetchItemQueues) fetchQueues).finishFetchItem(fit, true);
+ LOG.debug("Crawl-Delay for " + fit.url + " too long ("
+ + rules.getCrawlDelay() + "), skipping");
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_ROBOTS_DENIED,
+ CrawlDatum.STATUS_FETCH_GONE);
+ reporter.incrCounter("FetcherStatus",
+ "robots_denied_maxcrawldelay", 1);
+ continue;
+ } else {
+ FetchItemQueue fiq = ((FetchItemQueues) fetchQueues)
+ .getFetchItemQueue(fit.queueID);
+ fiq.crawlDelay = rules.getCrawlDelay();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Crawl delay for queue: " + fit.queueID
+ + " is set to " + fiq.crawlDelay
+ + " as per robots.txt. url: " + fit.url);
+ }
+ }
+ }
+ ProtocolOutput output = protocol.getProtocolOutput(fit.url,
+ fit.datum);
+ ProtocolStatus status = output.getStatus();
+ Content content = output.getContent();
+ ParseStatus pstatus = null;
+ // unblock queue
+ ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+
+ String urlString = fit.url.toString();
+
+ // used for FetchNode
+ if (fetchNode != null) {
+ fetchNode.setStatus(status.getCode());
+ fetchNode.setFetchTime(System.currentTimeMillis());
+ fetchNode.setUrl(fit.url);
+ }
+
+ reporter.incrCounter("FetcherStatus", status.getName(), 1);
+
+ switch (status.getCode()) {
+
+ case ProtocolStatus.WOULDBLOCK:
+ // retry ?
+ ((FetchItemQueues) fetchQueues).addFetchItem(fit);
+ break;
+
+ case ProtocolStatus.SUCCESS: // got a page
+ pstatus = output(fit.url, fit.datum, content, status,
+ CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
+ updateStatus(content.getContent().length);
+ if (pstatus != null && pstatus.isSuccess()
+ && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+ String newUrl = pstatus.getMessage();
+ int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
+ Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+ newUrl, refreshTime < Fetcher.PERM_REFRESH_TIME,
+ Fetcher.CONTENT_REDIR);
+ if (redirUrl != null) {
+ fit = queueRedirect(redirUrl, fit);
+ }
+ }
+ break;
+
+ case ProtocolStatus.MOVED: // redirect
+ case ProtocolStatus.TEMP_MOVED:
+ int code;
+ boolean temp;
+ if (status.getCode() == ProtocolStatus.MOVED) {
+ code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
+ temp = false;
+ } else {
+ code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
+ temp = true;
+ }
+ output(fit.url, fit.datum, content, status, code);
+ String newUrl = status.getMessage();
+ Text redirUrl = handleRedirect(fit.url, fit.datum, urlString,
+ newUrl, temp, Fetcher.PROTOCOL_REDIR);
+ if (redirUrl != null) {
+ fit = queueRedirect(redirUrl, fit);
+ } else {
+ // stop redirecting
+ redirecting = false;
+ }
+ break;
+
+ case ProtocolStatus.EXCEPTION:
+ logError(fit.url, status.getMessage());
+ int killedURLs = ((FetchItemQueues) fetchQueues).checkExceptionThreshold(fit
+ .getQueueID());
+ if (killedURLs != 0)
+ reporter.incrCounter("FetcherStatus",
+ "AboveExceptionThresholdInQueue", killedURLs);
+ /* FALLTHROUGH */
+ case ProtocolStatus.RETRY: // retry
+ case ProtocolStatus.BLOCKED:
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_RETRY);
+ break;
+
+ case ProtocolStatus.GONE: // gone
+ case ProtocolStatus.NOTFOUND:
+ case ProtocolStatus.ACCESS_DENIED:
+ case ProtocolStatus.ROBOTS_DENIED:
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_GONE);
+ break;
+
+ case ProtocolStatus.NOTMODIFIED:
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_NOTMODIFIED);
+ break;
+
+ default:
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Unknown ProtocolStatus: " + status.getCode());
+ }
+ output(fit.url, fit.datum, null, status,
+ CrawlDatum.STATUS_FETCH_RETRY);
+ }
+
+ if (redirecting && redirectCount > maxRedirect) {
+ ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+ if (LOG.isInfoEnabled()) {
+ LOG.info(" - redirect count exceeded " + fit.url);
+ }
+ output(fit.url, fit.datum, null,
+ ProtocolStatus.STATUS_REDIR_EXCEEDED,
+ CrawlDatum.STATUS_FETCH_GONE);
+ }
+
+ } while (redirecting && (redirectCount <= maxRedirect));
+
+ } catch (Throwable t) { // unexpected exception
+ // unblock
+ ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+ logError(fit.url, StringUtils.stringifyException(t));
+ output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
+ CrawlDatum.STATUS_FETCH_RETRY);
+ }
+ }
+
+ } catch (Throwable e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("fetcher caught:" + e.toString());
+ }
+ } finally {
+ if (fit != null)
+ ((FetchItemQueues) fetchQueues).finishFetchItem(fit);
+ activeThreads.decrementAndGet(); // count threads
+ LOG.info("-finishing thread " + getName() + ", activeThreads="
+ + activeThreads);
+ }
+ }
+
+ private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
+ String newUrl, boolean temp, String redirType)
+ throws MalformedURLException, URLFilterException {
+ newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
+ newUrl = urlFilters.filter(newUrl);
+
+ try {
+ String origHost = new URL(urlString).getHost().toLowerCase();
+ String newHost = new URL(newUrl).getHost().toLowerCase();
+ if (ignoreExternalLinks) {
+ if (!origHost.equals(newHost)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - ignoring redirect " + redirType + " from "
+ + urlString + " to " + newUrl
+ + " because external links are ignored");
+ }
+ return null;
+ }
+ }
+
+ if (ignoreInternalLinks) {
+ if (origHost.equals(newHost)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - ignoring redirect " + redirType + " from "
+ + urlString + " to " + newUrl
+ + " because internal links are ignored");
+ }
+ return null;
+ }
+ }
+ } catch (MalformedURLException e) { }
+
+ if (newUrl != null && !newUrl.equals(urlString)) {
+ reprUrl = URLUtil.chooseRepr(reprUrl, newUrl, temp);
+ url = new Text(newUrl);
+ if (maxRedirect > 0) {
+ redirecting = true;
+ redirectCount++;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - " + redirType + " redirect to " + url
+ + " (fetching now)");
+ }
+ return url;
+ } else {
+ CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_LINKED,
+ datum.getFetchInterval(), datum.getScore());
+ // transfer existing metadata
+ newDatum.getMetaData().putAll(datum.getMetaData());
+ try {
+ scfilters.initialScore(url, newDatum);
+ } catch (ScoringFilterException e) {
+ e.printStackTrace();
+ }
+ if (reprUrl != null) {
+ newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+ new Text(reprUrl));
+ }
+ output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - " + redirType + " redirect to " + url
+ + " (fetching later)");
+ }
+ return null;
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" - " + redirType + " redirect skipped: "
+ + (newUrl != null ? "to same url" : "filtered"));
+ }
+ return null;
+ }
+ }
+
+ private FetchItem queueRedirect(Text redirUrl, FetchItem fit)
+ throws ScoringFilterException {
+ CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
+ fit.datum.getFetchInterval(), fit.datum.getScore());
+ // transfer all existing metadata to the redirect
+ newDatum.getMetaData().putAll(fit.datum.getMetaData());
+ scfilters.initialScore(redirUrl, newDatum);
+ if (reprUrl != null) {
+ newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
+ new Text(reprUrl));
+ }
+ fit = FetchItem.create(redirUrl, newDatum, queueMode);
+ if (fit != null) {
+ FetchItemQueue fiq = ((FetchItemQueues) fetchQueues).getFetchItemQueue(fit.queueID);
+ fiq.addInProgressFetchItem(fit);
+ } else {
+ // stop redirecting
+ redirecting = false;
+ reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect",
+ 1);
+ }
+ return fit;
+ }
+
+ private void logError(Text url, String message) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("fetch of " + url + " failed with: " + message);
+ }
+ errors.incrementAndGet();
+ }
+
+ private ParseStatus output(Text key, CrawlDatum datum, Content content,
+ ProtocolStatus pstatus, int status) {
+
+ return output(key, datum, content, pstatus, status, 0);
+ }
+
+ private ParseStatus output(Text key, CrawlDatum datum, Content content,
+ ProtocolStatus pstatus, int status, int outlinkDepth) {
+
+ datum.setStatus(status);
+ datum.setFetchTime(System.currentTimeMillis());
+ if (pstatus != null)
+ datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
+ ParseResult parseResult = null;
+ if (content != null) {
+ Metadata metadata = content.getMetadata();
+
+ // store the guessed content type in the crawldatum
+ if (content.getContentType() != null)
+ datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
+ new Text(content.getContentType()));
+
+ // add segment to metadata
+ metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ // add score to content metadata so that ParseSegment can pick it up.
+ try {
+ scfilters.passScoreBeforeParsing(key, datum, content);
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ }
+ }
+ /*
+ * Note: Fetcher will only follow meta-redirects coming from the
+ * original URL.
+ */
+ if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
+ if (!skipTruncated || !ParseSegment.isTruncated(content)) {
+ try {
+ parseResult = this.parseUtil.parse(content);
+ } catch (Exception e) {
+ LOG.warn("Error parsing: " + key + ": "
+ + StringUtils.stringifyException(e));
+ }
+ }
+
+ if (parseResult == null) {
+ byte[] signature = SignatureFactory.getSignature(conf)
+ .calculate(content, new ParseStatus().getEmptyParse(conf));
+ datum.setSignature(signature);
+ }
+ }
+
+ /*
+ * Store status code in content So we can read this value during parsing
+ * (as a separate job) and decide to parse or not.
+ */
+ content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
+ Integer.toString(status));
+ }
+
+ try {
+ output.collect(key, new NutchWritable(datum));
+ if (content != null && storingContent)
+ output.collect(key, new NutchWritable(content));
+ if (parseResult != null) {
+ for (Entry<Text, Parse> entry : parseResult) {
+ Text url = entry.getKey();
+ Parse parse = entry.getValue();
+ ParseStatus parseStatus = parse.getData().getStatus();
+ ParseData parseData = parse.getData();
+
+ if (!parseStatus.isSuccess()) {
+ LOG.warn("Error parsing: " + key + ": " + parseStatus);
+ parse = parseStatus.getEmptyParse(conf);
+ }
+
+ // Calculate page signature. For non-parsing fetchers this will
+ // be done in ParseSegment
+ byte[] signature = SignatureFactory.getSignature(conf)
+ .calculate(content, parse);
+ // Ensure segment name and score are in parseData metadata
+ parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ parseData.getContentMeta().set(Nutch.SIGNATURE_KEY,
+ StringUtil.toHexString(signature));
+ // Pass fetch time to content meta
+ parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY,
+ Long.toString(datum.getFetchTime()));
+ if (url.equals(key))
+ datum.setSignature(signature);
+ try {
+ scfilters.passScoreAfterParsing(url, content, parse);
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ }
+ }
+
+ String origin = null;
+
+ // collect outlinks for subsequent db update
+ Outlink[] links = parseData.getOutlinks();
+ int outlinksToStore = Math.min(maxOutlinks, links.length);
+ if (ignoreExternalLinks || ignoreInternalLinks) {
+ URL originURL = new URL(url.toString());
+ // based on domain?
+ if ("bydomain".equalsIgnoreCase(ignoreExternalLinksMode)) {
+ origin = URLUtil.getDomainName(originURL).toLowerCase();
+ }
+ // use host
+ else {
+ origin = originURL.getHost().toLowerCase();
+ }
+ }
+
+ //used by fetchNode
+ if(fetchNode!=null){
+ fetchNode.setOutlinks(links);
+ fetchNode.setTitle(parseData.getTitle());
+ FetchNodeDb.getInstance().put(fetchNode.getUrl().toString(), fetchNode);
+ }
+ int validCount = 0;
+
+ // Process all outlinks, normalize, filter and deduplicate
+ List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);
+ HashSet<String> outlinks = new HashSet<String>(outlinksToStore);
+ for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
+ String toUrl = links[i].getToUrl();
+
+ toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl,
+ origin, ignoreInternalLinks, ignoreExternalLinks, ignoreExternalLinksMode,
+ urlFilters, urlExemptionFilters, normalizers);
+ if (toUrl == null) {
+ continue;
+ }
+
+ validCount++;
+ links[i].setUrl(toUrl);
+ outlinkList.add(links[i]);
+ outlinks.add(toUrl);
+ }
+
+ // Only process depth N outlinks
+ if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
+ reporter.incrCounter("FetcherOutlinks", "outlinks_detected",
+ outlinks.size());
+
+ // Counter to limit num outlinks to follow per page
+ int outlinkCounter = 0;
+
+ // Calculate variable number of outlinks by depth using the
+ // divisor (outlinks = Math.floor(divisor / depth * num.links))
+ int maxOutlinksByDepth = (int) Math.floor(outlinksDepthDivisor
+ / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
+
+ String followUrl;
+
+ // Walk over the outlinks and add as new FetchItem to the queues
+ Iterator<String> iter = outlinks.iterator();
+ while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
+ followUrl = iter.next();
+
+ // Check whether we'll follow external outlinks
+ if (outlinksIgnoreExternal) {
+ if (!URLUtil.getHost(url.toString()).equals(
+ URLUtil.getHost(followUrl))) {
+ continue;
+ }
+ }
+
+ reporter
+ .incrCounter("FetcherOutlinks", "outlinks_following", 1);
+
+ // Create new FetchItem with depth incremented
+ FetchItem fit = FetchItem.create(new Text(followUrl),
+ new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
+ queueMode, outlinkDepth + 1);
+ ((FetchItemQueues) fetchQueues).addFetchItem(fit);
+
+ outlinkCounter++;
+ }
+ }
+
+ // Overwrite the outlinks in ParseData with the normalized and
+ // filtered set
+ parseData.setOutlinks(outlinkList.toArray(new Outlink[outlinkList
+ .size()]));
+
+ output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+ parse.getText()), parseData, parse.isCanonical())));
+ }
+ }
+ } catch (IOException e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("fetcher caught:" + e.toString());
+ }
+ }
+
+ // return parse status if it exists
+ if (parseResult != null && !parseResult.isEmpty()) {
+ Parse p = parseResult.get(content.getUrl());
+ if (p != null) {
+ reporter.incrCounter("ParserStatus", ParseStatus.majorCodes[p
+ .getData().getStatus().getMajorCode()], 1);
+ return p.getData().getStatus();
+ }
+ }
+ return null;
+ }
+
+ private void updateStatus(int bytesInPage) throws IOException {
+ pages.incrementAndGet();
+ bytes.addAndGet(bytesInPage);
+ }
+
+ public synchronized void setHalted(boolean halted) {
+ this.halted = halted;
+ }
+
+ public synchronized boolean isHalted() {
+ return halted;
+ }
+
+ public String getReprUrl() {
+ return reprUrl;
+ }
+
+ private void setReprUrl(String urlString) {
+ this.reprUrl = urlString;
+
+ }
+
+}
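
Two observations on the outlink-following block in output() above. First,
maxOutlinksByDepth is computed with integer arithmetic, so with the default
divisor of 2 it truncates to 0 as soon as outlinkDepth + 1 exceeds 2. Second,
the while loop that walks the outlinks bounds outlinkCounter by
maxOutlinkDepthNumLinks, not by the just-computed maxOutlinksByDepth. A tiny
standalone check of the formula as written (OutlinkCapDemo is a hypothetical
class name; the constants mirror the defaults of
fetcher.follow.outlinks.depth.divisor and fetcher.follow.outlinks.num.links):

public class OutlinkCapDemo {
  public static void main(String[] args) {
    int divisor = 2, numLinks = 4;
    for (int depth = 0; depth < 4; depth++) {
      // Integer division, exactly as in the fetcher: divisor / (depth + 1)
      // truncates to 0 once depth + 1 exceeds divisor.
      int cap = (int) Math.floor(divisor / (depth + 1) * numLinks);
      System.out.println("depth " + depth + " -> cap " + cap);
    }
    // prints: depth 0 -> cap 8, depth 1 -> cap 4, depth 2 -> cap 0,
    // depth 3 -> cap 0
  }
}
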
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java b/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java
new file mode 100644
index 0000000..79652e7
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/QueueFeeder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.fetcher;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class feeds the queues with input items, and re-fills them as items
+ * are consumed by FetcherThread-s.
+ */
+public class QueueFeeder extends Thread {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueueFeeder.class);
+
+
+ private RecordReader<Text, CrawlDatum> reader;
+ private FetchItemQueues queues;
+ private int size;
+ private long timelimit = -1;
+
+ public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
+ FetchItemQueues queues, int size) {
+ this.reader = reader;
+ this.queues = queues;
+ this.size = size;
+ this.setDaemon(true);
+ this.setName("QueueFeeder");
+ }
+
+ public void setTimeLimit(long tl) {
+ timelimit = tl;
+ }
+
+ public void run() {
+ boolean hasMore = true;
+ int cnt = 0;
+ int timelimitcount = 0;
+ while (hasMore) {
+ if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
+ // enough .. let's simply
+ // read all the entries from the input without processing them
+ try {
+ Text url = new Text();
+ CrawlDatum datum = new CrawlDatum();
+ hasMore = reader.next(url, datum);
+ timelimitcount++;
+ } catch (IOException e) {
+ LOG.error("QueueFeeder error reading input, record " + cnt, e);
+ return;
+ }
+ continue;
+ }
+ int feed = size - queues.getTotalSize();
+ if (feed <= 0) {
+ // queues are full - spin-wait until they have some free space
+ try {
+ Thread.sleep(1000);
+ } catch (Exception e) {
+ }
+ continue;
+ } else {
+ LOG.debug("-feeding " + feed + " input urls ...");
+ while (feed > 0 && hasMore) {
+ try {
+ Text url = new Text();
+ CrawlDatum datum = new CrawlDatum();
+ hasMore = reader.next(url, datum);
+ if (hasMore) {
+ queues.addFetchItem(url, datum);
+ cnt++;
+ feed--;
+ }
+ } catch (IOException e) {
+ LOG.error("QueueFeeder error reading input, record " + cnt, e);
+ return;
+ }
+ }
+ }
+ }
+ LOG.info("QueueFeeder finished: total " + cnt
+ + " records + hit by time limit :" + timelimitcount);
+ }
+}
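
QueueFeeder's refill policy tops the queues up to a fixed capacity on each
pass and sleeps while they are full; with the defaults of
fetcher.threads.fetch=10 and fetcher.queue.depth.multiplier=50 that is at most
500 queued items. A minimal sketch of the loop, with a plain Iterator standing
in for the Hadoop RecordReader and a single queue for FetchItemQueues (both
substitutions are assumptions, not Nutch API):

import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal sketch of the QueueFeeder refill policy.
class FeederSketch implements Runnable {
  private final Queue<String> queues = new ConcurrentLinkedQueue<>();
  private final Iterator<String> input;
  private final int capacity; // threads * fetcher.queue.depth.multiplier

  FeederSketch(Iterator<String> input, int capacity) {
    this.input = input;
    this.capacity = capacity;
  }

  public void run() {
    while (input.hasNext()) {
      int feed = capacity - queues.size(); // free space, as in QueueFeeder.run()
      if (feed <= 0) {
        // queues are full: wait a second, then re-check
        try { Thread.sleep(1000); } catch (InterruptedException e) { return; }
        continue;
      }
      while (feed-- > 0 && input.hasNext()) {
        queues.add(input.next()); // top up to capacity
      }
    }
  }
}
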
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html b/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html
new file mode 100644
index 0000000..9c843e0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/fetcher/package.html
@@ -0,0 +1,5 @@
+<html>
+<body>
+The Nutch robot.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java
new file mode 100644
index 0000000..424fb1e
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/HostDatum.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Map.Entry;
+import java.text.SimpleDateFormat;
+
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable record holding per-host state: crawl status counts, DNS and
+ * connection failure counters, a score, the time of the last check, and an
+ * optional homepage URL.
+ */
+public class HostDatum implements Writable, Cloneable {
+ protected int failures = 0;
+ protected float score = 0;
+ protected Date lastCheck = new Date(0);
+ protected String homepageUrl = "";
+
+ protected MapWritable metaData = new MapWritable();
+
+ // Records the number of times DNS look-up failed, may indicate host no longer exists
+ protected int dnsFailures = 0;
+
+ // Records the number of connection failures, may indicate our network is being blocked by a firewall
+ protected int connectionFailures = 0;
+
+ protected int unfetched = 0;
+ protected int fetched = 0;
+ protected int notModified = 0;
+ protected int redirTemp = 0;
+ protected int redirPerm = 0;
+ protected int gone = 0;
+
+ public HostDatum() {
+ }
+
+ public HostDatum(float score) {
+ this(score, new Date());
+ }
+
+ public HostDatum(float score, Date lastCheck) {
+ this(score, lastCheck, "");
+ }
+
+ public HostDatum(float score, Date lastCheck, String homepageUrl) {
+ this.score = score;
+ this.lastCheck = lastCheck;
+ this.homepageUrl = homepageUrl;
+ }
+
+ public void resetFailures() {
+ setDnsFailures(0);
+ setConnectionFailures(0);
+ }
+
+ public void setDnsFailures(Integer dnsFailures) {
+ this.dnsFailures = dnsFailures;
+ }
+
+ public void setConnectionFailures(Integer connectionFailures) {
+ this.connectionFailures = connectionFailures;
+ }
+
+ public void incDnsFailures() {
+ this.dnsFailures++;
+ }
+
+ public void incConnectionFailures() {
+ this.connectionFailures++;
+ }
+
+ public Integer numFailures() {
+ return getDnsFailures() + getConnectionFailures();
+ }
+
+ public Integer getDnsFailures() {
+ return dnsFailures;
+ }
+
+ public Integer getConnectionFailures() {
+ return connectionFailures;
+ }
+
+ public void setScore(float score) {
+ this.score = score;
+ }
+
+ public void setLastCheck() {
+ setLastCheck(new Date());
+ }
+
+ public void setLastCheck(Date date) {
+ lastCheck = date;
+ }
+
+ public boolean isEmpty() {
+ return lastCheck.getTime() == 0;
+ }
+
+ public float getScore() {
+ return score;
+ }
+
+ public Integer numRecords() {
+ return unfetched + fetched + gone + redirPerm + redirTemp + notModified;
+ }
+
+ public Date getLastCheck() {
+ return lastCheck;
+ }
+
+ public boolean hasHomepageUrl() {
+ return homepageUrl.length() > 0;
+ }
+
+ public String getHomepageUrl() {
+ return homepageUrl;
+ }
+
+ public void setHomepageUrl(String homepageUrl) {
+ this.homepageUrl = homepageUrl;
+ }
+
+ public void setUnfetched(int val) {
+ unfetched = val;
+ }
+
+ public int getUnfetched() {
+ return unfetched;
+ }
+
+ public void setFetched(int val) {
+ fetched = val;
+ }
+
+ public int getFetched() {
+ return fetched;
+ }
+
+ public void setNotModified(int val) {
+ notModified = val;
+ }
+
+ public int getNotModified() {
+ return notModified;
+ }
+
+ public void setRedirTemp(int val) {
+ redirTemp = val;
+ }
+
+ public int getRedirTemp() {
+ return redirTemp;
+ }
+
+ public void setRedirPerm(int val) {
+ redirPerm = val;
+ }
+
+ public int getRedirPerm() {
+ return redirPerm;
+ }
+
+ public void setGone(int val) {
+ gone = val;
+ }
+
+ public int getGone() {
+ return gone;
+ }
+
+ public void resetStatistics() {
+ setUnfetched(0);
+ setFetched(0);
+ setGone(0);
+ setRedirTemp(0);
+ setRedirPerm(0);
+ setNotModified(0);
+ }
+
+ public void setMetaData(org.apache.hadoop.io.MapWritable mapWritable) {
+ this.metaData = new org.apache.hadoop.io.MapWritable(mapWritable);
+ }
+
+ /**
+ * Add all metadata from another HostDatum to this HostDatum.
+ *
+ * @param other HostDatum
+ */
+ public void putAllMetaData(HostDatum other) {
+ for (Entry<Writable, Writable> e : other.getMetaData().entrySet()) {
+ getMetaData().put(e.getKey(), e.getValue());
+ }
+ }
+
+ /**
+ * Returns the MapWritable if it was set or read via {@link #readFields(DataInput)};
+ * returns an empty map if this HostDatum was freshly created (the map is
+ * lazily instantiated).
+ */
+ public org.apache.hadoop.io.MapWritable getMetaData() {
+ if (this.metaData == null) this.metaData = new org.apache.hadoop.io.MapWritable();
+ return this.metaData;
+ }
+
+ @Override
+ public Object clone() throws CloneNotSupportedException {
+ HostDatum result = (HostDatum)super.clone();
+ result.score = score;
+ result.lastCheck = lastCheck;
+ result.homepageUrl = homepageUrl;
+
+ result.dnsFailures = dnsFailures;
+ result.connectionFailures = connectionFailures;
+
+ result.unfetched = unfetched;
+ result.fetched = fetched;
+ result.notModified = notModified;
+ result.redirTemp = redirTemp;
+ result.redirPerm = redirPerm;
+ result.gone = gone;
+
+ // copy the map so the clone does not share mutable metadata with the original
+ result.metaData = new org.apache.hadoop.io.MapWritable(metaData);
+
+ return result;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ score = in.readFloat();
+ lastCheck = new Date(in.readLong());
+ homepageUrl = Text.readString(in);
+
+ dnsFailures = in.readInt();
+ connectionFailures = in.readInt();
+
+ unfetched = in.readInt();
+ fetched = in.readInt();
+ notModified = in.readInt();
+ redirTemp = in.readInt();
+ redirPerm = in.readInt();
+ gone = in.readInt();
+
+ metaData = new org.apache.hadoop.io.MapWritable();
+ metaData.readFields(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeFloat(score);
+ out.writeLong(lastCheck.getTime());
+ Text.writeString(out, homepageUrl);
+
+ out.writeInt(dnsFailures);
+ out.writeInt(connectionFailures);
+
+ out.writeInt(unfetched);
+ out.writeInt(fetched);
+ out.writeInt(notModified);
+ out.writeInt(redirTemp);
+ out.writeInt(redirPerm);
+ out.writeInt(gone);
+
+ metaData.write(out);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ buf.append(Integer.toString(getUnfetched()));
+ buf.append("\t");
+ buf.append(Integer.toString(getFetched()));
+ buf.append("\t");
+ buf.append(Integer.toString(getGone()));
+ buf.append("\t");
+ buf.append(Integer.toString(getRedirTemp()));
+ buf.append("\t");
+ buf.append(Integer.toString(getRedirPerm()));
+ buf.append("\t");
+ buf.append(Integer.toString(getNotModified()));
+ buf.append("\t");
+ buf.append(Integer.toString(numRecords()));
+ buf.append("\t");
+ buf.append(Integer.toString(getDnsFailures()));
+ buf.append("\t");
+ buf.append(Integer.toString(getConnectionFailures()));
+ buf.append("\t");
+ buf.append(Integer.toString(numFailures()));
+ buf.append("\t");
+ buf.append(Float.toString(score));
+ buf.append("\t");
+ buf.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastCheck));
+ buf.append("\t");
+ buf.append(homepageUrl);
+ buf.append("\t");
+ for (Entry<Writable, Writable> e : getMetaData().entrySet()) {
+ buf.append(e.getKey().toString());
+ buf.append(':');
+ buf.append(e.getValue().toString());
+ buf.append("|||");
+ }
+ return buf.toString();
+ }
+}
\ No newline at end of file
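HostDatum implements Writable, so write() and readFields() must serialize and deserialize the fields in the same fixed order. A small round-trip sketch, with illustrative values and a hypothetical wrapper class:

    import java.io.*;
    import java.util.Date;
    import org.apache.nutch.hostdb.HostDatum;

    public class HostDatumRoundTrip {
      public static void main(String[] args) throws IOException {
        HostDatum datum = new HostDatum(1.0f, new Date(), "http://example.org/");
        datum.setFetched(42);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        datum.write(new DataOutputStream(bytes)); // fields written in fixed order

        HostDatum copy = new HostDatum();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()))); // read back in the same order
        System.out.println(copy); // tab-separated toString(); fetched column is 42
      }
    }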
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java
new file mode 100644
index 0000000..240e109
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/ReadHostDb.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+import org.apache.commons.jexl2.JexlContext;
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.jexl2.MapContext;
+
+/**
+ * Dumps the contents of the HostDb to text, optionally filtered by a JEXL
+ * expression.
+ *
+ * @see <a href="http://commons.apache.org/proper/commons-jexl/reference/syntax.html">JEXL syntax</a>
+ */
+public class ReadHostDb extends Configured implements Tool {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ReadHostDb.class);
+
+ public static final String HOSTDB_DUMP_HOSTNAMES = "hostdb.dump.hostnames";
+ public static final String HOSTDB_DUMP_HOMEPAGES = "hostdb.dump.homepages";
+ public static final String HOSTDB_FILTER_EXPRESSION = "hostdb.filter.expression";
+
+ static class ReadHostDbMapper extends Mapper<Text, HostDatum, Text, Text> {
+ protected boolean dumpHostnames = false;
+ protected boolean dumpHomepages = false;
+ protected Text emptyText = new Text();
+ protected Expression expr = null;
+
+ public void setup(Context context) {
+ dumpHomepages = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOMEPAGES, false);
+ dumpHostnames = context.getConfiguration().getBoolean(HOSTDB_DUMP_HOSTNAMES, false);
+ String expr = context.getConfiguration().get(HOSTDB_FILTER_EXPRESSION);
+ if (expr != null) {
+ // Create or retrieve a JexlEngine
+ JexlEngine jexl = new JexlEngine();
+
+ // Be silent (log evaluation errors rather than throw) but evaluate strictly
+ jexl.setSilent(true);
+ jexl.setStrict(true);
+
+ // Create an expression object
+ this.expr = jexl.createExpression(expr);
+ }
+ }
+
+ public void map(Text key, HostDatum datum, Context context) throws IOException, InterruptedException {
+ if (expr != null) {
+ // Create a context and add data
+ JexlContext jcontext = new MapContext();
+
+ // Set some fixed variables
+ jcontext.set("unfetched", datum.getUnfetched());
+ jcontext.set("fetched", datum.getFetched());
+ jcontext.set("gone", datum.getGone());
+ jcontext.set("redirTemp", datum.getRedirTemp());
+ jcontext.set("redirPerm", datum.getRedirPerm());
+ jcontext.set("redirs", datum.getRedirPerm() + datum.getRedirTemp());
+ jcontext.set("notModified", datum.getNotModified());
+ jcontext.set("ok", datum.getFetched() + datum.getNotModified());
+ jcontext.set("numRecords", datum.numRecords());
+ jcontext.set("dnsFailures", datum.getDnsFailures());
+ jcontext.set("connectionFailures", datum.getConnectionFailures());
+
+ // Set metadata variables
+ for (Map.Entry<Writable, Writable> entry : datum.getMetaData().entrySet()) {
+ Object value = entry.getValue();
+
+ if (value instanceof FloatWritable) {
+ FloatWritable fvalue = (FloatWritable)value;
+ Text tkey = (Text)entry.getKey();
+ jcontext.set(tkey.toString(), fvalue.get());
+ }
+
+ if (value instanceof IntWritable) {
+ IntWritable ivalue = (IntWritable)value;
+ Text tkey = (Text)entry.getKey();
+ jcontext.set(tkey.toString(), ivalue.get());
+ }
+ }
+
+ // Filter this record if evaluation did not pass
+ try {
+ if (!Boolean.TRUE.equals(expr.evaluate(jcontext))) {
+ return;
+ }
+ } catch (Exception e) {
+ LOG.info(e.toString() + " for " + key.toString());
+ }
+ }
+
+ if (dumpHomepages) {
+ if (datum.hasHomepageUrl()) {
+ context.write(new Text(datum.getHomepageUrl()), emptyText);
+ }
+ return;
+ }
+
+ if (dumpHostnames) {
+ context.write(key, emptyText);
+ return;
+ }
+
+ // No dump flag set - write the full datum record
+ context.write(key, new Text(datum.toString()));
+ }
+ }
+
+ // TODO: if possible, reduce unknown hosts to a single unknown domain, e.g.
+ // host_a.example.org,host_b.example.org ==> example.org; enable via configuration
+// static class ReadHostDbReducer extends Reduce<Text, Text, Text, Text> {
+// public void setup(Context context) { }
+//
+// public void reduce(Text domain, Iterable<Text> hosts, Context context) throws IOException, InterruptedException {
+//
+// }
+// }
+
+ private void readHostDb(Path hostDb, Path output, boolean dumpHomepages, boolean dumpHostnames, String expr) throws Exception {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("ReadHostDb: starting at " + sdf.format(start));
+
+ Configuration conf = getConf();
+ conf.setBoolean(HOSTDB_DUMP_HOMEPAGES, dumpHomepages);
+ conf.setBoolean(HOSTDB_DUMP_HOSTNAMES, dumpHostnames);
+ if (expr != null) {
+ conf.set(HOSTDB_FILTER_EXPRESSION, expr);
+ }
+ conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+ conf.set("mapred.textoutputformat.separator", "\t");
+
+ Job job = Job.getInstance(conf, "ReadHostDb");
+ job.setJarByClass(ReadHostDb.class);
+
+ FileInputFormat.addInputPath(job, new Path(hostDb, "current"));
+ FileOutputFormat.setOutputPath(job, output);
+
+ job.setMapperClass(ReadHostDbMapper.class);
+
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setOutputFormatClass(TextOutputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+
+ long end = System.currentTimeMillis();
+ LOG.info("ReadHostDb: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String args[]) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new ReadHostDb(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err.println("Usage: ReadHostDb <hostdb> <output> [-dumpHomepages | -dumpHostnames | -expr <expr>]");
+ return -1;
+ }
+
+ boolean dumpHomepages = false;
+ boolean dumpHostnames = false;
+ String expr = null;
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-dumpHomepages")) {
+ LOG.info("ReadHostDb: dumping homepage URLs");
+ dumpHomepages = true;
+ }
+ if (args[i].equals("-dumpHostnames")) {
+ LOG.info("ReadHostDb: dumping hostnames");
+ dumpHostnames = true;
+ }
+ if (args[i].equals("-expr")) {
+ if (i + 1 >= args.length) {
+ System.err.println("Missing argument for -expr");
+ return -1;
+ }
+ expr = args[++i];
+ LOG.info("ReadHostDb: evaluating expression: " + expr);
+ }
+ }
+
+ try {
+ readHostDb(new Path(args[0]), new Path(args[1]), dumpHomepages, dumpHostnames, expr);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("ReadHostDb: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
\ No newline at end of file
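For a usage sketch, the tool can be invoked programmatically the same way main() does; the paths, wrapper class, and JEXL expression below are placeholders, and the variables in the expression are the ones the mapper binds into the JEXL context:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.hostdb.ReadHostDb;
    import org.apache.nutch.util.NutchConfiguration;

    public class DumpUnreachableHosts {
      public static void main(String[] args) throws Exception {
        // Dump only hosts that look unreachable (illustrative expression)
        String[] toolArgs = { "crawl/hostdb", "hostdb-dump",
            "-expr", "dnsFailures > 0 or connectionFailures > 0" };
        System.exit(ToolRunner.run(NutchConfiguration.create(),
            new ReadHostDb(), toolArgs));
      }
    }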
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java b/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java
new file mode 100644
index 0000000..e7c7978
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/hostdb/ResolverThread.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.hostdb;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple runnable that performs DNS lookup for a single host.
+ */
+public class ResolverThread implements Runnable {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ResolverThread.class);
+
+ protected String host = null;
+ protected HostDatum datum = null;
+ protected Text hostText = new Text();
+ protected OutputCollector<Text,HostDatum> output;
+ protected Reporter reporter;
+ protected int purgeFailedHostsThreshold;
+
+ /**
+ * Constructor.
+ */
+ public ResolverThread(String host, HostDatum datum,
+ OutputCollector<Text,HostDatum> output, Reporter reporter, int purgeFailedHostsThreshold) {
+
+ hostText.set(host);
+ this.host = host;
+ this.datum = datum;
+ this.output = output;
+ this.reporter = reporter;
+ this.purgeFailedHostsThreshold = purgeFailedHostsThreshold;
+ }
+
+ /**
+ * Resolves the host via DNS and updates the datum and the job counters.
+ */
+ public void run() {
+ // Resolve the host and act appropriately
+ try {
+ // Throws an exception if host is not found
+ InetAddress inetAddr = InetAddress.getByName(host);
+
+ if (datum.isEmpty()) {
+ reporter.incrCounter("UpdateHostDb", "new_known_host", 1);
+ datum.setLastCheck();
+ LOG.info(host + ": new_known_host " + datum);
+ } else if (datum.getDnsFailures() > 0) {
+ reporter.incrCounter("UpdateHostDb", "rediscovered_host", 1);
+ datum.setLastCheck();
+ datum.setDnsFailures(0);
+ LOG.info(host + ": rediscovered_host " + datum);
+ } else {
+ reporter.incrCounter("UpdateHostDb", "existing_known_host", 1);
+ datum.setLastCheck();
+ LOG.info(host + ": existing_known_host " + datum);
+ }
+
+ // Write the host datum
+ output.collect(hostText, datum);
+ } catch (UnknownHostException e) {
+ try {
+ // If the datum is empty we'll initialize it with date = today and 1 failure
+ if (datum.isEmpty()) {
+ datum.setLastCheck();
+ datum.setDnsFailures(1);
+ output.collect(hostText, datum);
+ reporter.incrCounter("UpdateHostDb", "new_unknown_host", 1);
+ LOG.info(host + ": new_unknown_host " + datum);
+ } else {
+ datum.setLastCheck();
+ datum.incDnsFailures();
+
+ // Check whether this host should be forgotten: keep it while purging
+ // is disabled (-1) or the failure count is still below the threshold
+ if (purgeFailedHostsThreshold == -1 ||
+ datum.getDnsFailures() < purgeFailedHostsThreshold) {
+
+ output.collect(hostText, datum);
+ reporter.incrCounter("UpdateHostDb", "existing_unknown_host", 1);
+ LOG.info(host + ": existing_unknown_host " + datum);
+ } else {
+ reporter.incrCounter("UpdateHostDb", "purged_unknown_host", 1);
+ LOG.info(host + ": purged_unknown_host " + datum);
+ }
+ }
+
+ reporter.incrCounter("UpdateHostDb",
+ Integer.toString(datum.numFailures()) + "_times_failed", 1);
+ } catch (Exception ioe) {
+ LOG.warn(StringUtils.stringifyException(ioe));
+ }
+ } catch (Exception e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+
+ reporter.incrCounter("UpdateHostDb", "checked_hosts", 1);
+ }
+}
\ No newline at end of file
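Because ResolverThread is a plain Runnable, a caller (in Nutch, the UpdateHostDb job) can fan DNS lookups out over an executor. A hedged sketch with an assumed pool size and purge threshold, and caller-supplied parameters:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.nutch.hostdb.HostDatum;
    import org.apache.nutch.hostdb.ResolverThread;

    class ResolverPoolSketch {
      // host, datum, output and reporter come from the surrounding job
      static void resolve(String host, HostDatum datum,
          OutputCollector<Text, HostDatum> output, Reporter reporter)
          throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8); // assumed pool size
        pool.execute(new ResolverThread(host, datum, output, reporter,
            3)); // purge threshold of 3 failures is illustrative
        pool.shutdown();                             // no new tasks
        pool.awaitTermination(60, TimeUnit.SECONDS); // wait for pending lookups
      }
    }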